前言
Snail Job 目前最新的版本已经来到了 v1.4.0 版本。最直观的感受是服务端的后台报警信息少了很多。可能与通讯模式默认改为 gRPC 有关。
本文主要是针对工作流任务进行测试。在前面的 6 篇文章,我们搭建一个普通定时任务已经完全没有问题了,而这个工作流其实就是把多个普通定时进行编排,从而形成一个有前后执行顺序甚至是有条件的流程任务。
抓重点
第一段叽里呱啦说了一大段,真正有营养的就最后一句话。拆解这句话,可以归纳:
- 工作流是普通定时任务的集合
- 工作流有前后执行顺序
- 工作流在执行某个任务可以加条件
本文目标 1
这里实现一个假设的业务场景:一个是微信的账单收集任务,统计某一天的总金额;另一个是支付宝的账单收集任务,同样也是统计某一天的总金额;等这两个任务完成后再执行一个会汇总账单总金额的定时任务。

基础的流程就如上图这么简单的业务流程。后面可能随着深入学习,此图可能会有所变化。
目标 1 实现
客户端开发环境
- JDK 版本:openjdk-21.0.2
- snail-job 版本:1.4.0
客户端 Maven 依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- snail-job 客户端依赖 -->
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-starter</artifactId>
<version>1.4.0</version>
</dependency>
<!-- snail-job 重试相关依赖 -->
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-retry-core</artifactId>
<version>1.4.0</version>
</dependency>
<!-- snail-job 客户端核心依赖 -->
<dependency>
<groupId>com.aizuda</groupId>
<artifactId>snail-job-client-job-core</artifactId>
<version>1.4.0</version>
</dependency>
</dependencies>客户端代码
Dto 对象
这个对象用于在几个定时任务的上下文中进行传输。
@Data
public class BillDto {
/**
* 账单 ID
*/
private Long billId;
/**
* 账单渠道
*/
private String billChannel;
/**
* 账单日期
*/
private String billDate;
/**
* 账单金额
*/
private BigDecimal billAmount;
}微信账单任务
该任务就是用于微信账单的统计任务。该任务从工作流的上下文中获得一个 settlementDate 清算日期值,如果是 sysdate 就设置当前天为账单日期。并且把 BillDto 放到上下文中,供后面的任务使用。代码如下:
@Component
@JobExecutor(name = "wechatBillTask")
public class WechatBillTask {
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
BillDto billDto = new BillDto();
billDto.setBillId(123456789L);
billDto.setBillChannel("wechat");
// 从上下文中获得清算日期并设置,如果上下文中清算日期
// 是 sysdate 设置为当前日期;否则取管理页面设置的值
String settlementDate = (String) jobArgs.getWfContext().get("settlementDate");
if(StrUtil.equals(settlementDate, "sysdate")) {
settlementDate = DateUtil.today();
}
billDto.setBillDate(settlementDate);
billDto.setBillAmount(new BigDecimal("1234.56"));
// 把 billDto 对象放入上下文进行传递
jobArgs.appendContext("wechat", JSONUtil.toJsonStr(billDto));
SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
return ExecuteResult.success(billDto);
}
}补充说明:
我这里采用注解方式来实现定时任务。如果想通过实现类方式请自行参考前面的几篇文章。
支付宝账单任务
支付宝账单任务同微信账单任务。
@Component
@JobExecutor(name = "alipayBillTask")
public class AlipayBillTask {
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
BillDto billDto = new BillDto();
billDto.setBillId(23456789L);
billDto.setBillChannel("alipay");
// 设置清算日期
String settlementDate = (String) jobArgs.getWfContext().get("settlementDate");
if(StrUtil.equals(settlementDate, "sysdate")) {
settlementDate = DateUtil.today();
}
billDto.setBillDate(settlementDate);
billDto.setBillAmount(new BigDecimal("2345.67"));
// 把 billDto 对象放入上下文进行传递
jobArgs.appendContext("alipay", JSONUtil.toJsonStr(billDto));
SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
return ExecuteResult.success(billDto);
}
}汇总账单任务
该任务就是从上下文取得微信、支付宝账单内容,把 dto 中的金额进行汇总。代码如下:
@Component
@JobExecutor(name = "summaryBillTask")
public class SummaryBillTask {
public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
// 获得微信账单
BigDecimal wechatAmount = BigDecimal.valueOf(0);
String wechat = (String) jobArgs.getWfContext("wechat");
if(StrUtil.isNotBlank(wechat)) {
BillDto wechatBillDto = JSONUtil.toBean(wechat, BillDto.class);
wechatAmount = wechatBillDto.getBillAmount();
}
// 获得支付宝账单
BigDecimal alipayAmount = BigDecimal.valueOf(0);
String alipay = (String) jobArgs.getWfContext("alipay");
if(StrUtil.isNotBlank(alipay)) {
BillDto alipayBillDto = JSONUtil.toBean(alipay, BillDto.class);
alipayAmount = alipayBillDto.getBillAmount();
}
// 汇总账单
BigDecimal totalAmount = wechatAmount.add(alipayAmount);
SnailJobLog.REMOTE.info("总金额: {}", totalAmount);
return ExecuteResult.success(totalAmount);
}
}服务端配置
定时任务配置
微信对账单任务
配置项 配置内容 任务名称 微信对账单 状态 启用 任务类型 集群 自定义执行器 wechatBillTask 路由策略 轮询 阻塞策略 丢弃 触发类型 工作流 超时时间 60 秒 最大重试次数 0 重试间隔 1 秒 说明:
这里很关键的一个配置项是触发类型。当设置为工作流触发时,阻塞策略以工作流的策略为准,所以这里配置什么已经不重要了。
支付宝对账单任务
配置项 配置内容 任务名称 支付宝对账单 自定义执行器 alipayBillTask 其他配置项同上面的微信对账单任务
汇总账单金额任务
配置项 配置内容 任务名称 汇总账单金额 自定义执行器 summaryBillTask 其他配置项同上面的微信对账单任务
工作流配置
工作流总体配置
说明:
- 这里是对整个工作流的配置,这里设置的触发类型和间隔时间,实际上是调用具体定时任务的时间
- 执行超时时间,是所有任务执行完成的超时时间。所以最好根据实际情合理的设置。
阻塞策略,同理也是该工作流下所有任务的阻塞策略。
阻塞策略 策略功能 解释 丢弃 放弃创建新工作流批次 你都没干完,我不干 覆盖 关闭还在执行的工作流批次,执行新工作流批次 你别干了,我来干 并行 不管正在执行的工作流批次,直接执行新的工作流批次 直接开干 恢复 执行正在执行中的工作流批次中的失败任务(非重试) 继续把失败的任务干完 - 工作流上下文,可以通过点击减号去掉不用设置。该例中是需要的。
- 节点状态。这里关闭或者开启该工作流。
微信对账单任务
说明:
- 优先级:同一级中数字越小越先执行
失败策略:
类型 说明 跳过 工作流会跳过当前失败任务,继续向下执行,并且若最终执行完成,
工作流状态会是处理成功。阻塞 工作流会被阻塞在当前任务阶段,等待手动重试。若重试成功,
工作流还会自动向下执行未执行完的任务。节点状态:
类型 说明 开启 任务流调度会经过该节点。 关闭 任务流调度会跳过该节点。 就目前的 1.4.0 版本而言,如果把一个节点状态设置为关闭后,记得要把失败策略手动设置为跳过。不然工作流还是会阻塞住。这个开发团队后面可能会进行调整。
支付宝账单任务
汇总账单金额任务
整体配置后效果图
验证测试
启动服务端和客户端后,查看工作流的执行批次。
本文目标 2
这里想使用工作流中的决策节点。这里增加一个需求,要按照上下文中的 channel 渠道如果是 wechat 走微信对账单任务;其他走支付宝对账单任务。
目标 2 实现
客户端代码
不用改动
工作流配置
增加微信任务条件
表达式类型包括:SpEL、Aviator 和 QL。我这里就知道 SpEL,就拿这个试试。不过受到安全限制,这里的 SpEL 上下文是用 SimpleEvaluationContext。我这里列出和 StandardEvaluationContext 的区别。
| 特性 | SimpleEvaluationContext | StandardEvaluationContext |
|---|---|---|
| 设计目的 | 限制功能以提高安全性,适用于简单场景 | 提供完整功能,但可能引入安全风险 |
| 默认注册的 Java 类型 | 无 | 自动注册 java.lang 包下的所有类 |
允许 T() 类型引用 | 默认禁用 | 默认启用 |
| 适用场景 | 数据绑定、简单表达式 | 复杂表达式、需要访问静态方法/类 |
说明:
- 表达式类型:这里我仅仅会 SpEL 表达式
- 条件表达式:这里就是判断上下文中的
channel是不是等于- 模拟上下文:这里单纯用来测试验证用的,这里设置的值,并不会影响实际上下文中的值
添加任务节点
在上面的微信渠道任务条件下,把微信对账单任务添加进来。这个和前面目标 1 介绍的方式是一样的。
支付宝的任务节点添加到其他情况这个分支中,汇总账单金额任务不变,最后的整体流程如下:
验证测试
渠道是
wechat2025-03-28 16:46:21 [snail-grpc-server-5] INFO c.a.s.c.job.core.client.JobEndPoint - 批次:[8260] 任务调度成功. 2025-03-28 16:46:21 [snail-job-job-8,260-1] INFO c.mayuanfei.workflow.WechatBillTask - 上下文: {channel=wechat, settlementDate=sysdate} 2025-03-28 16:46:21 [snail-job-job-8,260-1] INFO c.a.s.c.j.c.e.JobExecutorFutureCallback - 任务执行成功 taskBatchId:[8260] [{"status":1,"result":{"billId":123456789,"billChannel":"wechat","billDate":"2025-03-28","billAmount":1234.56},"message":"任务执行成功"}] 2025-03-28 16:46:22 [snail-job-grpc-server-executor-11] INFO c.a.s.c.c.rpc.server.GrpcInterceptor - method invoked: UnaryRequest/unaryRequest cast:0ms 2025-03-28 16:46:22 [snail-grpc-server-6] INFO c.a.s.c.job.core.client.JobEndPoint - 批次:[8262] 任务调度成功. 2025-03-28 16:46:22 [snail-job-job-8,262-1] INFO c.mayuanfei.workflow.SummaryBillTask - 总金额: 1234.56 2025-03-28 16:46:22 [snail-job-job-8,262-1] INFO c.a.s.c.j.c.e.JobExecutorFutureCallback - 任务执行成功 taskBatchId:[8262] [{"status":1,"result":1234.56,"message":"任务执行成功"}]从测试结果看,只有微信任务和汇总任务执行了。
渠道是
alipay2025-03-28 16:49:25 [snail-grpc-server-11] INFO c.a.s.c.job.core.client.JobEndPoint - 批次:[8276] 任务调度成功. 2025-03-28 16:49:25 [snail-job-job-8,276-1] INFO c.mayuanfei.workflow.AlipayBillTask - 上下文: {channel=alipay, settlementDate=sysdate} 2025-03-28 16:49:25 [snail-job-job-8,276-1] INFO c.a.s.c.j.c.e.JobExecutorFutureCallback - 任务执行成功 taskBatchId:[8276] [{"status":1,"result":{"billId":23456789,"billChannel":"alipay","billDate":"2025-03-28","billAmount":2345.67},"message":"任务执行成功"}] 2025-03-28 16:49:25 [snail-job-grpc-server-executor-23] INFO c.a.s.c.c.rpc.server.GrpcInterceptor - method invoked: UnaryRequest/unaryRequest cast:0ms 2025-03-28 16:49:25 [snail-grpc-server-12] INFO c.a.s.c.job.core.client.JobEndPoint - 批次:[8277] 任务调度成功. 2025-03-28 16:49:25 [snail-job-job-8,277-1] INFO c.mayuanfei.workflow.SummaryBillTask - 总金额: 2345.67 2025-03-28 16:49:25 [snail-job-job-8,277-1] INFO c.a.s.c.j.c.e.JobExecutorFutureCallback - 任务执行成功 taskBatchId:[8277] [{"status":1,"result":2345.67,"message":"任务执行成功"}]从测试结果看,只有支付宝和汇总任务执行了。
本文目标 3
这里想使用工作流中的回调通知。这里增加一个需求,就是在汇总账单金额任务执行完成后调用一个接口,把最后汇总的金额通知给这个接口。
目标 3 实现
接受回调的 Controller
@Slf4j
@RestController
@RequestMapping("/workflow")
public class WorkflowCallbackController {
@PostMapping("/callback")
public void callback(@RequestBody CallbackParamsDTO callbackParams,
@RequestHeader HttpHeaders headers) {
// callbackParams 对象可以获取到当前回调通知之前的上下文内容
// secret 是当回调通知的秘钥,用于鉴权
String secret = headers.getFirst("secret");
log.info("callback: {}, secret:{}", callbackParams, secret);
log.info("完成任务推送到监控");
}
}说明:
CallbackParamsDTO可以接受回调的参数。当然这里需要引用 snail-job 的包,如果是通知到其他服务,这里可以用字符串接收。header中有秘钥的信息。通过该秘钥可以鉴权。- 回调可以做很多业务以为的事情。这里就是结合实际情况进行运用了。可能更多情况是不需要用到回调的,但是如果当你需要某个工作流执行完成后,再做一些额外的事情。那么这个 webhook 就很有用了。
增加回调通知
调出添加对话框
添加回调通知
说明
- webhook:回调地址
- 请求类型:可以是 JSON 或者 Form 表单
- 秘钥:通讯秘钥
- 回调通知状态:这里估计是页面写错误。这里的工作流状态其实应该是通知的状态。
总结
- 工作流其实就是把多个普通定时进行编排,从而形成一个有前后执行顺序甚至是有条件的流程任务。
- 上下文传获得参数通过
jobArgs.getWfContext() - 上下文添加参数通过
jobArgs.appendContext() - 如果一个定时任务的触发方式是工作流,那么这个定时任务的阻塞策略以工作流的策略为准
- 工作流的任务节点,如果想关闭该节点。记得几点状态选关闭时,失败策略要选跳过。
- 添加节点的 SpEL 采用的上下文是
SimpleEvaluationContext所以无法引用很多 Java 内置方法。 - 回调通知获得秘钥通过:
headers.getFirst("secret"); - 回调通知接收可以通过
CallbackParamsDTO这个 snail-job 内置对象;也可以通过字符串接收。
本笔记原文来自博主 老马 9527 的文章 7.snail-job的工作流任务
如有侵权,请联系作者 马铃薯头 删除















