高并发下的web异步处理方案(代码片段)

kuailefangyuan kuailefangyuan     2022-11-11     268

关键词:

高并发下的web异步处理方案

一、问题介绍

​ 平时web开发时(使用的servlet或者基于servlet封装的SpringMVC框架),业务处理基本都是同步处理,即业务处理与web容器接收线程为同一线程,每一次Http请求都由一个线程从头到尾负责处理。

​ 如果一个请求业务处理涉及IO操作,比如访问数据库、调用第三方服务接口等,那么其所对应的线程将同步地等待IO操作完成。而IO操作是非常慢的,将导致线程并不能及时地释放回线程池以供后续使用。在高并发请求条件下,web容器线程池将很快耗尽,降低服务吞吐量。

二、解决方案

​ 针对上述问题,解决办法就是另起单独线程去处理业务请求,提前释放当前服务接收线程,避免当前接收线程阻塞等待得不到释放,以缓解高并发下的线程池资源紧张,提高服务吞吐量。

​ 异步处理过程:当收到一个http请求后,tomcat等中间件的主线程调用副线程来执行业务处理,当副线程执行完成业务处理后,主线程再返回结果,在副线程执行业务处理过程中,主线程会空闲出来以处理其他请求,以此提升服务器的吞吐量。

​ servlet以及SpringMVC针对该问题都有对应的异步处理解决方案,具体如下:

2.1、Servlet3.0的异步处理

​ 在Servlet 3.0中,我们可以从HttpServletRequest对象中获得AsyncContext异步处理上下文,Request和Response对象都可从中获取。AsyncContext可以从当前线程传给其他线程,并在新的线程中完成对请求的处理并返回结果给客户端,初始线程便可以还回给容器线程池以处理更多的请求。

@Slf4j
@WebServlet(name = "simpleAsync", urlPatterns = "/simpleAsync", asyncSupported = true)
public class SimpleAsyncServlet extends HttpServlet 

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 
        log.info("初始线程开始");
        AsyncContext asyncContext = request.startAsync();
        asyncContext.start(() -> 
            log.info("处理业务开始");
            try 
                Thread.sleep(2000L);
             catch (InterruptedException e) 
                e.printStackTrace();
            
            log.info("处理业务结束");

            try 
                asyncContext.getResponse().getWriter().write("success");
             catch (IOException e) 
                e.printStackTrace();
            
            asyncContext.complete();
        );
        log.info("初始线程结束");
    

Servlet3.0异步处理的主要流程如下:

  • 1、根据request获取AsyncContext异步处理上下文对象

  • 2、调用AsyncContext的start()方法执行异步处理,该方法向申请一单独线程,并在新线程中处理业务请求,原线程则被回收到主线程池中。

  • 3、业务请求处理完毕后调用complete()方法通知servlet容器。

事实上,高并发下这种方式对性能的改进不大,因为如果新的线程和初始线程共享同一个服务线程池的话,并不能改善线程池资源紧张的问题。

优化方案:使用单独的线程池执行业务处理,降低高并发下Servlet容器主线程池资源紧张问题。

@Slf4j
@WebServlet(name = "simpleAsyncWithThreadPool", urlPatterns = "/simpleAsyncWithThreadPool", asyncSupported = true)
public class SimpleAsyncServletWithThreadPool extends HttpServlet 

    public static ExecutorService executor = Executors.newFixedThreadPool(10);

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 
        log.info("初始线程开始");

        AsyncContext asyncContext = request.startAsync();
        // 使用自定义线程池去异步执行业务处理
        executor.execute(new Runnable() 
            @Override
            public void run() 
                log.info("处理业务开始");
                try 
                    Thread.sleep(2000L);
                 catch (InterruptedException e) 
                    e.printStackTrace();
                
                log.info("处理业务结束");

                try 
                    asyncContext.getResponse().getWriter().write("success");
                 catch (IOException e) 
                    e.printStackTrace();
                
                asyncContext.complete();
            
        );

        log.info("初始线程结束");
    

2.2、Servlet3.1的异步处理优化

​ Servlet 3.0对请求的处理虽然是异步的,但是对InputStream和OutputStream的IO操作却依然是阻塞的,对于数据量大的请求体或者返回体,阻塞IO也将导致不必要的等待。因此在Servlet 3.1中引入了非阻塞IO(参考下图红框内容),通过在HttpServletRequest和HttpServletResponse中分别添加ReadListener和WriterListener方式,只有在IO数据满足一定条件时(比如数据准备好时),才进行后续的操作。

@Slf4j
@WebServlet(name = "nonBlockingAsync", urlPatterns = "/nonBlockingAsync", asyncSupported = true)
public class NonBlockingAsyncServlet extends HttpServlet 

    @Autowired
    private AsyncRestfulExecutor asyncRestfulExecutor;

    @Override
    public void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException 
        log.info("初始线程开始");
        AsyncContext asyncContext = request.startAsync();

        ServletInputStream inputStream = request.getInputStream();

        inputStream.setReadListener(new ReadListener() 
            @Override
            public void onDataAvailable() throws IOException 

            

            @Override
            public void onAllDataRead() throws IOException 
                asyncRestfulExecutor.execute(() -> 
                    log.info("处理业务开始");
                    try 
                        Thread.sleep(2000L);
                     catch (InterruptedException e) 
                        e.printStackTrace();
                    
                    log.info("处理业务结束");

                    try 
                        asyncContext.getResponse().getWriter().write("success");
                     catch (IOException e) 
                        e.printStackTrace();
                    

                    asyncContext.complete();

                );
            

            @Override
            public void onError(Throwable t) 
                asyncContext.complete();
            
        );
        log.info("初始线程结束");
    

2.3、SpringMVC的异步处理

首先展示一个SpringMVC同步处理的示例:

@Slf4j
@RestController
@RequestMapping("/user")
public class UserController 

    /**
     * 同步处理
     * @return
     * @throws InterruptedException
     */
    @GetMapping("/addUserSync")
    @ResponseBody
    public String addUserSync() throws InterruptedException 
        log.info("初始线程,开始");
        log.info("业务处理,开始");
        Thread.sleep(500L);
        log.info("业务处理,结束");
        log.info("初始线程,结束");
        return "success";
    

执行结果:

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qzzyvdeD-1645847837675)(C:\\Users\\meifangyuan\\AppData\\Roaming\\Typora\\typora-user-images\\image-20220225221637418.png)]

根据日志输出可见,业务处理线程和初始线程为同一线程,高并发条件下,当业务处理耗时较长时,线程资源得不到释放,请求长时间占用服务连接池,将降低服务吞吐量。

DeferredResult和Callable都是为了异步生成返回值提供基本的支持。简单来说就是一个请求进来,如果你使用了DeferredResult或者Callable,在没有得到返回数据之前,DispatcherServlet和所有Filter就会退出Servlet容器线程,但响应保持打开状态,一旦返回数据有了,这个DispatcherServlet就会被再次调用并且处理,以异步产生的方式,向请求端返回值。

这么做的好处就是请求不会长时间占用服务连接池,提高服务器的吞吐量。

2.3.1、基于Callable的SpringMVC异步处理

    @GetMapping("/addUserSync1")
    @ResponseBody
    public Callable<String> addUserAsync1() throws InterruptedException 
        log.info("初始线程,开始");
        Callable<String> result = (()->
            log.info("业务处理,开始");
            Thread.sleep(500L);
            log.info("业务处理,结束");
            return "success";
        );
        log.info("初始线程,结束");
        return result;
    

controller直接返回Callable对象,SpringMVC会使用内置的线程池去异步执行业务处理。

执行流程:

  • 客户端请求服务
  • SpringMVC 调用 Controller,Controller 返回一个 Callback 对象
  • SpringMVC 调用 request.startAsync 并且将 Callback 提交到 TaskExecutor 中去执行
  • DispatcherServlet 以及 Filters 等从应用服务器线程中结束,但 Response 仍旧是打开状态,也就是说暂时还不返回给客户端
  • TaskExecutor 调用 Callback 返回一个结果,SpringMVC 将请求发送给应用服务器继续处理
  • DispatcherServlet 再次被调用并且继续处理 Callback 返回的对象,最终将其返回给客户端

2.3.2、基于DeferredResult的SpringMVC异步处理

    @GetMapping("/addUserAsync2")
    @ResponseBody
    public DeferredResult<String> addUserAsync2() 
        log.info("初始线程,开始");
        DeferredResult<String> deferredResult = new DeferredResult(1000L);
        /**
         * 绑定回调通知
         */
        // 异步处理业务完成后回调
        deferredResult.onCompletion(new Runnable() 
            @Override
            public void run() 
                log.info("异步处理执行完成回调");
            
        );

        // 异步处理业务超时回调
        deferredResult.onTimeout(new Runnable() 
            @Override
            public void run() 
                log.info("异步处理执行超时回调");
                deferredResult.setErrorResult("异步处理执行超时");
            
        );

        // 异步处理异常回调
        deferredResult.onError((throwable) -> 
            log.info("异步线程执行出错回调");
            deferredResult.setErrorResult("异步线程执行出错");
        );

        // 使用自定义线程池处理
        FIXED_THREAD_POOL.execute(new Runnable() 
            @SneakyThrows
            @Override
            public void run() 
                log.info("业务开始开始");
                Thread.sleep(500L);
                log.info("业务开始结束");

                deferredResult.setResult("success");
            
        );
        log.info("初始线程,结束");
        return deferredResult;
    

同样是异步处理,DefferedResult与Callable的区别在于:

  • DeferredResult 更灵活,可以主动 setResult 到 DeferredResult 中并返回,实现两个不相干的线程间通信

  • Callable 是由 SpringMVC 管理异步线程,而 DeferredResult 是自己创建线程池并管理异步线程

  • DefferedResult可以设置超时时间,完成|异常|超时回调。

3.3、基于DefferedResult的SpringMVC异步处理框架

​ 基于DefferedResult实现的SpringMVC异步处理抽象基类,该基类持有自定义的异步处理线程池,提供通用的异步业务执行方法,普通业务处理Controller只需继承该抽象类即可直接调用asyncInvoke异步执行方法。

public abstract class AbstractAsyncProcessController 

    @Autowired
    private BusinessProcessExecutor businessProcessExecutor;

    /**
     * 异步执行业务逻辑
     *
     * @param supplier 正常业务逻辑
     * @param function 异常情况
     * @param <T>
     * @return
     */
    protected  <T> DeferredResult<T> asyncInvoke(Supplier<T> supplier, BiFunction<String, String, T> function) 
        DeferredResult<T> deferredResult = new DeferredResult<>();
        CompletableFuture.supplyAsync(supplier, businessProcessExecutor)
                .whenCompleteAsync((result, e) -> 
                    if (e == null) 
                        deferredResult.setResult(result);
                        return;
                    

                    // 线程池不足
                    if (e instanceof ExecutionException) 
                        // R r = apply(T t) 方法接受参数,返回执行结果
                        deferredResult.setResult(function.apply("9999", "系统繁忙"));
                        return;
                    

                    // 其他内部错误
                    String errMsg = e.getMessage();
                    if (StringUtils.isNotBlank(errMsg)) 
                        errMsg = "系统内部错误,详细信息:" + errMsg;
                     else 
                        errMsg = "系统内部错误,请联系系统维护人员";
                    
                    deferredResult.setResult(function.apply("9999", errMsg));
                );
        return deferredResult;
    

    protected static <T extends BaseResponse> T buildErrorResponse(BaseRequest request, Class<T> responseClass, String errCode, String errMsg) 
        T response;
        try 
            response = responseClass.newInstance();
         catch (InstantiationException | IllegalAccessException e) 
            throw new RuntimeException(errMsg, e);
        
        BeanUtils.copyProperties(request, response);
        response.setErrCode(errCode);
        response.setErrMsg(errMsg);
        return response;
    


以下提供一个异步处理controller示例:

@Slf4j
@RestController
@RequestMapping("/person")
public class PersonController extends AbstractAsyncProcessController 

    @Autowired
    private PersonService personService;

    @PostMapping("/add")
    public DeferredResult<AddPersonResponse> add(@RequestBody AddPersonRequest request) 
        return asyncInvoke(() -> personService.addPerson(request),
                (errCode, errMsg) -> buildErrorResponse(request, AddPersonResponse.class, errCode, errMsg));
    


关于高并发下kafkaproducersend异步发送耗时问题的分析(代码片段)

最近开发网关服务的过程当中,需要用到kafka转发消息与保存日志,在进行压测的过程中由于是多线程并发操作kafkaproducer进行异步send,发现send耗时有时会达到几十毫秒的阻塞,很大程度上上影响了并发的性能,而在后续的测试... 查看详情

面试实战考核:设计一个高并发下的下单功能(代码片段)

功能需求:设计一个秒杀系统初始方案商品表设计:热销商品提供给用户秒杀,有初始库存。@EntitypublicclassSecKillGoodsimplementsSerializable@IdprivateStringid;/***剩余库存*/privateIntegerremainNum;/***秒杀商品名称*/privateStringgoodsName;秒杀订单表... 查看详情

高并发下保证接口幂等性方案(代码片段)

生命无罪,健康万岁,我是laity。我曾七次鄙视自己的灵魂:第一次,当它本可进取时,却故作谦卑;第二次,当它在空虚时,用爱欲来填充;第三次,在困难和容易之间,它选择了容易... 查看详情

今日小结(代码片段)

...,使用更少的内存和资源nginx处理请求是异步非阻塞,在高并发下nginx能保持低资源低耗能性能高度模块化设计,编写模块相对简单nginx处理静态文件好,静态处理性能比apache高三倍以上nginx作为负载均衡器,支持7层负载均衡适合... 查看详情

漫画:高并发下的hashmap(代码片段)

这一期我们来讲解高并发环境下,HashMap可能出现的致命问题。               HashMap的容量是有限的。当经过多次元素插入,使得HashMap达到一定饱和度时,Key映射位置发生冲突... 查看详情

全网最全的高并发下常见的限流算法!(代码片段)

限流简介现在说到高可用系统,都会说到高可用的保护手段:缓存、降级和限流,本博文就主要说说限流。限流是流量限速(RateLimit)的简称,是指只允许指定的事件进入系统,超过的部分将被拒绝服... 查看详情

高并发下常见的限流算法都在这了!(代码片段)

限流简介现在说到高可用系统,都会说到高可用的保护手段:缓存、降级和限流,本博文就主要说说限流。限流是流量限速(RateLimit)的简称,是指只允许指定的事件进入系统,超过的部分将被拒绝服... 查看详情

高并发下,hashmap会产生哪些问题?(代码片段)

HashMap在高并发环境下会产生的问题HashMap其实并不是线程安全的,在高并发的情况下,会产生并发引起的问题:比如:HashMap死循环,造成CPU100%负载触发fail-fast下面逐个分析下出现上述情况的原因:HashMap死循环的原因HashMap进行存... 查看详情

深入理解分布式事务,高并发下分布式事务的解决方案

这两天正在研究微服务架构中分布式事务的处理方案,做一个小小的总结,作为备忘.如有错误,欢迎指正!概念澄清事务补偿机制:在事务链中的任何一个正向事务操作,都必须存在一个完全符合回滚规则的可逆事务.CAP理论:CAP(Consistency... 查看详情

数据存储redis第四章:高并发下实现分布式锁(代码片段)

直接上代码:大部分互联网公司实现分布式锁原理/***分布式锁底层实现原理*@return*/@GetMapping("distributedLock")publicObjectdistributedLock()StringlockKey="distributedLockKey";//给每个线程都设置一个唯一标识,避免出现程序执行的时间超过设置的... 查看详情

数据存储redis第四章:高并发下实现分布式锁(代码片段)

直接上代码:大部分互联网公司实现分布式锁原理/***分布式锁底层实现原理*@return*/@GetMapping("distributedLock")publicObjectdistributedLock()StringlockKey="distributedLockKey";//给每个线程都设置一个唯一标识,避免出现程序执行的时间超过设置的... 查看详情

高并发下秒杀商品,你必须知道的9个细节(代码片段)

前言高并发下如何设计秒杀系统?这是一个高频面试题。这个问题看似简单,但是里面的水很深,它考查的是高并发场景下,从前端到后端多方面的知识。秒杀一般出现在商城的促销活动中,指定了一定数量&#... 查看详情

高并发下秒杀商品,你必须知道的9个细节(代码片段)

大家好,我是苏三,又跟大家见面了。前言高并发下如何设计秒杀系统?这是一个高频面试题。这个问题看似简单,但是里面的水很深,它考查的是高并发场景下,从前端到后端多方面的知识。秒杀一般出... 查看详情

高并发下如何避免产生重复数据?(代码片段)

前言最近测试给我提了一个bug,说我之前提供的一个批量复制商品的接口,产生了重复的商品数据。追查原因之后发现,这个事情没想象中简单,可以说一波多折。1.需求产品有个需求:用户选择一些品牌࿰... 查看详情

高并发下如何避免产生重复数据?(代码片段)

前言最近测试给我提了一个bug,说我之前提供的一个批量复制商品的接口,产生了重复的商品数据。追查原因之后发现,这个事情没想象中简单,可以说一波多折。1.需求产品有个需求:用户选择一些品牌࿰... 查看详情

高并发下秒杀商品,这9个细节得知道(代码片段)

大家好,我是bigsai,又跟大家见面了。前言高并发下如何设计秒杀系统?这是一个高频面试题。这个问题看似简单,但是里面的水很深,它考查的是高并发场景下,从前端到后端多方面的知识。秒杀一般出... 查看详情

高并发下保证接口的幂等性的几种方式(代码片段)

高并发下保证接口的幂等性的几种方式​--洱涷Zz场景不知道你有没有遇到过这些场景:有时我们在填写某些form表单时,保存按钮不小心快速点了两次,表中竟然产生了两条重复的数据,只是id不一样。我们在项目... 查看详情

高并发下保证接口的幂等性的几种方式(代码片段)

高并发下保证接口的幂等性的几种方式​--洱涷Zz场景不知道你有没有遇到过这些场景:有时我们在填写某些form表单时,保存按钮不小心快速点了两次,表中竟然产生了两条重复的数据,只是id不一样。我们在项目... 查看详情