
I'm trying to create a simple producer that creates a topic with a number of partitions set by configuration.

According to the Alpakka Producer Settings doc, any property from org.apache.kafka.clients.producer.ProducerConfig can be set in the kafka-clients section. There is also a num.partitions property, as listed in the Producer API doc.

Thus, I added that property to my application.conf file as given below:

topic = "topic"
topic = ${?TOPIC}

# Properties for akka.kafka.ProducerSettings can be
# defined in this section or a configuration section with
# the same layout.
akka.kafka.producer {
  # Tuning parameter of how many sends that can run in parallel.
  parallelism = 100
  parallelism = ${?PARALLELISM}

  # Duration to wait for `KafkaProducer.close` to finish.
  close-timeout = 20s

  # Fully qualified config path which holds the dispatcher configuration
  # to be used by the producer stages. Some blocking may occur.
  # When this value is empty, the dispatcher configured for the stream
  # will be used.
  use-dispatcher = "akka.kafka.default-dispatcher"

  # The time interval to commit a transaction when using the `Transactional.sink` or `Transactional.flow`
  eos-commit-interval = 100ms

  # Properties defined by org.apache.kafka.clients.producer.ProducerConfig
  # can be defined in this configuration section.
  kafka-clients {
    bootstrap.servers = "my-kafka:9092"
    bootstrap.servers = ${?BOOTSTRAPSERVERS}
    num.partitions = "3"
    num.partitions = ${?NUM_PARTITIONS}
  }
}

The producer application code is also given below:

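import akka.Done
import akka.actor.ActorSystem
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, Materializer}
import akka.stream.scaladsl.Source
import com.typesafe.config.ConfigFactory
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.StringSerializer

import scala.concurrent.{ExecutionContextExecutor, Future}
import scala.util.{Failure, Success}

object Main extends App {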

  val config = ConfigFactory.load()

  implicit val system: ActorSystem = ActorSystem("producer")
  implicit val materializer: Materializer = ActorMaterializer()

  val producerConfigs = config.getConfig("akka.kafka.producer")
  val producerSettings = ProducerSettings(producerConfigs, new StringSerializer, new StringSerializer)

  val topic = config.getString("topic")

  val done: Future[Done] =
    Source(1 to 100000)
      .map(_.toString)
      .map(value => new ProducerRecord[String, String](topic, value))
      .runWith(Producer.plainSink(producerSettings))

  implicit val ec: ExecutionContextExecutor = system.dispatcher
  done onComplete  {
    case Success(_) => println("Done"); system.terminate()
    case Failure(err) => println(err.toString); system.terminate()
  }

}

But this doesn't work. The producer creates the topic with a single partition instead of the 3 partitions I set in the configuration:

num.partitions = "3"

Finally, Kafkacat output is given below:

~$ kafkacat -b my-kafka:9092 -L
Metadata for all topics (from broker -1: my-kafka:9092/bootstrap):
 3 brokers:
  broker 2 at my-kafka-2.my-kafka-headless.default:9092
  broker 1 at my-kafka-1.my-kafka-headless.default:9092
  broker 0 at my-kafka-0.my-kafka-headless.default:9092
 1 topics:
  topic "topic" with 1 partitions:
    partition 0, leader 2, replicas: 2, isrs: 2

What is wrong? Is it possible to set properties from the Kafka Producer API in the kafka-clients section when using Alpakka?


2 Answers


# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
# can be defined in this configuration section.

As those comments say, this section is for ProducerConfig properties, i.e. producer settings. num.partitions is a broker setting, not a producer setting (I think you lost track of which table the property is listed in on the Apache Kafka docs; scroll to the top of that table to see its header).

There is no way to set the number of partitions of a topic from the producer. You would need to use the AdminClient class to create the topic; the number of partitions is a parameter there, not a configuration property.

Sample code

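import java.util.Properties

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.common.config.TopicConfig

import scala.collection.JavaConverters._

val props = new Properties()
props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")

val adminClient = AdminClient.create(props)

val numPartitions = 3
val replicationFactor = 3.toShort
val newTopic = new NewTopic("new-topic-name", numPartitions, replicationFactor)
// Optionally set some topic-level configs
val configs = Map(TopicConfig.COMPRESSION_TYPE_CONFIG -> "gzip")
newTopic.configs(configs.asJava)

// createTopics is asynchronous; block until the brokers confirm creation
adminClient.createTopics(List(newTopic).asJavaCollection).all().get()
adminClient.close()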

And then you can start the producer.
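If the producer application may be started more than once, the topic creation step probably needs to tolerate the topic already being there. The following is a minimal sketch of that idea, not part of Alpakka: `TopicSetup`, `topicSpec` and `ensureTopic` are hypothetical names introduced here, and the only library assumed is kafka-clients.

```scala
import java.util.{Collections, Properties}
import java.util.concurrent.ExecutionException

import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}
import org.apache.kafka.common.errors.TopicExistsException

// Hypothetical helper, for illustration only.
object TopicSetup {

  // The partition count is a constructor argument of NewTopic,
  // not a configuration property.
  def topicSpec(name: String, partitions: Int, replication: Short): NewTopic =
    new NewTopic(name, partitions, replication)

  // Creates the topic and blocks until the brokers answer.
  // Returns true if the topic was created, false if it already existed.
  def ensureTopic(bootstrapServers: String, spec: NewTopic): Boolean = {
    val props = new Properties()
    props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    val admin = AdminClient.create(props)
    try {
      admin.createTopics(Collections.singleton(spec)).all().get()
      true
    } catch {
      // The future wraps broker-side errors in an ExecutionException.
      case e: ExecutionException if e.getCause.isInstanceOf[TopicExistsException] =>
        false
    } finally {
      admin.close()
    }
  }
}
```

In the question's Main, something like `TopicSetup.ensureTopic("my-kafka:9092", TopicSetup.topicSpec(topic, 3, 3.toShort))` could run before the stream is materialized. Note that a `false` result leaves an existing topic as it is, so a topic that was already created with 1 partition keeps that single partition.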

answered 2019-04-04T16:48:33.270

It appears that the topic is being auto-created on first use, which is the default behavior for Kafka (the broker setting auto.create.topics.enable defaults to true). If that is the case, you need to define the default number of partitions in the server.properties file of your brokers.

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
# the brokers.
num.partitions=3
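After changing the broker default, it may help to check from the client side how many partitions a topic actually has. Below is a small sketch that asks a plain `KafkaProducer` for the topic's metadata; `PartitionCheck`, `producerProps` and `partitionCount` are hypothetical names used only for this example. Be aware that, with auto-creation enabled, asking for the metadata of a missing topic will itself cause the broker to create it.

```scala
import java.util.Properties

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig}
import org.apache.kafka.common.serialization.StringSerializer

// Hypothetical helper, for illustration only.
object PartitionCheck {

  // Minimal producer properties: only the bootstrap servers are set.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props
  }

  // Connects to the cluster and returns the number of partitions
  // the brokers report for the given topic.
  def partitionCount(bootstrapServers: String, topic: String): Int = {
    val producer = new KafkaProducer[String, String](
      producerProps(bootstrapServers),
      new StringSerializer,
      new StringSerializer
    )
    try producer.partitionsFor(topic).size()
    finally producer.close()
  }
}
```

For the cluster in the question, the call would be `PartitionCheck.partitionCount("my-kafka:9092", "topic")`. The broker default only applies to topics created after the change, so a topic that already exists with 1 partition would still be reported with 1.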
answered 2019-04-04T11:42:07.437