
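My application should have 2 core endpoints, push and pull, for sending and fetching data.

The pull operation should work asynchronously and produce a DeferredResult. When a user calls the pull service over REST, a new DeferredResult is created and stored in Map<Long, DeferredResult> results = new ConcurrentHashMap<>(), where it waits for new data or until the timeout expires.

The push operation is also called by a user over REST. It checks the results map for the recipient of the data being pushed. When the map contains a result for the recipient, the data is set on that result and the DeferredResult is returned.

Here is the basic code: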

@Service
public class FooServiceImpl {
    Map<Long, DeferredResult<String>> results = new ConcurrentHashMap<>();

    @Transactional
    @Override
    public DeferredResult<String> pull(Long userId) {
        // here is database call, String data = fooRepository.getNewData(); where I check if there are some new data in database, and if there are, just return it, if not add deferred result into collection to wait for it
        DeferredResult<String> newResult = new DeferredResult<>(5000L);
        results.putIfAbsent(userId, newResult);
        newResult.onCompletion(() -> results.remove(userId));

        // if (data != null)
        //      newResult.setResult(data);

        return newResult;
    }

    @Transactional
    @Override
    public void push(String data, Long recipientId) {
        // fooRepository.save(data, recipientId);
        if (results.containsKey(recipientId)) {
            results.get(recipientId).setResult(data);
        }
    }
}

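The code works as I expect. The problem is that it should also work for multiple users. I estimate that at most 1000 active users will call the pull operation. Every call to pull should therefore take at most 5 seconds, as I set in the DeferredResult, but it does not.

As you can see in the picture, if I call the REST pull operation several times in quick succession from my JavaScript client, the tasks are executed sequentially instead of simultaneously. The last task I fired took about 25 seconds, but I need the operation to take at most 5 seconds plus latency when 1000 users execute pull at the same time.

[image]

How do I configure my application to execute these tasks simultaneously and ensure that each one takes about 5 seconds or less (when another user sends something to the waiting user)? I tried adding this configuration to the properties file: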

server.tomcat.max-threads=1000

and also this configuration:

@Configuration
public class AsyncConfig extends AsyncSupportConfigurer {

    @Override
    protected AsyncTaskExecutor getTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(1000);
        taskExecutor.initialize();
        return taskExecutor;
    }
}

But this did not help; the result is still the same. Can you help me configure it?

EDIT:

This is how I call this service from Angular:

this.http.get<any>(this.url, {params})
  .subscribe((data) => {
    console.log('s', data);
  }, (error) => {
    console.log('e', error);
  });

When I tried calling it several times with plain JS code like this:

function httpGet()
{
    var xmlHttp = new XMLHttpRequest();
    xmlHttp.open( "GET", 'http://localhost:8080/api/pull?id=1', true );
    xmlHttp.send( null );
    return xmlHttp.responseText;
}
setInterval(httpGet, 500);

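each request executes much faster (about 7 seconds). I expected the increase to be caused by the database call in the service, but it is still better than 25 seconds. Is there something wrong with how I call this service from Angular?

EDIT 2:

I tried another form of testing: instead of the browser, I used jMeter. I executed 100 requests in 100 threads, and here is the result:

[image]

As you can see, the requests proceed in batches of 10, and after reaching 50 requests the application throws an exception: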

java.sql.SQLTransientConnectionException: HikariPool-1 - Connection is not available, request timed out after 30000ms.
    at com.zaxxer.hikari.pool.HikariPool.createTimeoutException(HikariPool.java:667) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:183) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.pool.HikariPool.getConnection(HikariPool.java:148) ~[HikariCP-2.7.8.jar:na]
    at com.zaxxer.hikari.HikariDataSource.getConnection(HikariDataSource.java:128) ~[HikariCP-2.7.8.jar:na]
    at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:35) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:106) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:136) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at org.hibernate.internal.SessionImpl.connection(SessionImpl.java:523) ~[hibernate-core-5.2.16.Final.jar:5.2.16.Final]
    at sun.reflect.GeneratedMethodAccessor61.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:223) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.util.ReflectionUtils.invokeMethod(ReflectionUtils.java:207) ~[spring-core-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect$HibernateConnectionHandle.doGetConnection(HibernateJpaDialect.java:391) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.vendor.HibernateJpaDialect.beginTransaction(HibernateJpaDialect.java:154) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.orm.jpa.JpaTransactionManager.doBegin(JpaTransactionManager.java:400) ~[spring-orm-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.getTransaction(AbstractPlatformTransactionManager.java:378) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.createTransactionIfNecessary(TransactionAspectSupport.java:474) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:289) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:98) ~[spring-tx-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:92) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) ~[spring-aop-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at sk.moe.zoya.service.impl.FooServiceImpl$$EnhancerBySpringCGLIB$$ebab570a.pull(<generated>) ~[classes/:na]
    at sk.moe.zoya.web.FooController.pull(FooController.java:25) ~[classes/:na]
    at sun.reflect.GeneratedMethodAccessor60.invoke(Unknown Source) ~[na:na]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_171]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_171]
    at org.springframework.web.method.support.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:209) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.method.support.InvocableHandlerMethod.invokeForRequest(InvocableHandlerMethod.java:136) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.ServletInvocableHandlerMethod.invokeAndHandle(ServletInvocableHandlerMethod.java:102) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.invokeHandlerMethod(RequestMappingHandlerAdapter.java:877) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.annotation.RequestMappingHandlerAdapter.handleInternal(RequestMappingHandlerAdapter.java:783) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.mvc.method.AbstractHandlerMethodAdapter.handle(AbstractHandlerMethodAdapter.java:87) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet.java:991) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.DispatcherServlet.doService(DispatcherServlet.java:925) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.processRequest(FrameworkServlet.java:974) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.servlet.FrameworkServlet.doGet(FrameworkServlet.java:866) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:635) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.servlet.FrameworkServlet.service(FrameworkServlet.java:851) ~[spring-webmvc-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at javax.servlet.http.HttpServlet.service(HttpServlet.java:742) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:231) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.websocket.server.WsFilter.doFilter(WsFilter.java:52) ~[tomcat-embed-websocket-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.RequestContextFilter.doFilterInternal(RequestContextFilter.java:99) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HttpPutFormContentFilter.doFilterInternal(HttpPutFormContentFilter.java:109) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.HiddenHttpMethodFilter.doFilterInternal(HiddenHttpMethodFilter.java:81) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.springframework.web.filter.CharacterEncodingFilter.doFilterInternal(CharacterEncodingFilter.java:200) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.web.filter.OncePerRequestFilter.doFilter(OncePerRequestFilter.java:107) ~[spring-web-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:193) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:166) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:198) ~[tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:96) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.authenticator.AuthenticatorBase.invoke(AuthenticatorBase.java:496) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:140) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:81) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:87) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:342) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.http11.Http11Processor.service(Http11Processor.java:803) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:66) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:790) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1459) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [na:1.8.0_171]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [na:1.8.0_171]
    at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61) [tomcat-embed-core-8.5.29.jar:8.5.29]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_171]

2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163  WARN 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : SQL Error: 0, SQLState: null
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-48] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.163 ERROR 26978 --- [io-8080-exec-40] o.h.engine.jdbc.spi.SqlExceptionHelper   : HikariPool-1 - Connection is not available, request timed out after 30000ms.
2018-06-02 13:21:47.164 ERROR 26978 --- [io-8080-exec-69] o.a.c.c.C.[.[.[/].[dispatcherServlet]    : Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Request processing failed; nested exception is org.springframework.transaction.CannotCreateTransactionException: Could not open JPA EntityManager for transaction; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection] with root cause

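I also commented out the code where I use the repositories, to make sure nothing touches the database, and the result was the same. In addition, I used the AtomicLong class to give each request a unique userId.

EDIT 3:

I found out that when I comment out @Transactional, everything works fine! So can you tell me how to set up Spring's transactions for a large number of operations without adding delay?

I added spring.datasource.maximumPoolSize=1000 to the properties, which I think should increase the pool size, so the only remaining problem is how to speed up methods annotated with @Transactional.

Every call to the pull method is annotated with @Transactional because I first need to load data from the database and check whether there is new data; if there is, I do not have to create a waiting deferred result. The push method must also be annotated with @Transactional, because I first need to store the received data in the database and then set that value on the waiting result. For my data I am using Postgres.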


3 Answers


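The problem here seems to be that you are running out of connections in your database pool.

You have marked your method as @Transactional, but your controller also expects the result of the method, i.e. the DeferredResult, to be delivered as soon as possible so that the thread is freed.

Now, this is what happens when you run a request:

  • The @Transactional functionality is implemented in a Spring proxy, which must open a connection, call your target method, and then commit or roll back the transaction.
  • Therefore, when your controller invokes the fooService.pull method, it is in fact calling a proxy.
  • The proxy must first request a connection from the pool and then invoke your service method, which performs some database operation within that transaction. Finally, it must commit or roll back the transaction and return the connection to the pool.
  • After all this, your method returns a DeferredResult, which is then passed to the controller for it to return.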
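
Roughly, the sequence above amounts to the following. This is a plain-Java sketch under stated assumptions, not Spring's actual proxy code: a Semaphore stands in for the connection pool, and the class name TransactionalProxySketch, the method invokeInTransaction, and the pool size of 2 are made up for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical stand-in for a transactional proxy; not Spring's implementation. */
class TransactionalProxySketch {

    // Assumption: a semaphore with 2 permits plays the role of a 2-connection pool.
    static final Semaphore POOL = new Semaphore(2);

    /** Borrows a "connection", runs the target method, then always gives the connection back. */
    static <T> T invokeInTransaction(Supplier<T> target) {
        // The calling (request) thread blocks here while the pool is empty.
        POOL.acquireUninterruptibly();
        try {
            // The annotated service method runs while the connection is held.
            T result = target.get();
            // A real proxy would commit here.
            return result;
        } catch (RuntimeException e) {
            // A real proxy would roll back here.
            throw e;
        } finally {
            // The connection goes back to the pool only after the method has returned.
            POOL.release();
        }
    }
}
```

The point of the sketch is the ordering: the caller cannot receive the method's return value before a connection has been acquired, so any wait for the pool happens on the request thread.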

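Now, the problem is that DeferredResult is designed to be used asynchronously. In other words, the promise is expected to be resolved later in some other thread, and we are supposed to free the request thread as soon as possible.

In fact, the Spring documentation on DeferredResult says: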

@GetMapping("/quotes")
@ResponseBody
public DeferredResult<String> quotes() {
    DeferredResult<String> deferredResult = new DeferredResult<String>();
    // Save the deferredResult somewhere..
    return deferredResult;
}

// From some other thread...
deferredResult.setResult(data);

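The problem in your code is precisely that the DeferredResult is being resolved in the same request thread.

So, when the Spring proxy requests a connection from the database pool during a heavy load test, many requests find that the pool is exhausted and has no connections available. The request is put on hold, but at that point your DeferredResult has not been created yet, so its timeout functionality does not exist.

Your request is basically waiting for a connection from the database pool to become available. Say 5 seconds pass and then the request gets a connection: only now do you get the DeferredResult that the controller uses to handle the response. It then times out 5 seconds later. So you have to add the time spent waiting for a connection from the pool to the time spent waiting for the DeferredResult to be resolved.

That is probably why, when you test with JMeter, you see the request time gradually increase as connections in the database pool are depleted.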
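
As a back-of-the-envelope illustration of that addition, here is a toy model. It assumes, hypothetically, that every pooled connection stays busy for a fixed number of seconds and that requests are served in arrival order; the real behaviour depends on how long each connection is actually held. The class and method names are invented for this sketch.

```java
/** Toy latency model; all names and the fixed hold time are assumptions. */
class LatencyModel {

    /** Seconds the request with the given 0-based arrival index waits for a free connection. */
    static long poolWaitSeconds(int index, int poolSize, long holdSeconds) {
        // Requests are served in batches of poolSize; each earlier batch costs holdSeconds.
        return (long) (index / poolSize) * holdSeconds;
    }

    /** Pool wait plus the DeferredResult timeout that only starts once the method has run. */
    static long totalSeconds(int index, int poolSize, long holdSeconds, long deferredTimeoutSeconds) {
        return poolWaitSeconds(index, poolSize, holdSeconds) + deferredTimeoutSeconds;
    }
}
```

For example, with a pool of 10 connections, a 5 second hold time and a 5 second DeferredResult timeout, LatencyModel.totalSeconds(45, 10, 5, 5) evaluates to 25, while the first request (index 0) still sees only 5.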

You can enable some logging for the connection pool by adding the following to your application.properties file:

logging.level.com.zaxxer.hikari=DEBUG

You can also configure the size of your database pool and even add some JMX support so that you can monitor it from Java Mission Control:

spring.datasource.hikari.maximumPoolSize=10
spring.datasource.hikari.registerMbeans=true

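With JMX support you will be able to see how the database pool gets depleted.

The trick here is to move the logic that resolves the promise to another thread: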

@Override
public DeferredResult pull(Long previousId, String username) {


    DeferredResult result = createPollingResult(previousId, username);

    CompletableFuture.runAsync(() -> {
        //this is where you encapsulate your db transaction
        List<MessageDTO> messages = messageService.findRecents(previousId, username); // should be final or effective final
        if (messages.isEmpty()) {
           pollingResults.putIfAbsent(username, result);
        } else {
           result.setResult(messages);
        }
    });

    return result;
}

By doing this, your DeferredResult is returned immediately, and Spring can do its asynchronous request handling magic while it frees that precious Tomcat thread.
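
The same idea can be tried out without a servlet container. The sketch below is only an analogy: java.util.concurrent.CompletableFuture stands in for DeferredResult, the lookup Supplier stands in for the transactional database call, and the class LongPollSketch and its method names are hypothetical. Timeout handling, which DeferredResult provides in Spring, is left out.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

/** Hypothetical long-poll registry; CompletableFuture stands in for DeferredResult. */
class LongPollSketch {

    private final Map<Long, CompletableFuture<String>> waiting = new ConcurrentHashMap<>();
    private final Executor dbExecutor;

    LongPollSketch(Executor dbExecutor) {
        this.dbExecutor = dbExecutor;
    }

    /** Returns at once; the (possibly slow) lookup runs on dbExecutor, not on the caller. */
    CompletableFuture<String> pull(Long userId, Supplier<String> lookup) {
        CompletableFuture<String> result = new CompletableFuture<>();
        // Register before the lookup so that a push arriving in the meantime is not lost.
        waiting.put(userId, result);
        // Clean up the map entry once the result is resolved, whatever resolved it.
        result.whenComplete((value, error) -> waiting.remove(userId, result));
        CompletableFuture.runAsync(() -> {
            try {
                String data = lookup.get();      // stand-in for the database transaction
                if (data != null) {
                    result.complete(data);       // data already available: resolve now
                }
            } catch (RuntimeException e) {
                result.completeExceptionally(e); // do not leave the caller waiting forever
            }
        }, dbExecutor);
        return result;
    }

    /** Resolves the recipient's pending result, if any; returns true when someone was waiting. */
    boolean push(Long userId, String data) {
        CompletableFuture<String> pending = waiting.get(userId);
        return pending != null && pending.complete(data);
    }
}
```

Passing an explicit Executor is a design choice in this sketch: it lets the number of threads doing database work be sized to match the connection pool instead of relying on the shared common pool.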

answered 2018-06-01T20:06:39.053

I think you need a producer-consumer model. I wrote some code for you; I hope it helps.

Here is the sample code:

DeferredResultStrore

@Component
public class DeferredResultStrore {

    private Queue<DeferredResult<String>> responseBodyQueue;
    private HashMap<String, List<DeferredResult<InterfaceModel>>> groupMap;
    private final long resultTimeOut;

    public DeferredResultStrore() {
        responseBodyQueue = new LinkedBlockingQueue<DeferredResult<String>>();
        groupMap = new HashMap<String, List<DeferredResult<InterfaceModel>>>();
        // write time.
        resultTimeOut = 1000 * 60 * 60;
    }

    public Queue<DeferredResult<String>> getResponseBodyQueue() {
        return responseBodyQueue;
    }

    public HashMap<String, List<DeferredResult<InterfaceModel>>> getGroupMap() {
        return groupMap;
    }

    public long getResultTimeOut() {
        return resultTimeOut;
    }

}

DeferredResultService

public interface DeferredResultService {

    public DeferredResult<?> biteResponse(HttpServletResponse resp, HttpServletRequest req);

    public DeferredResult<?> biteGroupResponse(String key, HttpServletResponse resp);

}

DeferredResultServiceImpl

@Service
public class DeferredResultServiceImpl implements DeferredResultService {

    @Autowired
    private DeferredResultStrore deferredResultStore;

    @Override
    public DeferredResult<?> biteResponse(final HttpServletResponse resp, HttpServletRequest req) {

        final DeferredResult<String> defResult = new DeferredResult<String>(deferredResultStore.getResultTimeOut());

        removeObserver(resp, defResult, null);

        deferredResultStore.getResponseBodyQueue().add(defResult);

        return defResult;
    }

    @Override
    public DeferredResult<?> biteGroupResponse(String key, final HttpServletResponse resp) {

        final DeferredResult<InterfaceModel> defResult = new DeferredResult<InterfaceModel>(
                deferredResultStore.getResultTimeOut());

        List<DeferredResult<InterfaceModel>> defResultList = null;

        removeObserver(resp, defResult, key);

        if (deferredResultStore.getGroupMap().containsKey(key)) {

            defResultList = deferredResultStore.getGroupMap().get(key);
            defResultList.add(defResult);

        } else {

            defResultList = new ArrayList<DeferredResult<InterfaceModel>>();
            defResultList.add(defResult);
            deferredResultStore.getGroupMap().put(key, defResultList);

        }

        return defResult;
    }

    private void removeObserver(final HttpServletResponse resp, final DeferredResult<?> defResult, final String key) {

        defResult.onCompletion(new Runnable() {
            public void run() {
                if (key != null) {
                    List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);

                    if (defResultList != null) {
                        // Remove by identity; removing inside a for-each loop
                        // risks a ConcurrentModificationException.
                        defResultList.remove(defResult);
                    }

                } else {
                    if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {
                        deferredResultStore.getResponseBodyQueue().remove(defResult);
                    }
                }
            }
        });

        defResult.onTimeout(new Runnable() {
            public void run() {
                // 206
                resp.setStatus(HttpServletResponse.SC_PARTIAL_CONTENT);

                if (key != null) {
                    List<DeferredResult<InterfaceModel>> defResultList = deferredResultStore.getGroupMap().get(key);

                    // Remove by identity first; removing inside a for-each loop
                    // risks a ConcurrentModificationException.
                    if (defResultList != null && defResultList.remove(defResult)) {
                        InterfaceModel model = new InterfaceModel();
                        model.setId(key);
                        model.setMessage("onTimeout");

                        defResult.setErrorResult(model);
                    }

                } else {
                    defResult.setErrorResult("onTimeout");
                    deferredResultStore.getResponseBodyQueue().remove(defResult);
                }
            }
        });
    }

}
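
The group bookkeeping above uses a plain HashMap with ArrayList values, which is not safe when several request threads register and complete results at once. One possible alternative is sketched below; it is an assumption-laden illustration rather than part of the original answer. The class GroupRegistry and its methods are hypothetical, and the waiter type is left generic so the sketch runs without Spring (in the service it would be DeferredResult<InterfaceModel>).

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical thread-safe replacement for the groupMap bookkeeping. */
class GroupRegistry<T> {

    private final Map<String, List<T>> groups = new ConcurrentHashMap<>();

    /** Adds a waiter to the group, creating the group atomically if needed. */
    void add(String key, T waiter) {
        groups.compute(key, (k, list) -> {
            List<T> target = (list != null) ? list : new ArrayList<>();
            target.add(waiter);
            return target;
        });
    }

    /** Removes one waiter (for example on completion or timeout); drops empty groups. */
    void remove(String key, T waiter) {
        groups.computeIfPresent(key, (k, list) -> {
            list.remove(waiter);
            return list.isEmpty() ? null : list; // returning null removes the map entry
        });
    }

    /** Atomically detaches and returns all waiters of a group so they can be completed. */
    List<T> drain(String key) {
        List<T> list = groups.remove(key);
        return (list != null) ? list : Collections.emptyList();
    }
}
```

All list mutations happen inside atomic ConcurrentHashMap operations, so a push can call drain(key) and iterate the returned list without another thread modifying it.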

PushService

public interface PushService {

    public boolean pushMessage(String message);

    public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp);

}

PushServiceImpl

@Service
public class PushServiceImpl implements PushService {

    @Autowired
    private DeferredResultStrore deferredResultStore;

    @Override
    public boolean pushMessage(String message) {

        if (!deferredResultStore.getResponseBodyQueue().isEmpty()) {

            for (DeferredResult<String> deferredResult : deferredResultStore.getResponseBodyQueue()) {

                deferredResult.setResult(message);
            }

            deferredResultStore.getResponseBodyQueue().remove();
        }

        return true;
    }

    @Override
    public boolean pushGroupMessage(String key, String topic, HttpServletResponse resp) {
        List<DeferredResult<InterfaceModel>> defResultList = null;

        // select data in DB. that is sample group push service. need to connect db.
        InterfaceModel model = new InterfaceModel();
        model.setMessage("write group message.");
        model.setId(key);

        if (deferredResultStore.getGroupMap().containsKey(key)) {
            defResultList = deferredResultStore.getGroupMap().get(key);

            for (DeferredResult<InterfaceModel> deferredResult : defResultList) {
                deferredResult.setResult(model);
            }

            deferredResultStore.getGroupMap().remove(key);
        }

        return true;
    }

}

InterfaceModel

public class InterfaceModel {

    private String message;

    private int idx;
    private String id;

    // DB Column

    public InterfaceModel() {
        // TODO Auto-generated constructor stub
    }

    public InterfaceModel(String message, int idx, String id) {
        this.message = message;
        this.idx = idx;
        this.id = id;
    }

    public String getMessage() {
        return message;
    }

    public void setMessage(String message) {
        this.message = message;
    }

    public int getIdx() {
        return idx;
    }

    public void setIdx(int idx) {
        this.idx = idx;
    }

    public String getId() {
        return id;
    }

    public void setId(String id) {
        this.id = id;
    }

}

web.xml

Async support is very important in the setup.

<servlet>
    <servlet-name>appServlet</servlet-name>
    <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    <init-param>
        <param-name>contextConfigLocation</param-name>
        <param-value>/WEB-INF/spring/appServlet/servlet-context.xml</param-value>
    </init-param>
    <load-on-startup>1</load-on-startup>
    <async-supported>true</async-supported>
</servlet>

Java-based configuration

@Bean
public ServletRegistrationBean dispatcherServlet() {
    ServletRegistrationBean registration = new ServletRegistrationBean(
            new DispatcherServlet(), "/");
    registration.setAsyncSupported(true);
    return registration;
}

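Actually:

A DeferredResult is associated with an open request. When the request completes, the DeferredResult is removed from the map; the client then issues a new long-polling request, which adds a new DeferredResult instance.

Spring Boot will automatically register any Servlet beans in your application context with the servlet container. By default, async support is set to true, so there is nothing for you to do beyond creating a bean for your Servlet.

@Aligtor, for you => public @interface EnableAsync: enables Spring's asynchronous method execution capability, similar to the functionality found in Spring's XML namespace.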

answered 2018-06-01T06:21:34.030

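As many people have mentioned, this is not the correct way to test performance. You asked how to make automated requests at a fixed interval, as you were doing with XMLHttpRequest. You can use the interval operator of Observable as follows:

import {Observable} from "rxjs/Observable";
import {Subscription} from "rxjs/Subscription";
// Needed in RxJS 5 so that Observable.interval is available.
import "rxjs/add/observable/interval";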

private _intervalSubscription: Subscription;

ngOnInit() {
    this._intervalSubscription = Observable.interval(500).subscribe(x => {
        this.getDataFromServer();
    });
}

ngOnDestroy(): void {
    this._intervalSubscription.unsubscribe();
}

getDataFromServer() {
    // do your network call
    this.http.get<any>(this.url, {params})
                .subscribe((data) => {
                    console.log('s', data);
                }, (error) => {
                    console.log('e', error);
                }); 
}

This is the best way to do polling from the client side.

EDIT 1

private prevRequestTime: number;

ngAfterViewInit(): void {
    this.getDataFromServer();
}

getDataFromServer() {
    this.prevRequestTime = Date.now();
    // do your network call
    this.http.get<any>(this.url, {params})
            .subscribe((data) => {
                console.log('s', data);
                this.scheduleRequestAgain();
            }, (error) => {
                console.log('e', error);
                this.scheduleRequestAgain();
            }); 
}

scheduleRequestAgain() {
    let diff = Date.now() - this.prevRequestTime;
    // Pass a callback; calling getDataFromServer() here would fire the request immediately.
    setTimeout(() => this.getDataFromServer(), diff);
}
answered 2018-06-01T19:30:55.147