题目
使用Spark DataFrame过滤和统计员工数据
信息
- 类型:问答
- 难度:⭐
考点
DataFrame创建, 数据过滤, 聚合操作
快速回答
核心操作步骤:
- 使用
spark.read.csv加载CSV数据 - 通过
filter或where筛选部门为'Engineering'的员工 - 使用
count()行动操作获取结果数量 - 用
show()查看结果(调试用)
原理说明
本题考察Spark DataFrame的基础操作:
1. 惰性执行:转换操作(如filter)不立即执行,行动操作(如count)触发计算
2. 结构化处理:DataFrame提供列式结构化API,比RDD更高效
3. CSV解析:Spark自动推断Schema,也可手动指定
代码示例
import org.apache.spark.sql.SparkSession
object EmployeeAnalysis {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder()
.appName("EmployeeFilter")
.master("local[*]")
.getOrCreate()
// 读取CSV(含表头)
val df = spark.read
.option("header", "true")
.csv("employees.csv")
// 过滤并统计
val engineeringCount = df.filter("department = 'Engineering'").count()
// 结果输出
println(s"Engineering department count: $engineeringCount")
spark.stop()
}
}最佳实践
- Schema处理:对于生产环境,建议显式定义Schema:
val schema = StructType(Array(StructField("id",IntegerType), ...)) - 过滤语法:优先使用
filter(col("department") === "Engineering")类型安全写法 - 资源管理:用
try-finally确保spark.stop()始终执行
常见错误
| 错误类型 | 示例 | 解决方案 |
|---|---|---|
| 大小写敏感 | filter("Department = 'Engineering'") | Spark默认大小写敏感,需匹配列名大小写 |
| 行动操作缺失 | 忘记调用count() | 转换操作需行动操作触发执行 |
| 路径错误 | csv("employee.csv") | 使用绝对路径或检查文件权限 |
扩展知识
- 性能优化:对常用过滤列使用
df.cache()缓存数据 - SQL语法:可注册临时表后用SQL查询:
df.createTempView("employees")
spark.sql("SELECT COUNT(*) FROM employees WHERE department='Engineering'") - 数据源扩展:同样模式适用于JSON/Parquet等数据源,只需替换
read.json()等方法