A simple example of CUBA and Kafka integration is available here: https://github.com/cuba-labs/kafka-sample
The configuration follows the official Spring documentation.
- The key configuration class is com.company.kafkasample.config.KafkaConfig. It contains the beans that configure the Kafka facilities; in this example, both a producer and a consumer are configured. Note that the configuration parameters are hardcoded only because this is an example. The consumer part looks like this:
    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "sample-kafka");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
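Since the parameters above are hardcoded only for the sake of the example, here is a minimal sketch of how they could be read from external properties instead. The class name `KafkaSettings` and the `kafka.*` property names are hypothetical and not part of the sample project; the map keys are the standard Kafka client setting names that the `ConsumerConfig` and `ProducerConfig` constants resolve to. The producer half is an assumption about what a matching producer configuration might look like, not a copy of the sample's code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper (not part of the sample project): builds Kafka client
// settings from supplied properties, falling back to the example's defaults.
final class KafkaSettings {

    private KafkaSettings() {
    }

    // Consumer settings; the defaults mirror the hardcoded values of the example.
    // The "kafka.*" property names are made up for illustration.
    static Map<String, Object> consumerConfigs(Properties source) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", source.getProperty("kafka.bootstrapServers", "localhost:9092"));
        props.put("group.id", source.getProperty("kafka.groupId", "sample-kafka"));
        props.put("enable.auto.commit", Boolean.parseBoolean(source.getProperty("kafka.autoCommit", "true")));
        props.put("auto.commit.interval.ms", source.getProperty("kafka.autoCommitIntervalMs", "100"));
        props.put("session.timeout.ms", source.getProperty("kafka.sessionTimeoutMs", "15000"));
        // Class names given as strings are resolved by the Kafka client itself.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    // Assumed producer counterpart: same broker address, Integer keys, String values.
    static Map<String, Object> producerConfigs(Properties source) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", source.getProperty("kafka.bootstrapServers", "localhost:9092"));
        props.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

The resulting maps could be passed to `DefaultKafkaConsumerFactory` in place of the hardcoded `consumerConfigs()` bean. Where the `Properties` object comes from (a file, the environment, application properties) is left open here.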
- After that, you can create a service that sends messages to a Kafka topic by injecting the KafkaTemplate bean:
    @Inject
    private KafkaTemplate<Integer, String> template;

    @Override
    public void sendMessage(String message) {
        log.info("Sending {} using Kafka", message);
        // The next value of the "users" sequence is used as the message key
        long id = uniqueNumbersService.getNextNumber("users");
        // send() is asynchronous; the outcome is reported through the callback
        ListenableFuture<SendResult<Integer, String>> send = template.send("users", (int) id, message);
        send.addCallback(new ListenableFutureCallback<SendResult<Integer, String>>() {
            @Override
            public void onFailure(Throwable ex) {
                log.error("Failed to send message {}, error {}", message, ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<Integer, String> result) {
                log.info("Message {} sent", message);
            }
        });
    }
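One detail in sendMessage is worth a note: the `(int) id` cast silently wraps around if the sequence value ever exceeds Integer.MAX_VALUE, which would produce a wrong (negative) message key. A minimal sketch of a fail-fast alternative using the JDK's `Math.toIntExact` follows; the helper class `MessageKeys` is hypothetical and not part of the sample project.

```java
// Hypothetical helper (not in the sample project): converts the long sequence
// value into the int key used with KafkaTemplate<Integer, String>.
final class MessageKeys {

    private MessageKeys() {
    }

    // Math.toIntExact throws ArithmeticException when the value does not fit
    // into an int, whereas a plain (int) cast would wrap around silently.
    static int toKey(long sequenceValue) {
        return Math.toIntExact(sequenceValue);
    }
}
```

With such a helper, the send call could read `template.send("users", MessageKeys.toKey(id), message)`. Whether failing fast is preferable to wrapping depends on how the keys are used downstream.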
- Then you can inject this service into screens and use it there.
- As for the receiver, you can create a CUBA component and annotate its method with the @KafkaListener annotation. For example, the component below saves Kafka messages to the database.
    @Component
    @DependsOn("consumerFactory")
    public class MessageListener {

        @Inject
        private DataManager dataManager;

        // Called for every message in the "users" topic; the message key arrives as a header
        @KafkaListener(id = "sample-kafka", topics = "users")
        public void listen1(String content, @Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) int id) {
            // Store the key and the payload as a KafkaMessage entity
            KafkaMessage kafkaMessage = dataManager.create(KafkaMessage.class);
            kafkaMessage.setKafkaID(id);
            kafkaMessage.setContent(content);
            dataManager.commit(kafkaMessage);
        }
    }