前言
上文 Snail Job 的 Map 任务 中已经对 Map 任务有了大致的了解。并且上文中已经预告了本文介绍的任务类型 MapReduce。在 Map 任务的基础上再加上汇总的方法,就是 MapReduce 任务了。按照小学语文老师教给我们的写作技巧:总分总的关系来看。Map 任务仅仅是总分关系,而 MapReduce 就是总分总关系了。
可能你对上面的描述还有所疑惑,不过咱们通过上文的例子,和本文例子的结合对比,就能有所共鸣了。
本文目标
上文的例子是:200 个数字,切成 4 个片,每个片中有 50 个数,对每个切片进行汇总计算。本文在这个基础上最后再多一步计算最终汇总值。从本例中了解的知识点如下:
- 客户端采用继承类方式实现 MapReduce 任务
- 客户端采用注解方式实现 MapReduce 任务
- 服务器端配置分片数的作用
客户端代码 1
这里针对的是 reduce 分片数设置 = 1 的情况,后面会解释这个分片数的作用。
开发环境
- JDK 版本:openjdk-21.0.2
- snail-job 版本:1.2.0
Maven 依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- snail-job 客户端依赖 -->
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-starter</artifactId>
<version>1.2.0</version>
</dependency>
<!-- snail-job 重试相关依赖 -->
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-retry-core</artifactId>
<version>1.2.0</version>
</dependency>
<!-- snail-job 客户端核心依赖 -->
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-job-core</artifactId>
<version>1.2.0</version>
</dependency>
</dependencies>继承类方式
@Component
public class TestMapReduce1 extends AbstractMapReduceExecutor {
@Override
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
return switch (mapArgs.getTaskName()) {
case SystemConstants.ROOT_MAP -> {
// 生成1~200数值并分片
int partitionSize = 50;
List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
.boxed()
.collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
.values()
.stream()
.toList();
SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
yield mapHandler.doMap(partition, "doCalc");
}
case "doCalc" -> {
List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
// 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
// 打印日志到服务器
ThreadUtil.sleep(3, TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
yield ExecuteResult.success(partitionTotal);
}
default -> ExecuteResult.failure();
};
}
@Override
protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
// 数据进行累加计算
int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
return ExecuteResult.success(reduceTotal);
}
@Override
protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
return null;
}
}解释说明:
- 通过继承
AbstractMapReduceExecutor类实现 MapReduce 任务- 其中的
doJobMapExecute和上文的作用一样,都是通过任务名称,来区分是 ROOT_MAP 任务还是分片后的处理。doReduceExecute方法,如果配置的 reduce 分片数是 1 的话,那么会随机调用 1 个客户端进行 reduce【汇总】操作。doMergeReduceExecute方法是在配置 reduce 分片数 > 1 的时候才会去执行。也是随机调用一个存活的客户端去执行。本例采用的 reduce 分片数 = 1,故这里没有任何代码。详细配置见服务端配置 1。
注解方式
@Component
@JobExecutor(name = "testMapReduceAnnotation1")
public class TestMapReduceAnnotation1 {
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
int partitionSize = 50;
List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
.boxed()
.collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
.values()
.stream()
.toList();
SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
return mapHandler.doMap(partition, "doCalc");
}
@MapExecutor(taskName = "doCalc")
public ExecuteResult doCalc(MapArgs mapArgs) {
List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
// 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
// 打印日志到服务器
ThreadUtil.sleep(3, TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
return ExecuteResult.success(partitionTotal);
}
@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs reduceArgs) {
int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
return ExecuteResult.success(reduceTotal);
}
}解释说明:
- 通过
@JobExecutor注解标识该类是一个定时任务- 通过
@MapExecutor注解来来处理分片、分片后的任务处理- 通过
@ReduceExecutor注解来处理汇总任务- 由于配置的 reduce 分片数是 1,所以这里用到把上面
@ReduceExecutor汇总任务执行结果再次进行合并操作的注解
示意图
服务端配置 1
继承类方式
| 配置项 | 配置内容 |
|---|---|
| 任务名称 | MapReduce 任务 1- 继承类 |
| 状态 | 禁用 |
| 任务类型 | MapReduce |
| 自定义执行器 | com.mayuanfei.test.TestMapReduce1 |
| reduce 分片数 | 1 |
| 并行数 | 1 |
说明:
- 状态:状态设置为禁用,是想通过手动执行来触发
- 任务类型:要选本文介绍的任务类型 MapReduce
- 自定义执行器:继承类的方式要写全路径【复习】
- 并行数:指客户端每台机器的线程数【复习】
- reduce 分片数:它设置的数量决定的
doReduceExecute由几台客户端来执行。当它设置的数 > 1 时,还会随机抽取一台客户端执行doMergeReduceExecute方法
注解方式
| 配置项 | 配置内容 |
|---|---|
| 任务名称 | MapReduce 任务 1- 注解 |
| 状态 | 禁用 |
| 任务类型 | MapReduce |
| 自定义执行器 | testMapReduceAnnotation1 |
| reduce 分片数 | 1 |
| 并行数 | 1 |
说明:
- 自定义执行器 : 与注解
@JobExecutor(name = "testMapReduceAnnotation1")名称一致【复习】
客户端代码 1 测试
这里不管是继承类方式,还是注解方式测试的结果都是一样的。
测试前提
| web 端口 | snail-job 的客户端端口 |
|---|---|
| 9100 | 1900 |
| 9200 | 2900 |
可以参考 Snail Job 广播任务 的本机两个客户端启动章节的介绍。idea 中配置如下:
ver.port=9100 -Dsnail-job.port=1900
-Dserver.port=9200 -Dsnail-job.port=2900测试 MapReduce 任务
9100Web 端口
9200Web 端口
服务端管理页面
客户端代码 2
这里针对 reduce 分片数设置 > 1 的情况,会有 reduce 分片数设置的数量的客户端执行 reduce 方法。最后通过合并汇总方法来执行最后的运算。
继承类方式
@Component
public class TestMapReduce2 extends AbstractMapReduceExecutor {
@Override
public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
return switch (mapArgs.getTaskName()) {
case SystemConstants.ROOT_MAP -> {
// 生成1~200数值并分片
int partitionSize = 50;
List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
.boxed()
.collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
.values()
.stream()
.toList();
SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
yield mapHandler.doMap(partition, "doCalc");
}
case "doCalc" -> {
List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
// 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
// 打印日志到服务器
ThreadUtil.sleep(3, TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
yield ExecuteResult.success(partitionTotal);
}
default -> ExecuteResult.failure();
};
}
@Override
protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
// 数据进行累加计算
int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
return ExecuteResult.success(reduceTotal);
}
@Override
protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
// 把reduce的结果进行累加计算
int mergeReduceTotal = mergeReduceArgs.getReduces().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
SnailJobLog.REMOTE.info("端口:{},mergeReduceTotal:{}", SpringUtil.getProperty("server.port"), mergeReduceTotal);
return ExecuteResult.success(mergeReduceTotal);
}
}说明:
- 只有 reduce 分片数 > 1 时,
doMergeReduceExecute方法才会被触发执行- 并且仅一个客户端节点执行
doMergeReduceExecute方法
注解方式
@Component
@JobExecutor(name = "testMapReduceAnnotation2")
public class TestMapReduceAnnotation2 {
@MapExecutor
public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
int partitionSize = 50;
List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
.boxed()
.collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
.values()
.stream()
.toList();
SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
return mapHandler.doMap(partition, "doCalc");
}
@MapExecutor(taskName = "doCalc")
public ExecuteResult doCalc(MapArgs mapArgs) {
List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
// 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
// 打印日志到服务器
ThreadUtil.sleep(3, TimeUnit.SECONDS);
SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
return ExecuteResult.success(partitionTotal);
}
@ReduceExecutor
public ExecuteResult reduceExecute(ReduceArgs reduceArgs) {
int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
return ExecuteResult.success(reduceTotal);
}
@MergeReduceExecutor
public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
// 把reduce的结果进行累加计算
int mergeReduceTotal = mergeReduceArgs.getReduces().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
SnailJobLog.REMOTE.info("端口:{},mergeReduceTotal:{}", SpringUtil.getProperty("server.port"), mergeReduceTotal);
return ExecuteResult.success(mergeReduceTotal);
}
}说明:
和继承类方式完全一致,只不过这里采用注解方式实现。同理
@MergeReduceExecutor只有 reduce 分片数 > 1 时才有意义。
示意图
服务端配置 2
继承类方式
| 配置项 | 配置内容 |
|---|---|
| 任务名称 | MapReduce 任务 1- 继承类 |
| 状态 | 禁用 |
| 任务类型 | MapReduce |
| 自定义执行器 | com.mayuanfei.test.TestMapReduce2 |
| reduce 分片数 | 2 |
| 并行数 | 1 |
说明:
这里 reduce 分片数 = 2
注解方式
| 配置项 | 配置内容 |
|---|---|
| 任务名称 | MapReduce 任务 2- 注解 |
| 状态 | 禁用 |
| 任务类型 | MapReduce |
| 自定义执行器 | testMapReduceAnnotation2 |
| reduce 分片数 | 2 |
| 并行数 | 1 |
说明:
这里 reduce 分片数 = 2
客户端代码 2 测试
测试前提
| web 端口 | snail-job 的客户端端口 |
|---|---|
| 9100 | 1900 |
| 9200 | 2900 |
可以参考 Snail Job 广播任务 的本机两个客户端启动章节的介绍。idea 中配置如下:
-Dserver.port=9100 -Dsnail-job.port=1900
-Dserver.port=9200 -Dsnail-job.port=2900测试 MapReduce 任务
9100Web 端口
9200Web 端口
服务端管理页面
总结
MapReduce 任务就是一个总分总的过程。
@MapExecutor【总】@MapExecutor(taskName="doCalc")【分】@ReduceExecutor【总】
- MapReduce 任务用注解方式似乎更容易理解。
reduce 分片数的作用:
- 等于 1 时:
@ReduceExecutor会随机客户端执行,@MergeReduceExecutor无效。 - 大于 1 时:
@ReduceExecutor按照指定配置数量随机找出后执行,@MergeReduceExecutor会随机找一台客户端执行
- 等于 1 时:
- 分片数量不建议过多(大于 200 时会提示分片过多,最多不能超过 500 个分片)
本笔记原文来自博主 老马 9527 的文章 6.snail-job的MapReduce任务
如有侵权,请联系作者 马铃薯头 删除







