侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

使用Spark DataFrame过滤并统计CSV文件中特定条件的记录数

2025-12-11 / 0 评论 / 4 阅读

题目

使用Spark DataFrame过滤并统计CSV文件中特定条件的记录数

信息

  • 类型:问答
  • 难度:⭐

考点

DataFrame API基础, 条件过滤, 行动操作, 数据读取

快速回答

实现步骤:

  1. 使用SparkSession.read.csv()加载CSV文件
  2. filter()where()进行条件过滤
  3. 调用count()行动操作获取结果
  4. 注意处理列名和数据类型
## 解析

问题场景

实际开发中经常需要从数据源筛选特定数据并统计数量。例如:统计CSV文件中年龄大于30岁的用户数量。

解决方案

import org.apache.spark.sql.SparkSession

object SimpleFilter {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("CSV_Filter_Example")
      .master("local[*]")
      .getOrCreate()

    // 读取CSV(含表头)
    val df = spark.read
      .option("header", "true")
      .option("inferSchema", "true")  // 自动推断数据类型
      .csv("path/to/users.csv")

    // 过滤并统计
    val count = df.filter("age > 30").count()

    println(s"大于30岁的用户数量: $count")
    spark.stop()
  }
}

原理说明

  • 惰性执行filter()是转换操作(Transformation),直到count()行动操作(Action)时才触发计算
  • 条件表达式:支持SQL语法字符串(如"age > 30")或Column对象(如df("age") > 30
  • 数据读取header=true将首行作为列名,inferSchema=true自动解析数据类型

最佳实践

  1. 显式定义Schema:替代inferSchema,避免性能开销和类型错误
    import org.apache.spark.sql.types._
    
    val schema = StructType(Seq(
      StructField("id", IntegerType),
      StructField("name", StringType),
      StructField("age", IntegerType)
    ))
    
    val df = spark.read.schema(schema).csv(...)
  2. 列名处理:列名含空格时使用反引号`column name`
  3. 资源释放:操作完成后调用spark.stop()

常见错误

错误类型示例解决方案
未指定列名df.filter("30 > age")确保CSV含表头或手动定义列名
类型不匹配比较字符串和数字检查数据类型或用cast()转换
路径错误csv("user.csv")使用绝对路径或检查HDFS路径

扩展知识

  • 多条件过滤df.filter("age > 30 AND name LIKE 'A%'")
  • DSL风格df.filter(df("age") > 30)支持编译时类型检查
  • 性能提示:对过滤后的DataFrame复用可缓存.cache()