前言

上文 Snail Job 广播任务 中已经对广播任务的使用有了大致的了解。本文主要对分片任务的使用做一个介绍。首先解释一下什么是分片,"分片"通常指的是将一个大的任务或者数据集分割成多个较小的部分来处理的方法。这种方式可以提高处理效率,尤其是在大数据处理或分布式计算环境中。

就 Snail Job 中的静态分片任务来说,指的是将一个大任务拆分成多个小任务并发执行,每个小任务处理数据的一个子集。而这个要处理的子集,就是你每个任务获得静态参数来决定的。所以这里才称为静态分片,就是事先你已经知道要执行哪些数据集了。

本文目标

假设公司要进行数据安全方面的审查,但是已知 User 表中的手机号还是明文保存的,并且自增 id 是从 1 到 40000。总共也就这四万条数据。那么本文就对这个假设的场景通过 Snail Job 的静态分片任务来实现,从这个实现的过程中,了解如下知识点:

  • 客户端如何获得配置中的分片参数
  • 服务端如何配置一个静态分片任务
  • 服务端配置并行数的作用
  • Snail Job 集群定时任务 中,我们在阻塞策略中还遗留了一个恢复策略,通过本文例子加以验证

    策略解释
    恢复继续把失败的任务干完

本机两个客户端启动

这个启动方式可以参考 Snail Job 广播任务 的本机两个客户端启动章节的介绍。另外也可以参考本文最后的补充知识点内容介绍。我这里通过增加 Java 系统属性的方式,设置了本机两个客户端的 web 端口和 netty 端口。

web 端口snail-job 的客户端端口
80891789
80901790
-Dserver.port=8089 -Dsnail-job.port=1789
-Dserver.port=8090 -Dsnail-job.port=1790

image-20241008145312800

服务端分片任务配置说明

Snail Job 集群定时任务 一文中已经对任务的配置有了比较详细的说明。这里仅仅关注配置静态分片任务时比较特别的配置项。

image-20241008153657789

方法参数

在方法参数中,可以通过点击页面中的 + 按钮来添加多个分片参数。每个分片参数中所填写的内容,均会传送给客户端进行处理。每多一个分片参数你可以理解成多了一个要分配的任务。

并行数

是客户端可以同时执行的任务数量。上面说了,每多一个分片参数相当于多了一个任务。每个客户端都有一个线程池,这里设置的并行数,就是你打算让客户端多少个线程来同时执行任务。

进行测试

验证并行数 1

  • 客户端代码

    @Component
    public class TestStaticShardingJob extends AbstractJobExecutor {
    
        @Override
        protected ExecuteResult doJobExecute(JobArgs jobArgs) {
            String jobParams = String.valueOf(jobArgs.getJobParams());
            SnailJobLog.LOCAL.info("开始执行分片任务,参数:{}", jobParams);
            // 获得 jobArgs 中传入的开始 id 和结束 id
            String[] split = jobParams.split(",");
            Long fromId = Long.parseLong(split[0]);
            Long toId = Long.parseLong(split[1]);
            // 模拟数据库操作,对范围 id,进行加密处理
            try {
                SnailJobLog.REMOTE.info("开始对 id 范围:{}进行加密处理", fromId+"-"+toId);
                Thread.sleep(3000);
                SnailJobLog.REMOTE.info("对 id 范围:{}进行加密处理完成", fromId+"-"+toId);
            }catch (InterruptedException e){
               return ExecuteResult.failure("任务执行失败");
            }
            return ExecuteResult.success("执行分片任务完成");
        }
    }

    这里通过 jobArgs.getJobParams() 获得服务端分给本客户端的任务参数。

  • 服务端分片任务配置

    | 配置项 | 配置内容 |

      | ------------ | ------------------------------------------------------------ |

    | 任务名称 | 静态分片任务-并行数 |
    | 组名称 | service_plat |
    | 状态 | 禁用 |
    | 任务类型 | 静态分片 |
    | 执行器类型 | java |
    | 执行器名称 | com.mayuanfei.test.TestStaticShardingJob |
    | 方法参数 | 分片参数 1 :1,10000
    分片参数 2 :10001,20000
    分片参数 3 :20001,30000
    分片参数 4 :30001,40000 |
    | 最大重试次数 | 0 |
    | 并行数 | 1 |

  • 测试结果【手动执行】

    • 8089 端口客户端

      2024-10-09 08:11:47 [snail-netty-server-5] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2346] 任务调度成功. 
      2024-10-09 08:11:47 [snail-netty-server-6] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2346] 任务调度成功. 
      2024-10-09 08:11:47 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:30001,40000
      2024-10-09 08:11:47 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:30001-40000进行加密处理
      2024-10-09 08:11:50 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:30001-40000进行加密处理完成
      2024-10-09 08:11:50 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:10001,20000
      2024-10-09 08:11:50 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:10001-20000进行加密处理
      2024-10-09 08:11:53 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:10001-20000进行加密处理完成
      2024-10-09 08:11:53 [snail-job-job-2,346-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
       - 任务执行成功 taskBatchId:[2346] [{"status":1,"result":"执行分片任务完成","message":"任务执行成功"}]

      注意:

      • 该客户端执行了 id 范围为 10001-20000 和 30001-40000
      • 每个任务执行用时 3 秒,总体大致花费了 6 秒的时间完成 2 个任务
    • 8090 端口客户端

      2024-10-09 08:11:47 [snail-netty-server-6] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2346] 任务调度成功. 
      2024-10-09 08:11:47 [snail-netty-server-5] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2346] 任务调度成功. 
      2024-10-09 08:11:47 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:1,10000
      2024-10-09 08:11:47 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:1-10000进行加密处理
      2024-10-09 08:11:50 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:1-10000进行加密处理完成
      2024-10-09 08:11:50 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:20001,30000
      2024-10-09 08:11:50 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:20001-30000进行加密处理
      2024-10-09 08:11:53 [snail-job-job-2,346-1] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:20001-30000进行加密处理完成
      2024-10-09 08:11:53 [snail-job-job-2,346-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
       - 任务执行成功 taskBatchId:[2346] [{"status":1,"result":"执行分片任务完成","message":"任务执行成功"}]

      注意:

      • 该客户端执行了 id 范围为 1-10001 和 20001-30000
      • 每个任务执行用时 3 秒,总体大致花费了 6 秒的时间完成 2 个任务
    • 任务批次列表中执行时长

      image-20241009082113174

验证并行数 2

  • 客户端代码

    同并行数 1 的代码

  • 服务端分片任务配置

    | 配置项 | 配置内容 |

      | -------- | ------------------- |

    | 任务名称 | 静态分片任务-并行数 |
    | 并行数 | 2 |

    这里仅仅是把并行数修改为 2。

  • 测试结果【手动执行】

    • 8089 端口客户端

      2024-10-09 09:32:11 [snail-netty-server-9] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2347] 任务调度成功. 
      2024-10-09 09:32:11 [snail-netty-server-10] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2347] 任务调度成功. 
      2024-10-09 09:32:11 [snail-job-job-2,347-1] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:10001,20000
      2024-10-09 09:32:11 [snail-job-job-2,347-2] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:30001,40000
      2024-10-09 09:32:11 [snail-job-job-2,347-1] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:10001-20000进行加密处理
      2024-10-09 09:32:11 [snail-job-job-2,347-2] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:30001-40000进行加密处理
      2024-10-09 09:32:14 [snail-job-job-2,347-1] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:10001-20000进行加密处理完成
      2024-10-09 09:32:14 [snail-job-job-2,347-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
       - 任务执行成功 taskBatchId:[2347] [{"status":1,"result":"执行分片任务完成","message":"任务执行成功"}]
      2024-10-09 09:32:14 [snail-job-job-2,347-2] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:30001-40000进行加密处理完成
      2024-10-09 09:32:14 [snail-job-job-2,347-2] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
       - 任务执行成功 taskBatchId:[2347] [{"status":1,"result":"执行分片任务完成","message":"任务执行成功"}]

      注意:

      • 该客户端执行了 id 范围为 10001-20000 和 30001-40000
      • 由于是并发数为 2,虽然每个任务执行用时 3 秒,但是花费了 3 秒的时间完成 2 个任务
    • 8090 端口客户端

      2024-10-09 09:32:11 [snail-netty-server-9] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2347] 任务调度成功. 
      2024-10-09 09:32:11 [snail-netty-server-10] INFO  c.a.s.c.job.core.client.JobEndPoint
       - 批次:[2347] 任务调度成功. 
      2024-10-09 09:32:11 [snail-job-job-2,347-1] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:20001,30000
      2024-10-09 09:32:11 [snail-job-job-2,347-2] INFO  c.m.test.TestStaticShardingJob
       - 开始执行分片任务,参数:1,10000
      2024-10-09 09:32:11 [snail-job-job-2,347-1] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:20001-30000进行加密处理
      2024-10-09 09:32:11 [snail-job-job-2,347-2] INFO  c.m.test.TestStaticShardingJob
       - 开始对id范围:1-10000进行加密处理
      2024-10-09 09:32:14 [snail-job-job-2,347-2] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:1-10000进行加密处理完成
      2024-10-09 09:32:14 [snail-job-job-2,347-1] INFO  c.m.test.TestStaticShardingJob
       - 对id范围:20001-30000进行加密处理完成
      2024-10-09 09:32:14 [snail-job-job-2,347-2] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
       - 任务执行成功 taskBatchId:[2347] [{"status":1,"result":"执行分片任务完成","message":"任务执行成功"}]
      2024-10-09 09:32:14 [snail-job-job-2,347-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
       - 任务执行成功 taskBatchId:[2347] [{"status":1,"result":"执行分片任务完成","message":"任务执行成功"}]

      注意:

      • 该客户端执行了 id 范围为 1-10001 和 20001-30000
      • 由于是并发数为 2,虽然每个任务执行用时 3 秒,但是花费了 3 秒的时间完成 2 个任务
    • 任务批次列表中执行时长

      image-20241009093842977

验证恢复策略

策略解释
恢复继续把失败的任务干完

继续把失败的任务干完,那可能你有疑问了。那超时的任务咋处理呢?所以这里我们验证两种情况:

  • 恢复策略中超时的任务

    不创建新的任务批次,等待任务完成。

  • 恢复策略中失败的任务

恢复策略中超时的任务

为了应用策略,前提就是要执行当前任务时,前一个任务还没有执行完成。我们如何来验证恢复策略把失败的任务继续干完呢?

  1. 客户端任务执行超过 1 分钟,而服务端触发是 1 分钟一次。这样保证了能应用策略。
  2. 不希望重试,所以可以把最大重试次数设置为 0。
  3. 超时时间又不能太短。防止它中断线程。所以这里超时设置 30 分钟
  • 客户端代码

    @Component
    public class TestStaticShardingRecovery extends AbstractJobExecutor {
        @Override
        protected ExecuteResult doJobExecute(JobArgs jobArgs) {
    
            // 最后一个分片参数时,客户端睡 90 秒
            if(jobArgs.getJobParams().toString().equals("20001,30000")) {
                try {
                    Thread.sleep(90000);
                    SnailJobLog.REMOTE.info("参数: {}睡眠 90 秒", jobArgs.getJobParams());
                } catch (InterruptedException e) {
                    return ExecuteResult.failure("任务执行失败.任务:"+jobArgs.getJobParams());
                }
            }
            return ExecuteResult.success("任务执行成功.任务:"+jobArgs.getJobParams());
        }
    }
    客户端代码仅仅对 20001,30000 这个批次休眠了 90 秒。其他两个分片任务应该是立刻执行成功。
  • 服务端分片任务配置

    配置项配置内容
    任务名称静态分片任务-恢复策略
    组名称service_plat
    状态启用
    任务类型静态分片
    执行器类型java
    执行器名称com.mayuanfei.test.TestStaticShardingRecovery
    方法参数分片参数 1 :1,10000
    分片参数 2 :10001,20000
    分片参数 3 :20001,30000
    阻塞策略恢复
    固定时间60秒
    最大重试次数0
    超时时间1800秒
    并行数3
  • 测试结果

    我们仅仅开一个 8090 端口的客户端进行测试,可以看到日志输出如下:

    image-20241017105332593

    image-20241017105409696

恢复策略中失败的任务

要保证前一个任务还没有执行完成,还要有失败。

  • 客户端代码

    @Component
    public class TestStaticShardingRecoveryFail extends AbstractJobExecutor {
        @Override
        protected ExecuteResult doJobExecute(JobArgs jobArgs) {
    
            // 第 2 个分片失败
            if(jobArgs.getJobParams().toString().equals("10001,20000")) {
                SnailJobLog.REMOTE.info("参数: {}失败", jobArgs.getJobParams());
                // 生成一个 1 到 100 的随机数,如果数值 >50 则返回失败;否则返回成功
                return ExecuteResult.failure("任务执行失败.任务:"+jobArgs.getJobParams());
    
            }
    
            // 最后一个分片参数时,客户端睡 90 秒
            if(jobArgs.getJobParams().toString().equals("20001,30000")) {
                try {
    
                    Thread.sleep(90000);
                    SnailJobLog.REMOTE.info("参数: {}睡眠 90 秒", jobArgs.getJobParams());
                } catch (InterruptedException e) {
                    return ExecuteResult.failure("任务执行失败.任务:"+jobArgs.getJobParams());
                }
            }
            return ExecuteResult.success("任务执行成功.任务:"+jobArgs.getJobParams());
        }
    }
  • 服务端分片任务配置

    | 配置项 | 配置内容 |

      | ------------ | ------------------------------------------------------------ |

    | 任务名称 | 静态分片任务-恢复策略-失败 |
    | 组名称 | service_plat |
    | 状态 | 启用 |
    | 任务类型 | 静态分片 |
    | 执行器类型 | java |
    | 执行器名称 | com.mayuanfei.test.TestStaticShardingRecoveryFail |
    | 方法参数 | 分片参数 1 :1,10000
    分片参数 2 :10001,20000
    分片参数 3 :20001,30000
    |
    | 阻塞策略 | 恢复 |
    | 固定时间 | 60秒 |
    | 最大重试次数 | 0 |
    | 超时时间 | 1800秒 |
    | 并行数 | 3 |

  • 测试结果

    image-20241017132004252

    当再次调度失败后,整个批次任务状态为处理失败:

    image-20241017132126391

总结

  • 静态分片任务是把大任务分为一个一个的小任务,交给客户端去执行。之所以前面要加一个静态这个定语,就是说你要事先定义好分片的范围。有静态就意味会有动态,这个在下面的章节中介绍。
  • 客户端代码通过 jobArgs.getJobParams() 获得事先分片好的参数内容
  • 并行数的配置,是设置客户端线程池并发的数量
  • 恢复策略恢复的任务是失败的任务

补充知识点

Spring Boot 的配置优先级:

命令行参数 > Java 系统属性 > 环境变量 > properties 配置文件 > yml 配置文件 > yaml 配置文件 > @PropertySource 注解 > 默认配置

命令行参数:通过 -- 传递的参数。

java -jar your-app.jar --server.port=8081

在 IDEA 中通过添加 Program arguments 来增加。

image-20241008102847500

Java 系统属性:使用 -D 设置的系统属性。

java -Dapp.env=development -Dapp.version=1.0.0 -jar your-app.jar

在 IDEA 中通过添加 Add VM options 来添加

image-20241008103500345

环境变量:操作系统的环境变量。

Spring Boot 项目中的配置文件properties 配置文件 > yml 配置文件 > yaml 配置文件

@PropertySource 注解:自定义的属性源。

默认配置:Spring Boot 内置的默认值。


本笔记原文来自博主 老马 9527 的文章 4.snail-job分片任务
如有侵权,请联系作者 马铃薯头 删除
最后修改:2025 年 10 月 31 日
温柔的好天气总是和我一样,帅的鸭皮!