前言

上文 Snail Job 的 Map 任务 中已经对 Map 任务有了大致的了解。并且上文中已经预告了本文介绍的任务类型 MapReduce。在 Map 任务的基础上再加上汇总的方法,就是 MapReduce 任务了。按照小学语文老师教给我们的写作技巧:总分总的关系来看。Map 任务仅仅是总分关系,而 MapReduce 就是总分总关系了。

可能你对上面的描述还有所疑惑,不过咱们通过上文的例子,和本文例子的结合对比,就能有所共鸣了。

本文目标

上文的例子是:200 个数字,切成 4 个片,每个片中有 50 个数,对每个切片进行汇总计算。本文在这个基础上最后再多一步计算最终汇总值。从本例中了解的知识点如下:

  • 客户端采用继承类方式实现 MapReduce 任务
  • 客户端采用注解方式实现 MapReduce 任务
  • 服务器端配置分片数的作用

客户端代码 1

这里针对的是 reduce 分片数设置 = 1 的情况,后面会解释这个分片数的作用。

开发环境

  • JDK 版本:openjdk-21.0.2
  • snail-job 版本:1.2.0

Maven 依赖

<dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- snail-job 客户端依赖 -->
        <dependency>
            <groupId>com.aizuda</groupId>
            <artifactId>snail-job-client-starter</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- snail-job 重试相关依赖 -->
        <dependency>
            <groupId>com.aizuda</groupId>
            <artifactId>snail-job-client-retry-core</artifactId>
            <version>1.2.0</version>
        </dependency>
        <!-- snail-job 客户端核心依赖 -->
        <dependency>
            <groupId>com.aizuda</groupId>
            <artifactId>snail-job-client-job-core</artifactId>
            <version>1.2.0</version>
        </dependency>
    </dependencies>

继承类方式

@Component
public class TestMapReduce1 extends AbstractMapReduceExecutor {

    @Override
    public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        return switch (mapArgs.getTaskName()) {
            case SystemConstants.ROOT_MAP -> {
                // 生成1~200数值并分片
                int partitionSize = 50;
                List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
                        .boxed()
                        .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
                        .values()
                        .stream()
                        .toList();
                SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
                yield mapHandler.doMap(partition, "doCalc");
            }
            case "doCalc" -> {
                List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
                // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
                int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
                // 打印日志到服务器
                ThreadUtil.sleep(3, TimeUnit.SECONDS);
                SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
                yield ExecuteResult.success(partitionTotal);
            }
            default -> ExecuteResult.failure();
        };
    }

    @Override
    protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
        // 数据进行累加计算
        int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
        SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
        return ExecuteResult.success(reduceTotal);
    }

    @Override
    protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
        return null;
    }
}

解释说明:

  • 通过继承 AbstractMapReduceExecutor 类实现 MapReduce 任务
  • 其中的 doJobMapExecute 和上文的作用一样,都是通过任务名称,来区分是 ROOT_MAP 任务还是分片后的处理。
  • doReduceExecute 方法,如果配置的 reduce 分片数是 1 的话,那么会随机调用 1 个客户端进行 reduce【汇总】操作。
  • doMergeReduceExecute 方法是在配置 reduce 分片数 > 1 的时候才会去执行。也是随机调用一个存活的客户端去执行。本例采用的 reduce 分片数 = 1,故这里没有任何代码。详细配置见服务端配置 1

注解方式

@Component
@JobExecutor(name = "testMapReduceAnnotation1")
public class TestMapReduceAnnotation1 {

    @MapExecutor
    public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        int partitionSize = 50;
        List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
                .boxed()
                .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
                .values()
                .stream()
                .toList();
        SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
        return mapHandler.doMap(partition, "doCalc");
    }

    @MapExecutor(taskName = "doCalc")
    public ExecuteResult doCalc(MapArgs mapArgs) {
        List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
        // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
        int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
        // 打印日志到服务器
        ThreadUtil.sleep(3, TimeUnit.SECONDS);
        SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
        return ExecuteResult.success(partitionTotal);
    }

    @ReduceExecutor
    public ExecuteResult reduceExecute(ReduceArgs reduceArgs) {
        int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
        SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
        return ExecuteResult.success(reduceTotal);
    }
}

解释说明:

  • 通过 @JobExecutor 注解标识该类是一个定时任务
  • 通过 @MapExecutor 注解来来处理分片、分片后的任务处理
  • 通过 @ReduceExecutor 注解来处理汇总任务
  • 由于配置的 reduce 分片数是 1,所以这里用到把上面 @ReduceExecutor 汇总任务执行结果再次进行合并操作的注解

示意图

image-20241213160126069

服务端配置 1

继承类方式

配置项配置内容
任务名称MapReduce 任务 1- 继承类
状态禁用
任务类型MapReduce
自定义执行器com.mayuanfei.test.TestMapReduce1
reduce 分片数1
并行数1

说明:

  • 状态:状态设置为禁用,是想通过手动执行来触发
  • 任务类型:要选本文介绍的任务类型 MapReduce
  • 自定义执行器:继承类的方式要写全路径【复习】
  • 并行数:指客户端每台机器的线程数【复习】
  • reduce 分片数:它设置的数量决定的 doReduceExecute 由几台客户端来执行。当它设置的数 > 1 时,还会随机抽取一台客户端执行 doMergeReduceExecute 方法

注解方式

配置项配置内容
任务名称MapReduce 任务 1- 注解
状态禁用
任务类型MapReduce
自定义执行器testMapReduceAnnotation1
reduce 分片数1
并行数1

说明:

  • 自定义执行器 : 与注解 @JobExecutor(name = "testMapReduceAnnotation1") 名称一致【复习】

客户端代码 1 测试

这里不管是继承类方式,还是注解方式测试的结果都是一样的。

测试前提

web 端口snail-job 的客户端端口
91001900
92002900

可以参考 Snail Job 广播任务 的本机两个客户端启动章节的介绍。idea 中配置如下:

ver.port=9100 -Dsnail-job.port=1900
-Dserver.port=9200 -Dsnail-job.port=2900

测试 MapReduce 任务

  • 9100Web 端口

    image-20241216083116432

  • 9200Web 端口

    image-20241216083330658

服务端管理页面

image-20241216083704850

客户端代码 2

这里针对 reduce 分片数设置 > 1 的情况,会有 reduce 分片数设置的数量的客户端执行 reduce 方法。最后通过合并汇总方法来执行最后的运算。

继承类方式

@Component
public class TestMapReduce2 extends AbstractMapReduceExecutor {

    @Override
    public ExecuteResult doJobMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        return switch (mapArgs.getTaskName()) {
            case SystemConstants.ROOT_MAP -> {
                // 生成1~200数值并分片
                int partitionSize = 50;
                List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
                        .boxed()
                        .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
                        .values()
                        .stream()
                        .toList();
                SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
                yield mapHandler.doMap(partition, "doCalc");
            }
            case "doCalc" -> {
                List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
                // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
                int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
                // 打印日志到服务器
                ThreadUtil.sleep(3, TimeUnit.SECONDS);
                SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
                yield ExecuteResult.success(partitionTotal);
            }
            default -> ExecuteResult.failure();
        };
    }

    @Override
    protected ExecuteResult doReduceExecute(ReduceArgs reduceArgs) {
        // 数据进行累加计算
        int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
        SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
        return ExecuteResult.success(reduceTotal);
    }

    @Override
    protected ExecuteResult doMergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
        // 把reduce的结果进行累加计算
        int mergeReduceTotal = mergeReduceArgs.getReduces().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
        SnailJobLog.REMOTE.info("端口:{},mergeReduceTotal:{}", SpringUtil.getProperty("server.port"), mergeReduceTotal);
        return ExecuteResult.success(mergeReduceTotal);
    }
}

说明:

  • 只有 reduce 分片数 > 1 时,doMergeReduceExecute 方法才会被触发执行
  • 并且仅一个客户端节点执行 doMergeReduceExecute 方法

注解方式

@Component
@JobExecutor(name = "testMapReduceAnnotation2")
public class TestMapReduceAnnotation2 {

    @MapExecutor
    public ExecuteResult rootMapExecute(MapArgs mapArgs, MapHandler mapHandler) {
        int partitionSize = 50;
        List<List<Integer>> partition = IntStream.rangeClosed(1, 200)
                .boxed()
                .collect(Collectors.groupingBy(i -> (i - 1) / partitionSize))
                .values()
                .stream()
                .toList();
        SnailJobLog.REMOTE.info("端口:{}完成分配任务", SpringUtil.getProperty("server.port"));
        return mapHandler.doMap(partition, "doCalc");
    }

    @MapExecutor(taskName = "doCalc")
    public ExecuteResult doCalc(MapArgs mapArgs) {
        List<Integer> sourceList = (List<Integer>) mapArgs.getMapResult();
        // 遍历sourceList的每一个元素,计算出一个累加值partitionTotal
        int partitionTotal = sourceList.stream().mapToInt(i -> i).sum();
        // 打印日志到服务器
        ThreadUtil.sleep(3, TimeUnit.SECONDS);
        SnailJobLog.REMOTE.info("端口:{},partitionTotal:{}", SpringUtil.getProperty("server.port"), partitionTotal);
        return ExecuteResult.success(partitionTotal);
    }

    @ReduceExecutor
    public ExecuteResult reduceExecute(ReduceArgs reduceArgs) {
        int reduceTotal = reduceArgs.getMapResult().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
        SnailJobLog.REMOTE.info("端口:{},reduceTotal:{}", SpringUtil.getProperty("server.port"), reduceTotal);
        return ExecuteResult.success(reduceTotal);
    }

    @MergeReduceExecutor
    public ExecuteResult mergeReduceExecute(MergeReduceArgs mergeReduceArgs) {
        // 把reduce的结果进行累加计算
        int mergeReduceTotal = mergeReduceArgs.getReduces().stream().mapToInt(i -> Integer.parseInt((String) i)).sum();
        SnailJobLog.REMOTE.info("端口:{},mergeReduceTotal:{}", SpringUtil.getProperty("server.port"), mergeReduceTotal);
        return ExecuteResult.success(mergeReduceTotal);
    }
}

说明:

和继承类方式完全一致,只不过这里采用注解方式实现。同理 @MergeReduceExecutor 只有 reduce 分片数 > 1 时才有意义。

示意图

image-20241213160631540

服务端配置 2

继承类方式

配置项配置内容
任务名称MapReduce 任务 1- 继承类
状态禁用
任务类型MapReduce
自定义执行器com.mayuanfei.test.TestMapReduce2
reduce 分片数2
并行数1

说明:

这里 reduce 分片数 = 2

注解方式

配置项配置内容
任务名称MapReduce 任务 2- 注解
状态禁用
任务类型MapReduce
自定义执行器testMapReduceAnnotation2
reduce 分片数2
并行数1

说明:

这里 reduce 分片数 = 2

客户端代码 2 测试

测试前提

web 端口snail-job 的客户端端口
91001900
92002900

可以参考 Snail Job 广播任务 的本机两个客户端启动章节的介绍。idea 中配置如下:

-Dserver.port=9100 -Dsnail-job.port=1900
-Dserver.port=9200 -Dsnail-job.port=2900

测试 MapReduce 任务

  • 9100Web 端口

    image-20241216084216524

  • 9200Web 端口

    image-20241216084454429

服务端管理页面

image-20241216084817923

总结

  • MapReduce 任务就是一个总分总的过程。

    • @MapExecutor【总】
    • @MapExecutor(taskName="doCalc") 【分】
    • @ReduceExecutor【总】
  • MapReduce 任务用注解方式似乎更容易理解。
  • reduce 分片数的作用:

    • 等于 1 时:@ReduceExecutor 会随机客户端执行,@MergeReduceExecutor 无效。
    • 大于 1 时:@ReduceExecutor 按照指定配置数量随机找出后执行,@MergeReduceExecutor 会随机找一台客户端执行
  • 分片数量不建议过多(大于 200 时会提示分片过多,最多不能超过 500 个分片)

本笔记原文来自博主 老马 9527 的文章 6.snail-job的MapReduce任务
如有侵权,请联系作者 马铃薯头 删除
最后修改:2025 年 11 月 03 日
温柔的好天气总是和我一样,帅的鸭皮!