Question
Counting and Sorting Per-User URL Access Frequency with Hadoop MapReduce
Info
- Type: Q&A
- Difficulty: ⭐⭐
Topics
MapReduce programming model, understanding the Shuffle phase, custom sorting
Quick Answer
Implementation steps:
- Mapper phase: parse each log line and emit <user_URL, 1> key-value pairs
- Combiner phase: aggregate locally to cut down network transfer
- Reducer phase: sum the access counts globally
- Custom sorting: implement the WritableComparable interface to order keys by access count in descending order
- Use the Secondary Sort technique so that each user's URLs come out ordered by access count
Problem Scenario
Process TB-scale web server logs, count how many times each user accessed each URL, and sort the results by access count from highest to lowest. Raw log format: 192.168.1.1 - user123 [10/Oct/2023:14:22:01] "GET /product/123 HTTP/1.1" 200
How It Works
- MapReduce pipeline: Mapper parses the logs → Combiner aggregates locally → Partitioner partitions by user → Shuffle sorts and groups → Reducer computes the global totals
- Custom sorting: implement the WritableComparable interface to control how keys are ordered
- Secondary Sort: combine a composite key (user + URL) with a custom GroupingComparator so that, within each user's group, URLs are ordered by access count
Code Implementation
1. Custom key class
public class UserUrlKey implements WritableComparable<UserUrlKey> {
    private Text userId;
    private Text url;
    private IntWritable count;
    // constructor, set(), getters, and write()/readFields() serialization methods omitted
    @Override
    public int compareTo(UserUrlKey o) {
        int userComp = userId.compareTo(o.userId);
        if (userComp != 0) return userComp;
        // descending order: larger counts first
        return o.count.compareTo(count);
    }
}
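The omitted boilerplate is worth spelling out, because Hadoop instantiates the key reflectively and serializes it between the Map and Reduce tasks. Below is a minimal sketch of what those omitted methods could look like; the set() helper and getters are assumptions made so the Mapper and Reducer shown later compile, and the methods would live inside UserUrlKey (DataInput, DataOutput, and IOException come from java.io).
    // Sketch only: one plausible version of the methods omitted above.
    public UserUrlKey() {                                   // Hadoop needs a no-arg constructor
        this.userId = new Text();
        this.url = new Text();
        this.count = new IntWritable();
    }
    public void set(String userId, String url, IntWritable count) {
        this.userId.set(userId);
        this.url.set(url);
        this.count.set(count.get());
    }
    public Text getUserId() { return userId; }
    public Text getUrl() { return url; }
    @Override
    public void write(DataOutput out) throws IOException {  // field order must match readFields()
        userId.write(out);
        url.write(out);
        count.write(out);
    }
    @Override
    public void readFields(DataInput in) throws IOException {
        userId.readFields(in);
        url.readFields(in);
        count.readFields(in);
    }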
2. Mapper implementation
public class LogMapper extends Mapper<LongWritable, Text, UserUrlKey, IntWritable> {
    private UserUrlKey outputKey = new UserUrlKey();
    private IntWritable ONE = new IntWritable(1);
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Parse a line like: 192.168.1.1 - user123 [10/Oct/2023:14:22:01] "GET /product/123 HTTP/1.1" 200
        String[] parts = value.toString().split("\"");
        String userId = parts[0].split(" ")[2];   // third token before the quoted request: user123
        String url = parts[1].split(" ")[1];      // second token of the request line: /product/123
        outputKey.set(userId, url, ONE);
        context.write(outputKey, ONE);
    }
}
3. Reducer implementation
public class CountReducer extends Reducer<UserUrlKey, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(UserUrlKey key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        // Output format: user123:/product/123 → 15
        context.write(new Text(key.getUserId() + ":" + key.getUrl()), new IntWritable(sum));
    }
}
4. Custom GroupingComparator
public class UserGroupComparator extends WritableComparator {
    protected UserGroupComparator() {
        super(UserUrlKey.class, true);   // true: instantiate keys for deserialized comparison
    }
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        // Group only by userId, ignoring url and count
        UserUrlKey k1 = (UserUrlKey) a;
        UserUrlKey k2 = (UserUrlKey) b;
        return k1.getUserId().compareTo(k2.getUserId());
    }
}
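5. Job driver (illustrative sketch)
For completeness, a minimal driver sketch showing how the classes above could be wired into a Job. The class name UserPartitioner and the input/output paths are assumptions for illustration only; the Partitioner's getPartition() logic is shown under Best Practices below. Imports from org.apache.hadoop.conf, org.apache.hadoop.fs, org.apache.hadoop.io, and org.apache.hadoop.mapreduce(.lib.input/.lib.output) are omitted, matching the snippets above.
public class UserUrlDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "user url access count");
        job.setJarByClass(UserUrlDriver.class);

        job.setMapperClass(LogMapper.class);
        job.setReducerClass(CountReducer.class);
        // job.setCombinerClass(UrlCountCombiner.class);       // see the Combiner sketch under Best Practices
        job.setPartitionerClass(UserPartitioner.class);         // hypothetical class wrapping getPartition() below
        job.setGroupingComparatorClass(UserGroupComparator.class);

        job.setMapOutputKeyClass(UserUrlKey.class);
        job.setMapOutputValueClass(IntWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}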
Best Practices
- Combiner optimization: add a Combiner after the Mapper to pre-sum counts and shrink the Shuffle volume; note that the Reducer above changes the output key type to Text, so it cannot be reused as the Combiner here (see the sketch after this list)
- Partitioning: a custom Partitioner ensures that all records for the same user reach the same Reducer
public int getPartition(UserUrlKey key, IntWritable value, int numPartitions) {
    return (key.getUserId().hashCode() & Integer.MAX_VALUE) % numPartitions;
}
- Compressed transfer: enable map output compression (mapreduce.map.output.compress=true)
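A minimal Combiner sketch under the assumption above: it keeps UserUrlKey as the key so its output types match the Mapper's output. The class name UrlCountCombiner is illustrative, not part of the original design.
public class UrlCountCombiner extends Reducer<UserUrlKey, IntWritable, UserUrlKey, IntWritable> {
    private IntWritable result = new IntWritable();
    @Override
    protected void reduce(UserUrlKey key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();          // partial sum on the map side
        }
        result.set(sum);
        context.write(key, result);    // key type unchanged, so Shuffle semantics are preserved
    }
}
It would be registered with job.setCombinerClass(UrlCountCombiner.class). Similarly, the hypothetical UserPartitioner referenced in the driver sketch would just be a Partitioner<UserUrlKey, IntWritable> subclass containing the getPartition() method shown in the list above.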
Common Mistakes
- Object reuse: Hadoop recycles the value object inside the Reducer's for (IntWritable val : values) loop, so storing references without copying them silently overwrites earlier values (see the sketch after this list)
- Out-of-memory: if a single user accessed a very large number of URLs, materializing the Reducer's values for that user can exhaust memory → increase the JVM heap or split the work into smaller pieces
- Sorting not taking effect: forgetting the GroupingComparator, i.e. omitting job.setGroupingComparatorClass() from the Job configuration
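A minimal sketch of the object-reuse pitfall and its fix, assuming a hypothetical reduce() body that needs to keep the values around after the loop (ArrayList/List come from java.util):
// Inside a reduce() method: Hadoop reuses 'val', so buffering the reference is wrong.
List<IntWritable> buffered = new ArrayList<>();
for (IntWritable val : values) {
    // buffered.add(val);                        // BUG: every element ends up pointing at the same reused object
    buffered.add(new IntWritable(val.get()));    // FIX: copy the value before storing it
}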
Extended Knowledge
- Shuffle internals: the circular in-memory buffer (100 MB by default), Spill, Merge, and Fetch phases, tunable via the mapreduce.task.io.sort.* parameters (see the configuration sketch after the Hive query below)
- Performance: under heavy data skew (e.g. a few hot users), consider Two-Phase Sorting or switching to Spark RDD's repartitionAndSortWithinPartitions
- Alternative: a Hive implementation (requires 2 MR jobs):
SELECT userId, url, COUNT(1) AS cnt
FROM logs
GROUP BY userId, url
ORDER BY userId, cnt DESC
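A small configuration sketch for the Shuffle tuning mentioned above; the property names are standard MRv2 keys, but the values are illustrative starting points rather than recommendations, and Snappy is assumed to be available on the cluster.
// Set on the Job's Configuration before submission.
Configuration conf = new Configuration();
conf.setInt("mapreduce.task.io.sort.mb", 200);            // enlarge the map-side sort buffer (default 100 MB)
conf.setInt("mapreduce.task.io.sort.factor", 50);         // merge more spill files per pass (default 10)
conf.setBoolean("mapreduce.map.output.compress", true);   // compress intermediate map output
conf.set("mapreduce.map.output.compress.codec",
         "org.apache.hadoop.io.compress.SnappyCodec");    // assumes the Snappy codec is installed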