题目
使用Spark DataFrame过滤并统计CSV文件中特定条件的记录数
信息
- 类型:问答
- 难度:⭐
考点
DataFrame API基础, 条件过滤, 行动操作, 数据读取
快速回答
实现步骤:
- 使用
SparkSession.read.csv()加载CSV文件 - 用
filter()或where()进行条件过滤 - 调用
count()行动操作获取结果 - 注意处理列名和数据类型
问题场景
实际开发中经常需要从数据源筛选特定数据并统计数量。例如:统计CSV文件中年龄大于30岁的用户数量。
解决方案
import org.apache.spark.sql.SparkSession
object SimpleFilter {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("CSV_Filter_Example")
.master("local[*]")
.getOrCreate()
// 读取CSV(含表头)
val df = spark.read
.option("header", "true")
.option("inferSchema", "true") // 自动推断数据类型
.csv("path/to/users.csv")
// 过滤并统计
val count = df.filter("age > 30").count()
println(s"大于30岁的用户数量: $count")
spark.stop()
}
}原理说明
- 惰性执行:
filter()是转换操作(Transformation),直到count()行动操作(Action)时才触发计算 - 条件表达式:支持SQL语法字符串(如
"age > 30")或Column对象(如df("age") > 30) - 数据读取:
header=true将首行作为列名,inferSchema=true自动解析数据类型
最佳实践
- 显式定义Schema:替代
inferSchema,避免性能开销和类型错误import org.apache.spark.sql.types._ val schema = StructType(Seq( StructField("id", IntegerType), StructField("name", StringType), StructField("age", IntegerType) )) val df = spark.read.schema(schema).csv(...) - 列名处理:列名含空格时使用反引号
`column name` - 资源释放:操作完成后调用
spark.stop()
常见错误
| 错误类型 | 示例 | 解决方案 |
|---|---|---|
| 未指定列名 | df.filter("30 > age") | 确保CSV含表头或手动定义列名 |
| 类型不匹配 | 比较字符串和数字 | 检查数据类型或用cast()转换 |
| 路径错误 | csv("user.csv") | 使用绝对路径或检查HDFS路径 |
扩展知识
- 多条件过滤:
df.filter("age > 30 AND name LIKE 'A%'") - DSL风格:
df.filter(df("age") > 30)支持编译时类型检查 - 性能提示:对过滤后的DataFrame复用可缓存
.cache()