I have written an Akka application that takes input from Kafka, processes the data with sharded actors, and writes the output back to Kafka.
On some occasions, though, the shard regions can't handle the load and I get:
You should probably implement flow control to avoid flooding the remote connection.
How can I implement backpressure in this chain/flow?
Kafka Consumer -> Sharded Actors -> Kafka Producer
Some fragments from the code:
// Producer side: buffer up to 10000 elements, dropping the oldest when full,
// and publish each element to Kafka through the reactive-kafka Subscriber.
ReactiveKafka kafka = new ReactiveKafka();
Subscriber subscriber = kafka.publish(pp, system);

ActorRef kafkaWriterActor = (ActorRef) Source.actorRef(10000, OverflowStrategy.dropHead())
        .map(ix -> KeyValueProducerMessage.apply(Integer.toString(ix.hashCode()), ix))
        .to(Sink.fromSubscriber(subscriber))
        .run(materializer);
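For the writer side I have been wondering whether Source.queue with OverflowStrategy.backpressure() would be a better fit than Source.actorRef, since dropHead silently discards messages under load. A sketch of what I mean, assuming a recent Akka 2.4.x (where Source.queue materializes a SourceQueueWithComplete) and assuming StreamEvent as the element type:

import java.util.concurrent.CompletionStage;
import akka.stream.QueueOfferResult;
import akka.stream.javadsl.SourceQueueWithComplete;

// Backpressured alternative to Source.actorRef: offer() only completes
// once the element has been accepted, so producers can wait instead of dropping.
SourceQueueWithComplete<StreamEvent> kafkaWriterQueue =
        Source.<StreamEvent>queue(10000, OverflowStrategy.backpressure())
                .map(ix -> KeyValueProducerMessage.apply(Integer.toString(ix.hashCode()), ix))
                .to(Sink.fromSubscriber(subscriber))
                .run(materializer);

CompletionStage<QueueOfferResult> accepted = kafkaWriterQueue.offer(someEvent); // someEvent is a placeholder

The catch is that everything currently talking to kafkaWriterActor via tell would have to handle the offer result instead.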
// Consumer side: subscribe to the input topic and expose it
// as a Reactive Streams Publisher.
ConsumerProperties cp = new PropertiesBuilder.Consumer(brokerList, intopic, consumergroup,
        new ByteArrayDeserializer(), new NgMsgDecoder())
        .build()
        .consumerTimeoutMs(5000)
        .commitInterval(Duration.create(60, TimeUnit.SECONDS))
        .readFromEndOfStream();
Publisher<ConsumerRecord<byte[], StreamEvent>> publisher = kafka.consume(cp, system);
// Start the cluster-sharded StreamActor region that does the processing.
ActorRef streamActor = ClusterSharding.get(system).start("StreamActor",
        Props.create(StreamActor.class, synctime), ClusterShardingSettings.create(system), messageExtractor);
shardRegionTypenames.add("StreamActor");
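For ask-based flow control (see the last fragment below) I assume StreamActor would need to acknowledge every message it processes. A minimal sketch of what I have in mind; the "ack" reply and the StreamEvent match are my additions, not what the actor does today:

import akka.actor.UntypedActor;

public class StreamActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
        if (message instanceof StreamEvent) {
            // ... existing processing, eventually sending results to kafkaWriterActor ...
            getSender().tell("ack", getSelf()); // reply so an upstream ask can complete
        } else {
            unhandled(message);
        }
    }
}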
// Feed every consumed record to the shard region, fire-and-forget:
// tell() returns immediately, so nothing here slows the stream down
// when the shard region falls behind.
Source.fromPublisher(publisher)
        .runWith(Sink.foreach(msg -> streamActor.tell(msg.value(), ActorRef.noSender())), materializer);
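Would replacing the fire-and-forget tell with mapAsync plus ask be the right way to propagate backpressure from the shard region back into the stream? A sketch of what I'm considering, assuming StreamActor acks as above; the parallelism of 100 and the 5-second timeout are placeholders:

import akka.pattern.PatternsCS;

// At most 100 messages are in flight at once; the stream only pulls more
// from Kafka as the shard region acknowledges what it already has.
Source.fromPublisher(publisher)
        .mapAsync(100, msg -> PatternsCS.ask(streamActor, msg.value(), 5000))
        .runWith(Sink.ignore(), materializer);

Is this the intended way to implement the flow control the warning asks for, or is there a better mechanism for a sharded setup?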