0

I have to integrate Kafka to Cuba and I thought it would as easy as adding the spring kafka dependency and creating a Configuration annotated class to initialize the Kafka Consumer since Cuba is based on Spring.

When I added a Configuration, I found out that it is not scanned when Cuba is started. When I switch to CUBA view, I noticed that only those classes annotated as Service or Component will be read. However, even If I add a Component class, it is still not scanned properly (I added a field annotated with @Value that looks for a non-existing property but Cuba did not throw any error when I start it)

4

1 回答 1

1

There is a simple example on CUBA+Kafka integration, you can find it here: https://github.com/cuba-labs/kafka-sample

The configuration process is taken from the official Spring documentation.

  1. The key configuration class is com.company.kafkasample.config.KafkaConfig. It contains a number of beans that will help you to configure Kafka facilities. In this particular example, there are both producer and consumer configured. Please note that configuration parameters are hardcoded, but it is just an example.
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
  1. After that you should be able to create a service that can send messages to Kafka queue by injecting KafkaTemplate bean.
    @Inject
    private KafkaTemplate<Integer, String> template;


    @Override
    public void sendMessage(String message) {
        log.info("Sending {} using Kafka", message);
        long id = uniqueNumbersService.getNextNumber("users");
        ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
               log.info("Failed to send message {}, error {}", message, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                log.info("Message {} sent", message);
            }
        });
    }
  1. Then you can inject this service into screens and use it there.
  2. As for the receiver, you can CUBA component and annotate its method with @KafkaListener annotation. For example, the example below saves kafka messages to the database.
@Component
@DependsOn("consumerFactory")
public class MessageListener {

    @Inject
    private DataManager dataManager;

    @KafkaListener(id = "sample-kafka", topics = "users")
    public void listen1(String foo, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
        KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
        kafkaMessage.setKafkaID(id);
        kafkaMessage.setContent(foo);
        dataManager.commit(kafkaMessage);
    }
于 2020-09-02T12:32:46.743 回答