侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

用户行为日志分析:计算PV/UV并排序

2025-12-12 / 0 评论 / 4 阅读

题目

用户行为日志分析:计算PV/UV并排序

信息

  • 类型:问答
  • 难度:⭐⭐

考点

DataFrame聚合操作, 性能优化, 数据倾斜处理, 排序策略

快速回答

核心解决方案:

  • 使用groupBy按用户分组
  • agg结合count计算PV,countDistinct计算UV
  • 通过orderBy(desc("pv"))降序排序
  • 优化:处理数据倾斜时添加随机前缀或使用repartition
## 解析

问题场景

给定用户行为日志DataFrame,包含user_idpage_id字段,要求:

  1. 计算每个用户的PV(总访问次数)
  2. 计算每个用户的UV(访问独立页面数)
  3. 按PV值降序排序输出

核心代码实现

import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

// 输入数据Schema: | user_id: String | page_id: String | timestamp: Long |
def processUserLogs(logDF: DataFrame): DataFrame = {
  logDF.groupBy("user_id")
    .agg(
      count("*").alias("pv"),          // 计算总访问次数
      countDistinct("page_id").alias("uv") // 计算独立页面数
    )
    .orderBy(desc("pv"))  // 按PV降序排序
}

关键考点解析

1. 聚合函数选择

  • PV计算count("*")统计所有行,比count(col)更高效(避免null检查)
  • UV计算:必须使用countDistinct而非count,后者会重复计数相同页面

2. 性能优化策略

  • 数据倾斜处理:当某些user_id数据量极大时:
    // 方法1:添加随机前缀
    val saltedDF = logDF.withColumn("salted_uid", 
      concat(col("user_id"), lit("_"), (rand() * 10).cast("int"))
    )
    saltedDF.groupBy("salted_uid")
      .agg(count("*").as("partial_pv"), approx_count_distinct("page_id").as("partial_uv"))
      .groupBy(substring_index(col("salted_uid"), "_", 1).as("user_id")) // 移除前缀
      .agg(sum("partial_pv").as("pv"), sum("partial_uv").as("uv"))
    
    // 方法2:开启AQE自适应查询(Spark 3.0+)
    spark.conf.set("spark.sql.adaptive.enabled", true)
  • 内存优化:对user_id使用repartition避免OOM
    logDF.repartition(1000, col("user_id")) // 根据集群规模调整分区数

3. 排序陷阱

  • 全局排序风险orderBy会导致全局排序,大数据集应加limit或改用sortWithinPartitions
  • 替代方案:若只需TopN用户,优先使用Window函数:
    import org.apache.spark.sql.expressions.Window
    val w = Window.orderBy(desc("pv"))
    aggDF.withColumn("rank", rank().over(w))
      .filter(col("rank") <= 100) // 取Top100

常见错误

错误类型后果修正方案
误用count(page_id)计算UV重复页面被多次计数改用countDistinct
未处理数据倾斜部分Task执行缓慢甚至OOM添加随机前缀或开启AQE
全局排序未限制结果Driver内存溢出添加limit或分阶段排序

扩展知识

  • 近似UV计算:海量数据时用approx_count_distinct替代countDistinct,误差<5%但性能提升10倍
  • 结构化流处理:实时场景可扩展为流式计算:
    val streamingDF = spark.readStream.schema(schema).json("path")
    streamingDF.groupBy("user_id")
      .agg(count("*").alias("pv"))
      .writeStream.outputMode("complete").format("console").start()
  • 存储优化:结果写入Parquet时启用Z-Order优化查询:
    df.write.option("optimizeWrite", "true").parquet("output_path")