题目
优化销售数据分析:计算每个产品的总销售额和平均销售额
信息
- 类型:问答
- 难度:⭐⭐
考点
DataFrame API使用, 聚合函数应用, 执行计划优化, 数据倾斜处理
快速回答
核心实现步骤:
- 使用
groupBy按产品ID分组 - 应用
agg聚合函数计算总销售额和平均销售额 - 优化执行:避免
groupBy导致的全量数据Shuffle - 处理数据倾斜:添加随机前缀或使用
salting技术
关键优化点:
- 优先使用
reduceByKey替代groupByKey - 对倾斜Key进行分桶处理
- 监控Stage执行时间定位瓶颈
问题场景
在电商销售数据分析中,需要处理TB级交易数据,计算每个产品的总销售额和平均销售额。数据包含product_id, amount等字段,需考虑数据倾斜(热门商品交易量极大)和计算效率问题。
基础实现与问题
// 基础实现(存在性能问题)
val salesDF = spark.read.parquet("s3://bucket/sales_data")
val result = salesDF
.groupBy("product_id")
.agg(
sum("amount").alias("total_sales"),
avg("amount").alias("avg_sales")
)
result.write.parquet("s3://bucket/result")问题分析:
groupBy导致全量数据Shuffle,网络IO成为瓶颈- 当某些
product_id的记录量极大时(如热门商品),会引起数据倾斜 - 默认Hash分区可能使少数Executor负载过高
优化方案
1. 执行计划优化
// 查看执行计划(关键诊断步骤)
result.explain(true)
// 优化1:启用自适应查询执行(AQE)
spark.conf.set("spark.sql.adaptive.enabled", true)
// 优化2:使用Map端预聚合
val optimizedResult = salesDF
.groupBy("product_id")
.agg(
sum("amount").alias("total_sales"),
expr("sum(amount)/count(1)").alias("avg_sales") // 避免单独计算avg
)原理说明:
- AQE自动合并小分区,动态调整Join策略
- 用
sum/count替代avg减少计算步骤 - 通过
explain分析物理计划中的Exchange操作
2. 数据倾斜处理
// Salting技术解决倾斜
import org.apache.spark.sql.functions._
val saltedDF = salesDF
.withColumn("salt", (rand() * 100).cast("int")) // 添加0-99随机盐值
val saltedAgg = saltedDF
.groupBy("product_id", "salt")
.agg(sum("amount").as("partial_sum"), count("*").as("partial_count"))
val finalResult = saltedAgg
.groupBy("product_id")
.agg(
sum("partial_sum").as("total_sales"),
(sum("partial_sum") / sum("partial_count")).as("avg_sales")
)最佳实践:
- 倾斜Key识别:
salesDF.stat.freqItems(Seq("product_id"), 0.1) - 盐值范围根据倾斜程度调整(建议20-100倍于最大分区核心数)
- 两阶段聚合将负载分散
常见错误
- OOM问题: 在倾斜节点上Executor内存不足 → 增加
spark.executor.memoryOverhead - Shuffle溢出: 未启用AQE导致小文件过多 → 设置
spark.sql.adaptive.coalescePartitions.enabled=true - 计算不精确: 使用
approx_count_distinct时未设置误差率 → 明确指定rsd参数
扩展知识
- Spark 3.0优化: AQE自动处理倾斜Join,需设置
spark.sql.adaptive.skewJoin.enabled=true - 结构化流处理: 相同逻辑可应用于流计算,使用
groupByKey+mapGroupsWithState - 性能监控: 通过Spark UI观察:
- Shuffle Read Size/Records 是否均衡
- GC时间占比(应<10%)
- Scheduler Delay(理想状态≈0)