1

我正在使用 spring cloud bus 将事件发布到 kafka,以便另一个实例可以监听相同的事件。事件被触发但未发布到 kafka 。我正在使用带有 spring cloud stream 的 spring cloud bus。

版本:Spring Boot:2.0,Spring Cloud Bus:2.0.0,Spring Cloud Stream:2.0.1

应用程序.yml:

server:
  port: 7711
spring:
  application:
    index: ${random.uuid}
  cloud:
    bus:
      enabled: true
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        input:
          destination: EMPLOYEE-TOPIC-DEMO-R1-P1
          group: ali

pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-kafka</artifactId>
</dependency>

发布活动:

@Autowired
private ApplicationContext context;

@StreamListener(ConsumerStream.INPUT)
public void messageConsumer(@Payload String jsonValue, @Headers MessageHeaders header) {

    try {
        log.info("Enter in Consumer->messageConsumer()");
        final String myUniqueId = context.getId();
        context.publishEvent(new MessagingEventBus(this,myUniqueId,header));
    } catch (Exception e) {
        log.error("Exception caught while processing the request :", e);
    }
}

事件类别:

@Slf4j
public class MessagingEventBus extends RemoteApplicationEvent {


    private MessageHeaders header;

    // Must supply a default constructor and getters/setters for deserialization
    public MessagingEventBus() {
    }

    public MessagingEventBus(Object source, String originService, MessageHeaders header) {
        // source is the object that is publishing the event
        // originService is the unique context ID of the publisher
        super(source, originService);
        this.header = header;
    }


}

事件监听器:

@Component
@Slf4j
public class MessagingEventBusListener implements ApplicationListener<MessagingEventBus> {

    @Override
    public void onApplicationEvent(MessagingEventBus messagingEventBus) {
       log.info("Messaging Event Bus Listener called");
    }
}
4

1 回答 1

1

这就是总线发送事件的过程。

  1. new RemoteApplicationEvent() //创建事件
  2. applicationContext#publishEvent //发布本地事件
  3. BusAutoConfiguration#acceptLocal //总线接受2步发送的本地事件
  4. serviceMatcher#isFromSelf //bus会判断事件是self send吗?如果是本身,则发送到出站通道(5 步)
  5. cloudBusOutboundChannel#send

问题是 4 步判断失败,您可以将 BusProperties#getId 用于事件 originService,它的工作

@Autowired
BusProperties busProperties;

public void fire(){
    new RemoteApplicationEvent(this, busProperties.getId().......
}
于 2020-03-06T12:29:24.950 回答