I have seen and implemented the method KafkaTemplate.send(TOPIC, message), which uses the default partitioner class.

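In that case, however, I am not passing a key. I have a simple custom partitioner class, and I would also like to send to the Kafka server with a call like KafkaTemplate.send(TOPIC, key, message). In the producer config I set my CustomPartitionar class as the partitioner.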

If I provide a custom Partitioner, will KafkaTemplate's send(topic, key, message) method call its partition method? I have read that it should, but I do not fully understand how.

  1. My simple custom partitioner class:
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class CustomPartitionar implements Partitioner {
    // PartitionMapper is my own helper class that maps a username to a user id
    private PartitionMapper newMapper;

    public CustomPartitionar() {
        newMapper = new PartitionMapper();
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // No configuration needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        int partition = 0;
        String userName = (String) key;
        // Find the id of the current user based on the username, using the mapper class
        Integer userId = newMapper.findUserId(userName);
        // If the userId is not found, the default partition is 0.
        // Note: the returned value must be a valid partition index for the topic.
        if (userId != null) {
            partition = userId;
        }
        return partition;
    }

    @Override
    public void close() {
        // Nothing to release
    }
}
  2. Register this class in the producerFactory:
config.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, CustomPartitionar.class);
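  3. In practice, my key will come from "message.getReceiver()" and the topic from "message.getTopic()", so the message goes to the desired topic and to the partition that belongs to that user/group. So I just want to send like this: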
KafkaTemplate.send(message.getTopic(),message.getReceiver(),message)

Can this be achieved in a simple way, or am I missing something?
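
One detail in the partitioner above may be worth checking: it returns the user id directly as the partition number, and a partition index is only valid if it lies between 0 and the topic's partition count minus one. The following is a minimal plain-Java sketch of one way to keep the value in range, assuming a modulo mapping is acceptable for the use case. The names PartitionIndexSketch and toPartition are hypothetical and not part of Kafka or of the code above.

```java
// Sketch only: PartitionIndexSketch and toPartition are hypothetical names,
// not part of Kafka or of the question's code.
final class PartitionIndexSketch {

    /**
     * Maps a possibly missing user id onto a valid partition index.
     * A null id falls back to partition 0, as in the question's partitioner.
     */
    static int toPartition(Integer userId, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (userId == null) {
            return 0;
        }
        // floorMod keeps the result in [0, numPartitions) even for negative ids.
        return Math.floorMod(userId, numPartitions);
    }

    public static void main(String[] args) {
        System.out.println(toPartition(7, 3));    // 1
        System.out.println(toPartition(-1, 3));   // 2
        System.out.println(toPartition(null, 3)); // 0
    }
}
```

Inside a real Partitioner, the partition count could presumably be read from the Cluster argument, for example with cluster.partitionsForTopic(topic).size(). Note that a modulo mapping lets several users share a partition whenever there are more users than partitions.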

1 Answer

KafkaTemplate has several send methods:

/**
 * Send the data to the default topic with no key or partition.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(V data);

/**
 * Send the data to the default topic with the provided key and no partition.
 * @param key the key.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(K key, V data);

/**
 * Send the data to the default topic with the provided key and partition.
 * @param partition the partition.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, K key, V data);

/**
 * Send the data to the default topic with the provided key and partition.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> sendDefault(Integer partition, Long timestamp, K key, V data);

/**
 * Send the data to the provided topic with no key or partition.
 * @param topic the topic.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, V data);

/**
 * Send the data to the provided topic with the provided key and no partition.
 * @param topic the topic.
 * @param key the key.
 * @param data The data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, K key, V data);

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, K key, V data);

/**
 * Send the data to the provided topic with the provided key and partition.
 * @param topic the topic.
 * @param partition the partition.
 * @param timestamp the timestamp of the record.
 * @param key the key.
 * @param data the data.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(String topic, Integer partition, Long timestamp, K key, V data);

/**
 * Send the provided {@link ProducerRecord}.
 * @param record the record.
 * @return a Future for the {@link SendResult}.
 * @since 1.3
 */
ListenableFuture<SendResult<K, V>> send(ProducerRecord<K, V> record);

/**
 * Send a message with routing information in message headers. The message payload
 * may be converted before sending.
 * @param message the message to send.
 * @return a Future for the {@link SendResult}.
 * @see org.springframework.kafka.support.KafkaHeaders#TOPIC
 * @see org.springframework.kafka.support.KafkaHeaders#PARTITION_ID
 * @see org.springframework.kafka.support.KafkaHeaders#MESSAGE_KEY
 */
ListenableFuture<SendResult<K, V>> send(Message<?> message);
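
To relate this listing to the question: the overloads that take no partition argument, such as send(topic, key, data), produce a record whose partition is unset, and the Kafka producer then asks the class configured under ProducerConfig.PARTITIONER_CLASS_CONFIG to choose one. When an explicit partition is passed, the partitioner is not consulted. The block below is a simplified plain-Java model of that rule, not Kafka's or Spring's actual code. KeyPartitioner, resolvePartition and the length-based partitioner are hypothetical stand-ins chosen for illustration.

```java
// Simplified model only: none of these names exist in Kafka or Spring Kafka.
final class PartitionDispatchSketch {

    /** Hypothetical stand-in for a configured partitioner: key to partition. */
    interface KeyPartitioner {
        int partition(String topic, String key, int numPartitions);
    }

    /**
     * Models the rule: an explicit partition wins; otherwise the configured
     * partitioner decides.
     */
    static int resolvePartition(String topic, Integer explicitPartition, String key,
                                int numPartitions, KeyPartitioner partitioner) {
        if (explicitPartition != null) {
            return explicitPartition;
        }
        return partitioner.partition(topic, key, numPartitions);
    }

    public static void main(String[] args) {
        // Illustrative partitioner: key length modulo the partition count.
        KeyPartitioner byLength = (topic, key, n) -> key == null ? 0 : key.length() % n;

        // Like send(topic, key, data): no partition given, so the partitioner is used.
        System.out.println(resolvePartition("chat", null, "alice", 4, byLength)); // 1
        // Like send(topic, partition, key, data): the explicit partition is used.
        System.out.println(resolvePartition("chat", 3, "alice", 4, byLength));    // 3
    }
}
```

Under this reading, the call in the question, send(message.getTopic(), message.getReceiver(), message), should reach the custom partitioner with message.getReceiver() as the key, provided the partitioner class is registered in the producer config.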
Answered 2020-05-19T19:12:06.260