With the following configuration, I can connect Samza to a Kafka broker:

systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory
systems.kafka.samza.msg.serde=json
systems.kafka.consumer.zookeeper.connect=localhost:2181/
systems.kafka.producer.bootstrap.servers=localhost:9092

However, I have some questions about the SystemFactory class. How do we write our own SystemFactory class? What is the purpose of a SystemFactory class? Please give me some ideas.

2 Answers

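You can write your own system factory class by implementing the SystemFactory interface and its three abstract methods: getConsumer, getProducer, and getAdmin. In each method, getConsumer for example, you create a system consumer: an instance of another custom class that extends SystemConsumer and defines how the system should be consumed. By doing so, your Samza job will know how to obtain the system's admin/consumer/producer when it needs one.

Example (in Scala):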

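import org.apache.samza.config.Config
import org.apache.samza.metrics.MetricsRegistry
import org.apache.samza.system.{SystemAdmin, SystemConsumer, SystemFactory, SystemProducer}

// YourSystemConsumer, YourSystemProducer and YourSystemAdmin are your own classes
// that implement SystemConsumer, SystemProducer and SystemAdmin respectively.
class YourSystemFactory extends SystemFactory {
  // Returns the object Samza uses to read messages from your system.
  override def getConsumer(systemName: String, config: Config, registry: MetricsRegistry): SystemConsumer = {
    new YourSystemConsumer(
      getAdmin(systemName, config).asInstanceOf[YourSystemAdmin],
      config.get("someParam"))
  }

  // Returns the object Samza uses to query metadata about your system.
  override def getAdmin(systemName: String, config: Config): SystemAdmin = {
    new YourSystemAdmin(
      config.get("someParam"),
      config.get("someOtherParam"))
  }

  // Returns the object Samza uses to write messages to your system.
  override def getProducer(systemName: String, config: Config, registry: MetricsRegistry): SystemProducer = {
    new YourSystemProducer(
      getAdmin(systemName, config).asInstanceOf[YourSystemAdmin],
      config.get("someParam"))
  }
}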

In your configuration:

# Your system params
systems.your.samza.factory=your.package.YourSystemFactory
systems.your.consumer.param=value
systems.your.producer.param=value
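
As for the purpose of the factory: the systems.<name>.samza.factory key names a class that the framework instantiates to obtain the consumer, producer and admin for that system. The sketch below illustrates the idea in plain Java. It is a simplified illustration, not Samza's actual loading code: SimpleSystemFactory, InMemorySystemFactory and load are hypothetical names, and the stand-in interface has only one method that returns a string instead of a real SystemConsumer.

```java
import java.util.Properties;

public class FactoryLoadingSketch {
    // Hypothetical stand-in for Samza's SystemFactory interface
    // (the real one also declares getProducer and getAdmin).
    public interface SimpleSystemFactory {
        String getConsumer(String systemName, Properties config);
    }

    // Hypothetical implementation that a config entry can point to.
    public static class InMemorySystemFactory implements SimpleSystemFactory {
        @Override
        public String getConsumer(String systemName, Properties config) {
            String param = config.getProperty("systems." + systemName + ".consumer.param");
            return "consumer for " + systemName + " with param=" + param;
        }
    }

    // Reads systems.<name>.samza.factory and instantiates the named class reflectively.
    public static SimpleSystemFactory load(String systemName, Properties config) {
        String key = "systems." + systemName + ".samza.factory";
        String className = config.getProperty(key);
        if (className == null) {
            throw new IllegalArgumentException("Missing config key: " + key);
        }
        try {
            return (SimpleSystemFactory) Class.forName(className)
                    .getDeclaredConstructor()
                    .newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + className, e);
        }
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty("systems.your.samza.factory", InMemorySystemFactory.class.getName());
        config.setProperty("systems.your.consumer.param", "value");

        SimpleSystemFactory factory = load("your", config);
        System.out.println(factory.getConsumer("your", config));
    }
}
```

Because the job only refers to the system by name, swapping Kafka for another system is a matter of pointing the factory key at a different class.
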
answered 2017-03-18T15:31:54.390

You don't need to implement your own KafkaSystemFactory. You only need to implement StreamTask.

Example:

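import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

public class MyTaskClass implements StreamTask {
  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    // process message
  }
}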

Configuration:

# This is the class above, which Samza will instantiate when the job is run
task.class=com.example.samza.MyTaskClass

# Define a system called "kafka" (you can give it any name, and you can define
# multiple systems if you want to process messages from different sources)
systems.kafka.samza.factory=org.apache.samza.system.kafka.KafkaSystemFactory

# The job consumes a topic called "PageViewEvent" from the "kafka" system
task.inputs=kafka.PageViewEvent

# Define a serializer/deserializer called "json" which parses JSON messages
serializers.registry.json.class=org.apache.samza.serializers.JsonSerdeFactory

# Use the "json" serializer for messages in the "PageViewEvent" topic
systems.kafka.streams.PageViewEvent.samza.msg.serde=json
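
To make the link between task.inputs and the system definition explicit, here is a rough Java sketch of how an entry such as kafka.PageViewEvent can be read as a system name plus a stream name. It assumes the system name is everything before the first dot; TaskInputsSketch and parseInput are hypothetical names used for illustration, not part of Samza.

```java
public class TaskInputsSketch {
    // Splits "system.stream" at the first dot into { systemName, streamName }.
    public static String[] parseInput(String input) {
        int dot = input.indexOf('.');
        if (dot <= 0 || dot == input.length() - 1) {
            throw new IllegalArgumentException("Expected <system>.<stream>, got: " + input);
        }
        return new String[] { input.substring(0, dot), input.substring(dot + 1) };
    }

    public static void main(String[] args) {
        String[] parts = parseInput("kafka.PageViewEvent");
        // parts[0] is the system whose systems.<name>.samza.factory entry is used.
        System.out.println("system=" + parts[0] + ", stream=" + parts[1]);
    }
}
```

The system part ("kafka" here) is the same name that appears in systems.kafka.samza.factory, which is why the built-in KafkaSystemFactory is enough for this job.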

For more information, see the documentation.

answered 2016-03-14T00:18:03.220