【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

虚幻大学 xuhss 240℃ 0评论

? 优质资源分享 ?

学习路线指引(点击解锁) 知识定位 人群定位
? Python实战微信订餐小程序 ? 进阶级 本课程是python flask+微信小程序的完美结合,从项目搭建到腾讯云部署上线,打造一个全栈订餐系统。
?Python量化交易实战? 入门级 手把手带你打造一个易扩展、更安全、效率更高的量化交易系统

一、前言

本章主要是承接上一篇讲CompletableFuture的文章,想了解的可以先去看看案例:
https://juejin.cn/post/7091132240574283813
CompletableFuture已经提供了串行、并行等常用异步编排的方案,但在细节上还是有许多不足,比如回调方面,编排复杂顺序方面,就捉襟见肘了。
之前我有关注过Gitee上star量还不错的一款开源工具AsyncTool:
https://gitee.com/jd-platform-opensource/asyncTool
是由京东零售的高级工程师编写的,提供了非常丰富的异步编排功能,并且经过了京东内部的测试,是对CompletableFuture的封装和补足,试用了一下挺不错。

二、用法

1、引入

1)、不推荐:maven引入,这个比较坑,客观原因经常会导致依赖下载不下来,不推荐使用;
2)、推荐:直接下载源码,因为代码量很少,就几个核心类和测试类。

如下图所示,把下载的源码拷贝进来即可,核心代码放到java目录里面,测试代码放到test目录里面。

cee51f5bd2b248d6e169f9509e886005 - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

2、编写worker

1)、worker是AsyncTool中的一个思想,专门来处理任务的,比如查询、rpc调用等耗时操作,一个任务就是一个worker;
2)、构建worker十分简单,只需要实现IWorker和ICallback接口即可;
3)、这里,我们承接上一篇文章的案例,分别创建查询二十四节气和查询星座的worker;
4)、其中begin方法是构建开始时会执行,result方法是获取到结果后会执行,action方法就是处理具体任务的地方,一般业务就在这里编写,defaultValue方法提供超时异常时返回的默认值。

1)、二十四节气worker

package com.example.async.worker;

import cn.hutool.http.HttpUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 
 * 二十四节气worker
 * 

 *
 * @author 福隆苑居士,公众号:【Java分享客栈】
 * @since 2022-04-27 18:01
 */
@Slf4j
public class TwentyFourWorker implements IWorker, ICallback {

 public static final String APPKEY = "xxxxxx";// 你的appkey
 public static final String URL = "https://api.jisuapi.com/jieqi/query";

 @Override
 public void begin() {
 // System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
 }

 @Override
 public void result(boolean success, String param, WorkResult workResult) {
 if (success) {
 System.out.println("callback twentyFourWorker success--" + SystemClock.now() + "----" + workResult.getResult()
 + "-threadName:" +Thread.currentThread().getName());
 } else {
 System.err.println("callback twentyFourWorker failure--" + SystemClock.now() + "----" + workResult.getResult()
 + "-threadName:" +Thread.currentThread().getName());
 }
 }

 /**
 * 查询二十四节气
 */
 @Override
 public String action(String object, Map allWrappers) {
 String url = URL + "?appkey=" + APPKEY;
 String result = HttpUtil.get(url);

 // 模拟时长
 try {
 TimeUnit.SECONDS.sleep(5);
 } catch (Exception e) {
 log.error("[二十四节气]>>>> 异常: {}", e.getMessage(), e);
 }

 return result;
 }

 @Override
 public String defaultValue() {
 return "twentyFourWorker";
 }
}

2)、星座worker

package com.example.async.worker;

import cn.hutool.http.HttpUtil;
import com.jd.platform.async.callback.ICallback;
import com.jd.platform.async.callback.IWorker;
import com.jd.platform.async.executor.timer.SystemClock;
import com.jd.platform.async.worker.WorkResult;
import com.jd.platform.async.wrapper.WorkerWrapper;
import lombok.extern.slf4j.Slf4j;

import java.util.Map;
import java.util.concurrent.TimeUnit;

/**
 * 
 * 星座worker
 * 

 *
 * @author 福隆苑居士,公众号:【Java分享客栈】
 * @since 2022-04-27 18:01
 */
@Slf4j
public class ConstellationWorker implements IWorker, ICallback {

 public static final String APPKEY = "xxxxxx";// 你的appkey
 public static final String URL = "https://api.jisuapi.com/astro/all";

 @Override
 public void begin() {
 // System.out.println(Thread.currentThread().getName() + "- start --" + System.currentTimeMillis());
 }

 @Override
 public void result(boolean success, String param, WorkResult workResult) {
 if (success) {
 System.out.println("callback constellationWorker success--" + SystemClock.now() + "----" + workResult.getResult()
 + "-threadName:" +Thread.currentThread().getName());
 } else {
 System.err.println("callback constellationWorker failure--" + SystemClock.now() + "----" + workResult.getResult()
 + "-threadName:" +Thread.currentThread().getName());
 }
 }

 /**
 * 查询星座
 */
 @Override
 public String action(String object, Map allWrappers) {
 String url = URL + "?appkey=" + APPKEY;
 String result = HttpUtil.get(url);

 // 模拟异常
 // int i = 1/0;

 // 模拟时长
 try {
 TimeUnit.SECONDS.sleep(5);
 } catch (Exception e) {
 log.error("[星座]>>>> 异常: {}", e.getMessage(), e);
 }

 return result;
 }

 @Override
 public String defaultValue() {
 return "constellationWorker";
 }
}

3、异步编排

1)、新建一个AsyncToolService,在里面进行worker的声明、构建、编排;
2)、Async.beginWork就是执行异步任务,参数分别为超时时间和worker,其中超时时间可以自己设短一点看效果;
3)、最后封装结果返回即可,这里为演示案例节省时间直接用map返回。

package com.example.async.service;

import com.example.async.worker.ConstellationWorker;
import com.example.async.worker.TwentyFourWorker;
import com.jd.platform.async.executor.Async;
import com.jd.platform.async.wrapper.WorkerWrapper;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;

/**
 * 
 * AsyncTools服务
 * 

 *
 * @author 福隆苑居士,公众号:【Java分享客栈】
 * @since 2022-04-27 17:56
 */
@Service
@Slf4j
public class AsyncToolService {

   /**
 * 异步返回结果
 * ---- 方式:AsyncTool并行处理
 *
 * @return 结果
 */
   public Map queryAsync() throws ExecutionException, InterruptedException {
 // 声明worker
 TwentyFourWorker twentyFourWorker = new TwentyFourWorker();
 ConstellationWorker constellationWorker = new ConstellationWorker();

 // 构建二十四节气worker
 WorkerWrapper twentyFourWrapper = new WorkerWrapper.Builder()
 .worker(twentyFourWorker)
 .callback(twentyFourWorker)
 .param("0")
 .build();

 // 构建星座worker
 WorkerWrapper constellationWrapper = new WorkerWrapper.Builder()
 .worker(constellationWorker)
 .callback(constellationWorker)
 .param("1")
 .build();

 // 开始工作,这里设定超时时间10s,测试时可以设短一点看效果。
 Async.beginWork(10000, twentyFourWrapper, constellationWrapper);

 // 打印当前线程数
 log.debug("----------------- 当前线程数 ----------------");
 log.debug(Async.getThreadCount());

 // 打印结果
 log.debug("----------------- 二十四节气 ----------------");
 log.debug("结果: {}", twentyFourWrapper.getWorkResult());
 log.debug("----------------- 星座 ----------------");
 log.debug("结果: {}", constellationWrapper.getWorkResult());

 // 返回
 Map map = new HashMap<>();
 map.put("twentyFour", twentyFourWrapper.getWorkResult());
 map.put("constellation", constellationWrapper.getWorkResult());

 // 关闭(spring web类应用不用关闭,否则第二次执行会报线程池异常。)
 // Async.shutDown();

 return map;
 }
}

4、测试效果

上一篇的案例有演示同步执行的结果,在10秒左右,而CompletableFuture的结果在5秒多点。
这里测试后发现,AsyncTool的结果也是5秒左右,和CompletableFuture差不多,但AsyncTool提供的编排更丰富。

6b5da4ff43a3d3e0329a4fd322672c0f - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

我们把其中一个星座worker的任务耗时调大,模拟一下超时的效果。可以发现,AsyncTool直接返回了我们上面defaultValue方法中设置的默认值。

414bad10452a647f5e324b9ffd6dad7a - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

三、常用编排

AsyncTool其实提供了很丰富的异步编排方式,包括较复杂的编排,但以我呆过的中小企业为例,几乎用不到复杂编排,最常用的的还是并行以及串行+并行。

AsyncTool的QuickStart.md已经做了简洁的说明:https://gitee.com/jd-platform-opensource/asyncTool/blob/master/QuickStart.md

1)、任务并行

也就是本篇我们案例使用的编排,是我个人平时最常用的一种。

65d2d0ad624cd9654c5a0b94e0c7fe99 - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

2)、串行+并行

这种其实就是通过next()来做串行和并行的衔接,有些场景也会用到。

3637f5a84c3f8b006cc2436fb7469963 - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

3)、依赖其他任务的结果

这也是很常见的场景,B任务依赖A任务的结果来实现业务,最后返回。
AsyncTool也提供了很方便的方式:
1)、在service中构建worker-A时设置一个id名称;
2)、你可以发现worker的action方法第二个入参是个map,里面就是所有的wrapper;
3)、在worker-B的action方法中get这个id来获取wrapper从而拿到A的结果即可。

8ac0d5ef032d4768dfa1f9a4d9671865 - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

28938382a20168adb5ae57d459ac2215 - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

四、避坑经验

1、勿关闭线程池

AsyncTool提供了很多测试类,里面包含了所有编排方式,可以一一查看并验证,但使用过程中要注意一点,如果是spring-web项目,比如springboot,不需要手动Async.shutdown()处理,否则会执行一次后就关闭线程池了,这是不少人直接拷贝test代码疏忽的地方。

7e7f0798d8c258c29833c159dec6215a - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

2、自定义线程池

这个问题可以在AsyncTool的issue中看到相关讨论,作者君是根据京东零售的业务来决定使用什么线程池的,他们使用的默认线程池就是newCachedThreadPool,无限制长度的线程池,且具备复用特性,按照作者君的说法,因为京东的场景多数为低耗时(10ms)高并发,瞬间冲击的场景,所以最适合这种线程池。

db67a62af65c452ee932d3236407c92f - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

e36ff1d3eb02d31ccd4f4690f71a02c6 - 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

而根据我的经验,不同公司的业务和项目都不同,中小企业往往依靠企事业单位生存,对接第三方厂家较多,rpc接口耗时往往较长且不可控,不符合京东零售低耗时高并发的特点,直接使用Integer.MAX_VALUE的无限制核心线程数的方式不太合适。
我建议中小企业使用自定义线程池,根据自身硬件水平和压测结果调整最终核心线程数和任务队列长度,确定合适的拒绝策略,比如直接拒绝或走主线程,这样会比较稳妥。

五、示例代码

完整示例代码提供给大家,里面有我的极速数据API的key,每天100次免费调用,省去注册账号,先到先测,慢点就只能等明天了哦。
链接:https://pan.baidu.com/doc/share/kJyph2LX076okHVWv38tlw-159275174957933
提取码:yqms


原创文章纯手打,觉得有点帮助就请点个推荐吧。

不定期分享工作经验和趣事,喜欢的话就请关注一下吧。

转载请注明:xuhss » 【Java分享客栈】一文搞定京东零售开源的AsyncTool,彻底解决异步编排问题。

喜欢 (0)

您必须 登录 才能发表评论!