侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

使用Spark DataFrame过滤和统计员工数据

2025-12-12 / 0 评论 / 4 阅读

题目

使用Spark DataFrame过滤和统计员工数据

信息

  • 类型:问答
  • 难度:⭐

考点

DataFrame创建, 数据过滤, 聚合操作

快速回答

核心操作步骤:

  1. 使用spark.read.csv加载CSV数据
  2. 通过filterwhere筛选部门为'Engineering'的员工
  3. 使用count()行动操作获取结果数量
  4. show()查看结果(调试用)
## 解析

原理说明

本题考察Spark DataFrame的基础操作:
1. 惰性执行:转换操作(如filter)不立即执行,行动操作(如count)触发计算
2. 结构化处理:DataFrame提供列式结构化API,比RDD更高效
3. CSV解析:Spark自动推断Schema,也可手动指定

代码示例

import org.apache.spark.sql.SparkSession

object EmployeeAnalysis {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("EmployeeFilter")
      .master("local[*]")
      .getOrCreate()

    // 读取CSV(含表头)
    val df = spark.read
      .option("header", "true")
      .csv("employees.csv")

    // 过滤并统计
    val engineeringCount = df.filter("department = 'Engineering'").count()

    // 结果输出
    println(s"Engineering department count: $engineeringCount")

    spark.stop()
  }
}

最佳实践

  • Schema处理:对于生产环境,建议显式定义Schema:
    val schema = StructType(Array(StructField("id",IntegerType), ...))
  • 过滤语法:优先使用filter(col("department") === "Engineering")类型安全写法
  • 资源管理:用try-finally确保spark.stop()始终执行

常见错误

错误类型示例解决方案
大小写敏感filter("Department = 'Engineering'")Spark默认大小写敏感,需匹配列名大小写
行动操作缺失忘记调用count()转换操作需行动操作触发执行
路径错误csv("employee.csv")使用绝对路径或检查文件权限

扩展知识

  • 性能优化:对常用过滤列使用df.cache()缓存数据
  • SQL语法:可注册临时表后用SQL查询:
    df.createTempView("employees")
    spark.sql("SELECT COUNT(*) FROM employees WHERE department='Engineering'")
  • 数据源扩展:同样模式适用于JSON/Parquet等数据源,只需替换read.json()等方法