
I have written an Akka application that takes input from Kafka, processes the data with sharded actors, and writes the output back to Kafka.

But on some occasions the shard regions can't handle the load, and I get:

You should probably implement flow control to avoid flooding the remote connection.

How can I implement backpressure in this chain/flow?

Kafka Consumer -> Sharded Actors -> Kafka Producer

Some fragments from the code:

ReactiveKafka kafka = new ReactiveKafka();

Subscriber subscriber = kafka.publish(pp, system);

ActorRef kafkaWriterActor = (ActorRef) Source.actorRef(10000, OverflowStrategy.dropHead())
                .map(ix -> KeyValueProducerMessage.apply(Integer.toString(ix.hashCode()), ix))
                .to(Sink.fromSubscriber(subscriber))
                .run(materializer);
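Note that `Source.actorRef` with `OverflowStrategy.dropHead()` silently discards the oldest buffered messages under load, so the writer side already loses data before any remoting warning appears. One alternative (a sketch only; the element type `StreamEvent` and buffer size are assumptions carried over from the fragments above) is `Source.queue` with `OverflowStrategy.backpressure()`, whose `offer()` returns a `CompletionStage` the caller can wait on before sending more:

```java
// Sketch: a backpressure-aware writer. offer() completes only once the
// element has been accepted by the stream, so overload becomes visible
// to the caller instead of causing silent drops.
SourceQueueWithComplete<StreamEvent> queue =
        Source.<StreamEvent>queue(10000, OverflowStrategy.backpressure())
                .map(ix -> KeyValueProducerMessage.apply(Integer.toString(ix.hashCode()), ix))
                .to(Sink.fromSubscriber(subscriber))
                .run(materializer);

// Callers chain on the offer result instead of fire-and-forget tell():
// queue.offer(event).thenAccept(result -> { /* enqueue next element */ });
```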

ConsumerProperties cp = new PropertiesBuilder.Consumer(brokerList, intopic, consumergroup, new ByteArrayDeserializer(), new NgMsgDecoder())
                        .build().consumerTimeoutMs(5000).commitInterval(Duration.create(60, TimeUnit.SECONDS)).readFromEndOfStream();

Publisher<ConsumerRecord<byte[], StreamEvent>> publisher = kafka.consume(cp,system);

ActorRef streamActor = ClusterSharding.get(system).start("StreamActor",
                Props.create(StreamActor.class, synctime), ClusterShardingSettings.create(system), messageExtractor);

shardRegionTypenames.add("StreamActor");


Source.fromPublisher(publisher)                
                .runWith(Sink.foreach(msg -> {                    
                    streamActor.tell(msg.value(),ActorRef.noSender());
                }), materializer);

1 Answer


Perhaps you could consider parallelizing your topic into partitions (if applicable) and creating a consumer with per-partition backpressure, by adapting the ConsumerWithPerPartitionBackpressure example to integrate with your actors using mapAsync and ask.
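In the asker's consumer fragment, the `Sink.foreach` + `tell` combination is fire-and-forget, so the stream keeps pulling from Kafka regardless of whether the shard region keeps up. A minimal sketch of the `mapAsync` + `ask` integration (the parallelism of 5, the 5-second timeout, and the assumption that `StreamActor` replies with an acknowledgement after processing are all illustrative choices, not part of the original code):

```java
// Sketch: the stream demands the next records from Kafka only after the
// sharded actor has replied for the in-flight ones, which propagates
// backpressure from the actors back to the Kafka consumer.
Timeout timeout = Timeout.create(java.time.Duration.ofSeconds(5));

Source.fromPublisher(publisher)
        .mapAsync(5, msg ->
                // StreamActor must reply (e.g. an Ack message) when done;
                // ask turns that reply into a CompletionStage the stream awaits.
                PatternsCS.ask(streamActor, msg.value(), timeout))
        .runWith(Sink.ignore(), materializer);
```

The key design point is that `mapAsync` caps the number of unacknowledged messages at its parallelism, so a slow shard region slows the Kafka consumer down instead of flooding the remote connection.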

answered 2017-04-11T21:58:09.473