前言

Snail Job 目前最新的版本已经来到了 v1.4.0 版本。最直观的感受是服务端的后台报警信息少了很多。可能与通讯模式默认改为 gRPC 有关。

本文主要是针对工作流任务进行测试。在前面的 6 篇文章,我们搭建一个普通定时任务已经完全没有问题了,而这个工作流其实就是把多个普通定时进行编排,从而形成一个有前后执行顺序甚至是有条件的流程任务。

抓重点

第一段叽里呱啦说了一大段,真正有营养的就最后一句话。拆解这句话,可以归纳:

  1. 工作流是普通定时任务的集合
  2. 工作流有前后执行顺序
  3. 工作流在执行某个任务可以加条件

本文目标 1

这里实现一个假设的业务场景:一个是微信的账单收集任务,统计某一天的总金额;另一个是支付宝的账单收集任务,同样也是统计某一天的总金额;等这两个任务完成后再执行一个会汇总账单总金额的定时任务。

基础的流程就如上图这么简单的业务流程。后面可能随着深入学习,此图可能会有所变化。

目标 1 实现

客户端开发环境

  • JDK 版本:openjdk-21.0.2
  • snail-job 版本:1.4.0

客户端 Maven 依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- snail-job 客户端依赖 -->
    <dependency>
        <groupId>com.aizuda</groupId>
        <artifactId>snail-job-client-starter</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- snail-job 重试相关依赖 -->
    <dependency>
        <groupId>com.aizuda</groupId>
        <artifactId>snail-job-client-retry-core</artifactId>
        <version>1.4.0</version>
    </dependency>
    <!-- snail-job 客户端核心依赖 -->
    <dependency>
        <groupId>com.aizuda</groupId>
        <artifactId>snail-job-client-job-core</artifactId>
        <version>1.4.0</version>
    </dependency>
</dependencies>

客户端代码

Dto 对象

这个对象用于在几个定时任务的上下文中进行传输。

@Data
public class BillDto {
    /**
     * 账单 ID
     */
    private Long billId;
    /**
     * 账单渠道
     */
    private String billChannel;
    /**
     * 账单日期
     */
    private String billDate;
    /**
     * 账单金额
     */
    private BigDecimal billAmount;

}

微信账单任务

该任务就是用于微信账单的统计任务。该任务从工作流的上下文中获得一个 settlementDate 清算日期值,如果是 sysdate 就设置当前天为账单日期。并且把 BillDto 放到上下文中,供后面的任务使用。代码如下:

@Component
@JobExecutor(name = "wechatBillTask")
public class WechatBillTask {
    public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
        BillDto billDto = new BillDto();
        billDto.setBillId(123456789L);
        billDto.setBillChannel("wechat");
        // 从上下文中获得清算日期并设置,如果上下文中清算日期
        // 是 sysdate 设置为当前日期;否则取管理页面设置的值
        String settlementDate = (String) jobArgs.getWfContext().get("settlementDate");
        if(StrUtil.equals(settlementDate, "sysdate")) {
            settlementDate = DateUtil.today();
        }
        billDto.setBillDate(settlementDate);
        billDto.setBillAmount(new BigDecimal("1234.56"));
        // 把 billDto 对象放入上下文进行传递
        jobArgs.appendContext("wechat", JSONUtil.toJsonStr(billDto));
        SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
        return ExecuteResult.success(billDto);
    }
}

补充说明:

我这里采用注解方式来实现定时任务。如果想通过实现类方式请自行参考前面的几篇文章。

支付宝账单任务

支付宝账单任务同微信账单任务。

@Component
@JobExecutor(name = "alipayBillTask")
public class AlipayBillTask {
    public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
        BillDto billDto = new BillDto();
        billDto.setBillId(23456789L);
        billDto.setBillChannel("alipay");
        // 设置清算日期
        String settlementDate = (String) jobArgs.getWfContext().get("settlementDate");
        if(StrUtil.equals(settlementDate, "sysdate")) {
            settlementDate = DateUtil.today();
        }
        billDto.setBillDate(settlementDate);
        billDto.setBillAmount(new BigDecimal("2345.67"));
        // 把 billDto 对象放入上下文进行传递
        jobArgs.appendContext("alipay", JSONUtil.toJsonStr(billDto));
        SnailJobLog.REMOTE.info("上下文: {}", jobArgs.getWfContext());
        return ExecuteResult.success(billDto);
    }
}

汇总账单任务

该任务就是从上下文取得微信、支付宝账单内容,把 dto 中的金额进行汇总。代码如下:

@Component
@JobExecutor(name = "summaryBillTask")
public class SummaryBillTask {
    public ExecuteResult jobExecute(JobArgs jobArgs) throws InterruptedException {
        // 获得微信账单
        BigDecimal wechatAmount = BigDecimal.valueOf(0);
        String wechat = (String) jobArgs.getWfContext("wechat");
        if(StrUtil.isNotBlank(wechat)) {
            BillDto wechatBillDto = JSONUtil.toBean(wechat, BillDto.class);
            wechatAmount = wechatBillDto.getBillAmount();
        }
        // 获得支付宝账单
        BigDecimal alipayAmount = BigDecimal.valueOf(0);
        String alipay = (String) jobArgs.getWfContext("alipay");
        if(StrUtil.isNotBlank(alipay)) {
            BillDto alipayBillDto = JSONUtil.toBean(alipay, BillDto.class);
            alipayAmount = alipayBillDto.getBillAmount();
        }
        // 汇总账单
        BigDecimal totalAmount = wechatAmount.add(alipayAmount);
        SnailJobLog.REMOTE.info("总金额: {}", totalAmount);
        return ExecuteResult.success(totalAmount);
    }
}

服务端配置

定时任务配置

  1. 微信对账单任务

    配置项配置内容
    任务名称微信对账单
    状态启用
    任务类型集群
    自定义执行器wechatBillTask
    路由策略轮询
    阻塞策略丢弃
    触发类型工作流
    超时时间60 秒
    最大重试次数0
    重试间隔1 秒

    说明:

    这里很关键的一个配置项是触发类型。当设置为工作流触发时,阻塞策略以工作流的策略为准,所以这里配置什么已经不重要了。

  2. 支付宝对账单任务

    配置项配置内容
    任务名称支付宝对账单
    自定义执行器alipayBillTask
    其他配置项同上面的微信对账单任务
  3. 汇总账单金额任务

    配置项配置内容
    任务名称汇总账单金额
    自定义执行器summaryBillTask
    其他配置项同上面的微信对账单任务

工作流配置

  1. 工作流总体配置

    image-20250327155154652

    说明:

    • 这里是对整个工作流的配置,这里设置的触发类型和间隔时间,实际上是调用具体定时任务的时间
    • 执行超时时间,是所有任务执行完成的超时时间。所以最好根据实际情合理的设置。
    • 阻塞策略,同理也是该工作流下所有任务的阻塞策略。

      阻塞策略策略功能解释
      丢弃放弃创建新工作流批次你都没干完,我不干
      覆盖关闭还在执行的工作流批次,执行新工作流批次你别干了,我来干
      并行不管正在执行的工作流批次,直接执行新的工作流批次直接开干
      恢复执行正在执行中的工作流批次中的失败任务(非重试)继续把失败的任务干完
    • 工作流上下文,可以通过点击减号去掉不用设置。该例中是需要的。
    • 节点状态。这里关闭或者开启该工作流。
  2. 微信对账单任务

    image-20250327160407635

    说明:

    • 优先级:同一级中数字越小越先执行
    • 失败策略:

      类型说明
      跳过工作流会跳过当前失败任务,继续向下执行,并且若最终执行完成,
      工作流状态会是处理成功。
      阻塞工作流会被阻塞在当前任务阶段,等待手动重试。若重试成功,
      工作流还会自动向下执行未执行完的任务。
    • 节点状态:

      类型说明
      开启任务流调度会经过该节点。
      关闭任务流调度会跳过该节点。

      就目前的 1.4.0 版本而言,如果把一个节点状态设置为关闭后,记得要把失败策略手动设置为跳过。不然工作流还是会阻塞住。这个开发团队后面可能会进行调整。

  3. 支付宝账单任务

    image-20250327165214320

  4. 汇总账单金额任务

    image-20250327165517686

  5. 整体配置后效果图

    image-20250327165714978

验证测试

启动服务端和客户端后,查看工作流的执行批次。

image-20250327171738387

image-20250327171738387

本文目标 2

这里想使用工作流中的决策节点。这里增加一个需求,要按照上下文中的 channel 渠道如果是 wechat 走微信对账单任务;其他走支付宝对账单任务。

image-20250328155702732

目标 2 实现

客户端代码

不用改动

工作流配置

image-20250328160216419

增加微信任务条件

image-20250328160458032

表达式类型包括:SpEL、Aviator 和 QL。我这里就知道 SpEL,就拿这个试试。不过受到安全限制,这里的 SpEL 上下文是用 SimpleEvaluationContext。我这里列出和 StandardEvaluationContext 的区别。

特性SimpleEvaluationContextStandardEvaluationContext
设计目的限制功能以提高安全性,适用于简单场景提供完整功能,但可能引入安全风险
默认注册的 Java 类型自动注册 java.lang 包下的所有类
允许 T() 类型引用默认禁用默认启用
适用场景数据绑定、简单表达式复杂表达式、需要访问静态方法/类

image-20250328161721806

说明:

  1. 表达式类型:这里我仅仅会 SpEL 表达式
  2. 条件表达式:这里就是判断上下文中的 channel 是不是等于 wechat
  3. 模拟上下文:这里单纯用来测试验证用的,这里设置的值,并不会影响实际上下文中的值

添加任务节点

在上面的微信渠道任务条件下,把微信对账单任务添加进来。这个和前面目标 1 介绍的方式是一样的。

image-20250328162747259

支付宝的任务节点添加到其他情况这个分支中,汇总账单金额任务不变,最后的整体流程如下:

image-20250328163210990

验证测试

  1. 渠道是 wechat

    2025-03-28 16:46:21 [snail-grpc-server-5] INFO  c.a.s.c.job.core.client.JobEndPoint
     - 批次:[8260] 任务调度成功.
    2025-03-28 16:46:21 [snail-job-job-8,260-1] INFO  c.mayuanfei.workflow.WechatBillTask
     - 上下文: {channel=wechat, settlementDate=sysdate}
    2025-03-28 16:46:21 [snail-job-job-8,260-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
     - 任务执行成功 taskBatchId:[8260] [{"status":1,"result":{"billId":123456789,"billChannel":"wechat","billDate":"2025-03-28","billAmount":1234.56},"message":"任务执行成功"}]
    2025-03-28 16:46:22 [snail-job-grpc-server-executor-11] INFO  c.a.s.c.c.rpc.server.GrpcInterceptor
     - method invoked: UnaryRequest/unaryRequest cast:0ms
    2025-03-28 16:46:22 [snail-grpc-server-6] INFO  c.a.s.c.job.core.client.JobEndPoint
     - 批次:[8262] 任务调度成功.
    2025-03-28 16:46:22 [snail-job-job-8,262-1] INFO  c.mayuanfei.workflow.SummaryBillTask
     - 总金额: 1234.56
    2025-03-28 16:46:22 [snail-job-job-8,262-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
     - 任务执行成功 taskBatchId:[8262] [{"status":1,"result":1234.56,"message":"任务执行成功"}]

    从测试结果看,只有微信任务和汇总任务执行了。

  2. 渠道是 alipay

    2025-03-28 16:49:25 [snail-grpc-server-11] INFO  c.a.s.c.job.core.client.JobEndPoint
     - 批次:[8276] 任务调度成功.
    2025-03-28 16:49:25 [snail-job-job-8,276-1] INFO  c.mayuanfei.workflow.AlipayBillTask
     - 上下文: {channel=alipay, settlementDate=sysdate}
    2025-03-28 16:49:25 [snail-job-job-8,276-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
     - 任务执行成功 taskBatchId:[8276] [{"status":1,"result":{"billId":23456789,"billChannel":"alipay","billDate":"2025-03-28","billAmount":2345.67},"message":"任务执行成功"}]
    2025-03-28 16:49:25 [snail-job-grpc-server-executor-23] INFO  c.a.s.c.c.rpc.server.GrpcInterceptor
     - method invoked: UnaryRequest/unaryRequest cast:0ms
    2025-03-28 16:49:25 [snail-grpc-server-12] INFO  c.a.s.c.job.core.client.JobEndPoint
     - 批次:[8277] 任务调度成功.
    2025-03-28 16:49:25 [snail-job-job-8,277-1] INFO  c.mayuanfei.workflow.SummaryBillTask
     - 总金额: 2345.67
    2025-03-28 16:49:25 [snail-job-job-8,277-1] INFO  c.a.s.c.j.c.e.JobExecutorFutureCallback
     - 任务执行成功 taskBatchId:[8277] [{"status":1,"result":2345.67,"message":"任务执行成功"}]

    从测试结果看,只有支付宝和汇总任务执行了。

本文目标 3

这里想使用工作流中的回调通知。这里增加一个需求,就是在汇总账单金额任务执行完成后调用一个接口,把最后汇总的金额通知给这个接口。

image-20250328171814122

目标 3 实现

接受回调的 Controller

@Slf4j
@RestController
@RequestMapping("/workflow")
public class WorkflowCallbackController {

    @PostMapping("/callback")
    public void callback(@RequestBody CallbackParamsDTO callbackParams,
                         @RequestHeader HttpHeaders headers) {
        // callbackParams 对象可以获取到当前回调通知之前的上下文内容
        // secret 是当回调通知的秘钥,用于鉴权
        String secret = headers.getFirst("secret");
        log.info("callback: {}, secret:{}", callbackParams, secret);
        log.info("完成任务推送到监控");
    }
}

说明:

  • CallbackParamsDTO 可以接受回调的参数。当然这里需要引用 snail-job 的包,如果是通知到其他服务,这里可以用字符串接收。
  • header 中有秘钥的信息。通过该秘钥可以鉴权。
  • 回调可以做很多业务以为的事情。这里就是结合实际情况进行运用了。可能更多情况是不需要用到回调的,但是如果当你需要某个工作流执行完成后,再做一些额外的事情。那么这个 webhook 就很有用了。

增加回调通知

  1. 调出添加对话框

    image-20250401083453649

  2. 添加回调通知

    image-20250401083800046

    说明

    • webhook:回调地址
    • 请求类型:可以是 JSON 或者 Form 表单
    • 秘钥:通讯秘钥
    • 回调通知状态:这里估计是页面写错误。这里的工作流状态其实应该是通知的状态。

总结

  • 工作流其实就是把多个普通定时进行编排,从而形成一个有前后执行顺序甚至是有条件的流程任务。
  • 上下文传获得参数通过 jobArgs.getWfContext()
  • 上下文添加参数通过 jobArgs.appendContext()
  • 如果一个定时任务的触发方式是工作流,那么这个定时任务的阻塞策略以工作流的策略为准
  • 工作流的任务节点,如果想关闭该节点。记得几点状态选关闭时,失败策略要选跳过。
  • 添加节点的 SpEL 采用的上下文是 SimpleEvaluationContext 所以无法引用很多 Java 内置方法。
  • 回调通知获得秘钥通过:headers.getFirst("secret");
  • 回调通知接收可以通过 CallbackParamsDTO 这个 snail-job 内置对象;也可以通过字符串接收。

本笔记原文来自博主 老马 9527 的文章 7.snail-job的工作流任务
如有侵权,请联系作者 马铃薯头 删除
最后修改:2025 年 11 月 03 日
温柔的好天气总是和我一样,帅的鸭皮!