题目
用户行为日志分析:计算PV/UV并排序
信息
- 类型:问答
- 难度:⭐⭐
考点
DataFrame聚合操作, 性能优化, 数据倾斜处理, 排序策略
快速回答
核心解决方案:
- 使用
groupBy按用户分组 - 用
agg结合count计算PV,countDistinct计算UV - 通过
orderBy(desc("pv"))降序排序 - 优化:处理数据倾斜时添加随机前缀或使用
repartition
问题场景
给定用户行为日志DataFrame,包含user_id和page_id字段,要求:
- 计算每个用户的PV(总访问次数)
- 计算每个用户的UV(访问独立页面数)
- 按PV值降序排序输出
核心代码实现
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
// 输入数据Schema: | user_id: String | page_id: String | timestamp: Long |
def processUserLogs(logDF: DataFrame): DataFrame = {
logDF.groupBy("user_id")
.agg(
count("*").alias("pv"), // 计算总访问次数
countDistinct("page_id").alias("uv") // 计算独立页面数
)
.orderBy(desc("pv")) // 按PV降序排序
}关键考点解析
1. 聚合函数选择
- PV计算:
count("*")统计所有行,比count(col)更高效(避免null检查) - UV计算:必须使用
countDistinct而非count,后者会重复计数相同页面
2. 性能优化策略
- 数据倾斜处理:当某些user_id数据量极大时:
// 方法1:添加随机前缀 val saltedDF = logDF.withColumn("salted_uid", concat(col("user_id"), lit("_"), (rand() * 10).cast("int")) ) saltedDF.groupBy("salted_uid") .agg(count("*").as("partial_pv"), approx_count_distinct("page_id").as("partial_uv")) .groupBy(substring_index(col("salted_uid"), "_", 1).as("user_id")) // 移除前缀 .agg(sum("partial_pv").as("pv"), sum("partial_uv").as("uv")) // 方法2:开启AQE自适应查询(Spark 3.0+) spark.conf.set("spark.sql.adaptive.enabled", true) - 内存优化:对
user_id使用repartition避免OOMlogDF.repartition(1000, col("user_id")) // 根据集群规模调整分区数
3. 排序陷阱
- 全局排序风险:
orderBy会导致全局排序,大数据集应加limit或改用sortWithinPartitions - 替代方案:若只需TopN用户,优先使用
Window函数:import org.apache.spark.sql.expressions.Window val w = Window.orderBy(desc("pv")) aggDF.withColumn("rank", rank().over(w)) .filter(col("rank") <= 100) // 取Top100
常见错误
| 错误类型 | 后果 | 修正方案 |
|---|---|---|
误用count(page_id)计算UV | 重复页面被多次计数 | 改用countDistinct |
| 未处理数据倾斜 | 部分Task执行缓慢甚至OOM | 添加随机前缀或开启AQE |
| 全局排序未限制结果 | Driver内存溢出 | 添加limit或分阶段排序 |
扩展知识
- 近似UV计算:海量数据时用
approx_count_distinct替代countDistinct,误差<5%但性能提升10倍 - 结构化流处理:实时场景可扩展为流式计算:
val streamingDF = spark.readStream.schema(schema).json("path") streamingDF.groupBy("user_id") .agg(count("*").alias("pv")) .writeStream.outputMode("complete").format("console").start() - 存储优化:结果写入Parquet时启用
Z-Order优化查询:df.write.option("optimizeWrite", "true").parquet("output_path")