1

以下是场景:

  1. 我创建了一个反应堆 kafka 接收器
  2. 从 kafka 接收器消耗的数据被发布到 WebSocketHanlder
  3. WebSocketHanlder 使用 SimpleUrlHandlerMapping 映射到 URL
  4. URL 模式是 api/v1/ws/{ID},我希望根据 URI 中使用的不同 ID 创建多个 WebSocketSession,这些 ID 由单个 WebSocketHanlder 管理,这实际上正在发生
  5. 但是当来自 kafka 接收器的数据发布时,只有第一个创建的 WebSocketSession 接收到它,所有其他 WebSocketSession 都没有接收到数据
  6. 我正在使用带有 starter-tomcat 的 spring-boot 2.6.3

如何将数据发布到创建的所有 WebSocketSessions 我的代码:

Web 套接字处理程序的配置


@Configuration
@Slf4j
public class OneSecPollingWebSocketConfig
{
   private OneSecPollingWebSocketHandler oneSecPollingHandler;

   @Autowired
   public OneSecPollingWebSocketConfig(OneSecPollingWebSocketHandler oneSecPollingHandler)
   {
      this.oneSecPollingHandler = oneSecPollingHandler;
   }

   @Bean
   public HandlerMapping webSocketHandlerMapping()
   {
      log.info("onesecpolling websocket configured");
      Map<String, WebSocketHandler> handlerMap = new HashMap<>();
      handlerMap.put(WEB_SOCKET_ENDPOINT, oneSecPollingHandler);
      SimpleUrlHandlerMapping mapping = new SimpleUrlHandlerMapping();
      mapping.setUrlMap(handlerMap);
      mapping.setOrder(1);
      return mapping;
   }
}

WebSocket 处理程序的代码


@Component
@Slf4j
public class OneSecPollingWebSocketHandler implements WebSocketHandler
{
   private ObjectMapper objectMapper;
   private OneSecPollingKafkaConsumerService oneSecPollingKafkaConsumerService;
   private Map<String, WebSocketSession> wsSessionsByUserSessionId = new HashMap<>();

   @Autowired
   public OneSecPollingWebSocketHandler(ObjectMapper objectMapper, OneSecPollingKafkaConsumerService oneSecPollingKafkaConsumerService)
   {
      this.objectMapper = objectMapper;
      this.oneSecPollingKafkaConsumerService = oneSecPollingKafkaConsumerService;
   }

   @Override
   public Mono<Void> handle(WebSocketSession webSocketSession)
   {
      Many<String> sink = Sinks.many().multicast().onBackpressureBuffer(Queues.SMALL_BUFFER_SIZE, false);
      wsSessionsByUserSessionId.put(getUserPollingSessionId(webSocketSession), webSocketSession);
      sinkSubscription(webSocketSession, sink);
      Mono<Void> output = webSocketSession.send(sink.asFlux().map(webSocketSession::textMessage)).doOnSubscribe(subscription ->
      {
      });
      return Mono.zip(webSocketSession.receive().then(), output).then();
   }

   public void sinkSubscription(WebSocketSession webSocketSession, Many<String> sink)
   {
      log.info("number of sessions; {}", wsSessionsByUserSessionId.size());
      oneSecPollingKafkaConsumerService.getTestTopicFlux().doOnNext(record ->
      {
         //log.info("record: {}", record);
         sink.tryEmitNext(record.value());
         record.receiverOffset().acknowledge();
      }).subscribe();
   }

   public String getOneSecPollingTopicRecord(ReceiverRecord<Integer, String> record, WebSocketSession webSocketSession)
   {
      String lastRecord = record.value();
      log.info("record to send: {} : webSocketSession: {}", record.value(), webSocketSession.getId());
      record.receiverOffset().acknowledge();
      return lastRecord;     
   }

   public String getUserPollingSessionId(WebSocketSession webSocketSession)
   {
      UriTemplate template = new UriTemplate(WEB_SOCKET_ENDPOINT);
      URI uri = webSocketSession.getHandshakeInfo().getUri();
      Map<String, String> parameters = template.match(uri.getPath());
      String userPollingSessionId = parameters.get("userPollingSessionId");
      return userPollingSessionId;
   }
}

卡夫卡接收器

@Service
@Slf4j
public class OneSecPollingKafkaConsumerService
{
   private String bootStrapServers;

   @Autowired
   public OneSecPollingKafkaConsumerService(@Value("${bootstrap.servers}") String bootStrapServers)
   {
      this.bootStrapServers = bootStrapServers;
   }

   private ReceiverOptions<Integer, String> getRecceiverOPtions()
   {
      Map<String, Object> consumerProps = new HashMap<>();
      consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers);
      //consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "reactive-consumer");
      consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "onesecpolling-group");
      consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
      consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
      //consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
      //consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

      ReceiverOptions<Integer, String> receiverOptions = ReceiverOptions
         .<Integer, String> create(consumerProps)
         .subscription(Collections.singleton("HighFrequencyPollingKPIsComputedValues"));

      return receiverOptions;
   }

   public Flux<ReceiverRecord<Integer, String>> getTestTopicFlux()
   {
      return createTopicCache();
   }

   private Flux<ReceiverRecord<Integer, String>> createTopicCache()
   {
      Flux<ReceiverRecord<Integer, String>> oneSecPollingMessagesFlux = KafkaReceiver.create(getRecceiverOPtions())
         .receive()
         .delayElements(Duration.ofMillis(500));
      return oneSecPollingMessagesFlux;
   }
}

POM 依赖项

<dependencies>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-devtools</artifactId>
    </dependency>
    <!-- 
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-security</artifactId>
    </dependency> 
    -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-webflux</artifactId>
    </dependency>
    <dependency>
      <groupId>io.projectreactor.kafka</groupId>
      <artifactId>reactor-kafka</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
      <groupId>org.projectlombok</groupId>
      <artifactId>lombok</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-actuator</artifactId>
    </dependency>
    <!-- This is breaking WebFlux 
      <dependency>
        <groupId>org.springdoc</groupId>
        <artifactId>springdoc-openapi-ui</artifactId>
        <version>${springdoc.version}</version>
      </dependency>
      -->
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-aop</artifactId>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-tomcat</artifactId>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.boot</groupId>
      <artifactId>spring-boot-starter-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>io.projectreactor</groupId>
      <artifactId>reactor-test</artifactId>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework.cloud</groupId>
      <artifactId>spring-cloud-stream</artifactId>
      <classifier>test-binder</classifier>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>
    <!-- <dependency>
      <groupId>org.springframework.security</groupId>
      <artifactId>spring-security-test</artifactId>
      <scope>test</scope>
    </dependency> -->
  </dependencies>

我还尝试将 WebSocketHanlder 中的 handle(...) 方法定义更改为以下,但仍然来自 kafka 的数据仅推送到一个 websocket 会话:

@Override
   public Mono<Void> handle(WebSocketSession webSocketSession)
   {
      Mono<Void> input = webSocketSession.receive().then();
      Mono<Void> output = webSocketSession.send(oneSecPollingKafkaConsumerService.getTestTopicFlux().map(ReceiverRecord::value).map(webSocketSession::textMessage));
      return Mono.zip(input, output).then();
   }

另外,我尝试了以下操作:

public Mono<Void> handle(WebSocketSession webSocketSession)
   {      
      Mono<Void> input = webSocketSession.receive()
         .doOnSubscribe(subscribe -> log.info("sesseion created sessionId:{}:userId:{};sessionhash:{}",
            webSocketSession.getId(),
            getUserPollingSessionId(webSocketSession),
            webSocketSession.hashCode()))
         .then();
      Flux<String> source = oneSecPollingKafkaConsumerService.getTestTopicFlux().map(record -> getOneSecPollingTopicRecord(record, webSocketSession)).log();
      Mono<Void> output = webSocketSession.send(source.map(webSocketSession::textMessage)).log();
      return Mono.zip(input, output).then().log();
   }

我启用了 log() 并得到以下输出:

20:09:22.652 [http-nio-8080-exec-9] INFO  c.m.e.w.p.i.w.v.OneSecPollingWebSocketHandler - sesseion created sessionId:a:userId:124;sessionhash:1974799413
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.RefCount.41 - | onSubscribe([Fuseable] FluxRefCount.RefCountInner)
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.Map.42 - onSubscribe(FluxMap.MapSubscriber)
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.Map.42 - request(1)
20:09:22.652 [http-nio-8080-exec-9] INFO  reactor.Flux.RefCount.41 - | request(32)
20:09:22.659 [http-nio-8080-exec-9] INFO  reactor.Mono.FromPublisher.43 - onSubscribe(MonoNext.NextSubscriber)
20:09:22.659 [http-nio-8080-exec-9] INFO  reactor.Mono.FromPublisher.43 - request(unbounded)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Mono.IgnorePublisher.48 - onSubscribe(MonoIgnoreElements.IgnoreElementsSubscriber)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Mono.IgnorePublisher.48 - request(unbounded)
20:09:25.942 [http-nio-8080-exec-10] INFO  c.m.e.w.p.i.w.v.OneSecPollingWebSocketHandler - sesseion created sessionId:b:userId:123;sessionhash:1582184236
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.RefCount.45 - | onSubscribe([Fuseable] FluxRefCount.RefCountInner)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.Map.46 - onSubscribe(FluxMap.MapSubscriber)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.Map.46 - request(1)
20:09:25.942 [http-nio-8080-exec-10] INFO  reactor.Flux.RefCount.45 - | request(32)
20:09:25.947 [http-nio-8080-exec-10] INFO  reactor.Mono.FromPublisher.47 - onSubscribe(MonoNext.NextSubscriber)
20:09:25.949 [http-nio-8080-exec-10] INFO  reactor.Mono.FromPublisher.47 - request(unbounded)
20:10:00.880 [reactive-kafka-onesecpolling-group-11] INFO  reactor.Flux.RefCount.41 - | onNext(ConsumerRecord(topic = HighFrequencyPollingKPIsComputedValues, partition = 0, leaderEpoch = null, offset = 474, CreateTime = 1644071999871, serialized key size = -1, serialized value size = 43, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"greeting" : "Hello", "name" : "Prashant"}))
20:10:01.387 [parallel-5] INFO  reactor.Flux.Map.42 - onNext({"greeting" : "Hello", "name" : "Prashant"})
20:10:01.389 [parallel-5] INFO  reactor.Flux.Map.42 - request(1)

在这里,我们可以看到我们有 2 个 reactor-kafka Flux 订阅者:

  1. reactor.Flux.Map.42 - onSubscribe(FluxMap.MapSubscriber
  2. reactor.Flux.Map.46 - onSubscribe(FluxMap.MapSubscriber)

但是当从 kafka 主题读取数据时,它只被一个订阅者接收:

  • reactor.Flux.Map.42 - onNext({"greeting" : "Hello", "name" : "Prashant"})

它是 Webflux API 本身的错误吗?

4

1 回答 1

1

我找到了问题和解决方案。

问题 我在 WebSocketHandler handle() 方法中使用 Flux(从 KafkaReceiver 获得)的方式不正确。对于从多个客户端请求创建的每个 websocket 会话,都会调用 handle 方法。因此,为 KafkaReceiver.create().receive() 创建了多个 Flux 对象。其中一个通量从 KafkaReceiver 读取数据,但其他通量对象未能这样做。

public Mono<Void> handle(WebSocketSession webSocketSession)
   {      
      Mono<Void> input = webSocketSession.receive()
         .doOnSubscribe(subscribe -> log.info("sesseion created sessionId:{}:userId:{};sessionhash:{}",
            webSocketSession.getId(),
            getUserPollingSessionId(webSocketSession),
            webSocketSession.hashCode()))
         .then();
      **Flux<String> source = oneSecPollingKafkaConsumerService.getTestTopicFlux()**.map(record -> getOneSecPollingTopicRecord(record, webSocketSession)).log();
      Mono<Void> output = webSocketSession.send(source.map(webSocketSession::textMessage)).log();
      return Mono.zip(input, output).then().log();
   }

解决方案 确保为 KafkaReceiver.create().receive() 只创建了一个 Flux。一种方法是在 WebSocketHandler(或 KAfkaCONsumer 类)的构造函数中创建 Flux

private final Flux<String> source;

   @Autowired
   public OneSecPollingWebSocketHandler(OneSecPollingKafkaConsumerService oneSecPollingKafkaConsumerService)
   {
      source = oneSecPollingKafkaConsumerService.getOneSecPollingTopicFlux().map(r -> getOneSecPollingTopicRecord(r));
   }

   @Override
   public Mono<Void> handle(WebSocketSession webSocketSession)
   {
      // add usersession id as session attribute
      Mono<Void> input = getInputMessageMono(webSocketSession);
      Mono<Void> output = getOutputMessageMono(webSocketSession);
      return Mono.zip(input, output).then().log();
   }

   private Mono<Void> getOutputMessageMono(WebSocketSession webSocketSession)
   {
      Mono<Void> output = webSocketSession.send(source.map(webSocketSession::textMessage)).doOnError(err -> log.error(err.getMessage())).doOnTerminate(() ->
      {
         log.info("onesecpolling session terminated;{}", webSocketSession.getId());
      }).log();
      return output;
   }

   private Mono<Void> getInputMessageMono(WebSocketSession webSocketSession)
   {
      Mono<Void> input = webSocketSession.receive().doOnSubscribe(subscribe ->
      {
         log.info("onesecpolling session created sessionId:{}:userId:{}", webSocketSession.getId(), getUserPollingSessionId(webSocketSession));
      }).then();
      return input;
   }

   private String getOneSecPollingTopicRecord(ReceiverRecord<Integer, String> record)
   {
      String lastRecord = record.value();
      record.receiverOffset().acknowledge();
      return lastRecord;
   }

   private String getUserPollingSessionId(WebSocketSession webSocketSession)
   {
      UriTemplate template = new UriTemplate(WEB_SOCKET_ENDPOINT);
      URI uri = webSocketSession.getHandshakeInfo().getUri();
      Map<String, String> parameters = template.match(uri.getPath());
      String userPollingSessionId = parameters.get(WEB_SOCKET_ENDPOINT_USERID_SUBPATH);
      return userPollingSessionId;
   }
于 2022-02-08T08:46:29.557 回答