0

我们发现 Kafka + Storm + Trident + OpaqueTridentKafkaSpout 存在一些性能问题

下面提到的是我们的设置细节:

风暴拓扑:

Broker broker = Broker.fromString("localhost:9092")
    GlobalPartitionInformation info = new GlobalPartitionInformation()
    if(args[4]){
        int partitionCount = args[4].toInteger()
        for(int i =0;i<partitionCount;i++){
            info.addPartition(i, broker)
        }
    }
    StaticHosts hosts = new StaticHosts(info)
    TridentKafkaConfig tridentKafkaConfig = new TridentKafkaConfig(hosts,"test")
    tridentKafkaConfig.scheme = new SchemeAsMultiScheme(new StringScheme())


    OpaqueTridentKafkaSpout kafkaSpout = new OpaqueTridentKafkaSpout(tridentKafkaConfig)
    TridentTopology topology = new TridentTopology()
    Stream st  = topology.newStream("spout1", kafkaSpout).parallelismHint(args[2].toInteger())
            .each(kafkaSpout.getOutputFields(), new NEO4JTridentFunction(), new Fields("status"))
            .parallelismHint(args[1].toInteger())
    Map conf = new HashMap()
    conf.put(Config.TOPOLOGY_WORKERS, args[3].toInteger())
    conf.put(Config.TOPOLOGY_DEBUG, false)

    if (args[0] == "local") {
        LocalCluster cluster = new LocalCluster()
        cluster.submitTopology("mytopology", conf, topology.build())
    } else {
        StormSubmitter.submitTopology("mytopology", conf, topology.build())
        NEO4JTridentFunction.getGraphDatabaseService().shutdown()
    }

我们用于 Storm 的 Storm.yaml 如下:

########### These MUST be filled in for a storm configuration
storm.zookeeper.servers:
     - "localhost"
#     - "server2"
# 
storm.zookeeper.port : 2999


storm.local.dir: "/opt/mphrx/neo4j/stormdatadir"

nimbus.childopts: "-Xms2048m"
ui.childopts: "-Xms1024m"
logviewer.childopts: "-Xmx512m"
supervisor.childopts: "-Xms1024m"
worker.childopts: "-Xms2600m -Xss256k -XX:MaxPermSize=128m -XX:PermSize=96m
    -XX:NewSize=1000m -XX:MaxNewSize=1000m -XX:MaxTenuringThreshold=1 -XX:SurvivorRatio=6
    -XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled
    -XX:CMSInitiatingOccupancyFraction=75 -XX:+UseCMSInitiatingOccupancyOnly
    -server -XX:+AggressiveOpts -XX:+UseCompressedOops -Djava.awt.headless=true -Djava.net.preferIPv4Stack=true
    -Xloggc:logs/gc-worker-%ID%.log -verbose:gc
    -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=1m
    -XX:+PrintGCDetails -XX:+PrintHeapAtGC -XX:+PrintGCTimeStamps -XX:+PrintClassHistogram
    -XX:+PrintTenuringDistribution -XX:-PrintGCApplicationStoppedTime -XX:-PrintGCApplicationConcurrentTime
    -XX:+PrintCommandLineFlags -XX:+PrintFlagsFinal"

java.library.path: "/usr/lib/jvm/jdk1.7.0_25"

supervisor.slots.ports:
    - 6700
    - 6701
    - 6702
    - 6703

topology.trident.batch.emit.interval.millis: 100
topology.message.timeout.secs: 300
#topology.max.spout.pending: 10000
  • Kafka 生成的每条消息的大小:11 KB
  • 每个bolt(NEO4JTridentFunction)处理数据的执行时间:500ms
  • 风暴工人人数:1
  • Spout(OpaqueTridentKafkaSpout) 的并行提示:1
  • Bolt/Function(NEO4JTridentFunction)的并行提示:50

  • 我们看到 Spout 的吞吐量约为 12msgs/秒。

  • Kafka 产生的消息速率:150msgs/sec

Storm 和 Kafka 都是单节点部署。我们从 Storm 中了解到更高的吞吐量,但无法产生相同的结果。请建议如何调整 Storm+ Kafka + OpaqueTridentKafkaSpout 配置以实现更高的吞吐量。在这方面的任何帮助都会极大地帮助我们。

谢谢,

4

3 回答 3

2

您应该将 spout 并行度设置为与提到的主题的分区计数相同。默认情况下,trident 每次执行都接受一个批次,您应该通过更改topology.max.spout.pending属性来增加此计数。由于 Trident 强制执行有序事务管理,因此您的执行方法(NEO4JTridentFunction)必须快速达到所需的解决方案。

此外,您可以使用"tridentConfig.fetchSizeBytes",通过更改它,您可以为您的 spout 中的每个新发出调用摄取更多数据。

另外你必须检查你的垃圾收集日志,它会给你关于真实点的线索。

"-XX:+PrintGCDetails -XX:+PrintGCTimeStamps -verbose:gc -Xloggc:{path}/gc-storm-worker-%ID%.log"您可以通过在 worker 配置的 worker.childopts 设置中添加 , 来启用垃圾收集日志。

最后但同样重要的是,如果你的年轻代比例高于正常情况,你可以使用 G1GC。

于 2016-03-17T14:55:00.413 回答
0

我的计算:如果每个螺栓有 8 个核心和 500MS -> ~16 消息/秒。 如果您优化螺栓,那么您将看到改进。

此外,对于 CPU 绑定螺栓,尝试并行提示 = 'amount of total cores' 并将topology.trident.batch.emit.interval.millis增加到处理整个批次所需的时间除以 2。设置 topology.max。 spout.pending 为 1。

于 2016-03-19T18:25:31.080 回答
0

请根据您的系统配置设置您的 worker.childopts。使用 SpoutConfig.fetchSizeBytes 增加被拉入拓扑的字节数。增加您的并行提示。

于 2016-01-08T15:23:01.207 回答