0

我正在尝试向 kafka 发送 2 个自定义事件,想法是将每个事件发送到其主题,但目前我在两个主题中都收到了两个事件。

我已经配置了 2 个具有不同模式和 2 个接收器的映射(每个映射一个)。

这是 divolte-collector.conf:

divolte {
  global {
    kafka {
      enabled = true
      threads = 2
      buffer_size = 1048576

      producer = {
        bootstrap.servers = "localhost:9092"
        acks = 1
        retries = 0
        compression.type = lz4
        max.in.flight.requests.per.connection = 1
      }
    }
  }

  mappings {
    a_mapping = {
        schema_file = "path-to/conf/MyEventRecord.avsc"
        mapping_script_file = "path-to/conf/mapping.groovy"
        sources = [browser]
        sinks = [kafka1]
    }
    b_mapping = {
        schema_file = "path-to/conf/MyEventRecord2.avsc"
        mapping_script_file = "path-to/conf/mapping2.groovy"
        sources = [browser]
        sinks = [kafka2]
    }
  }

  sinks {
    kafka1 {
      type = kafka

      topic = topic-1
    }
    kafka2 {
      type = kafka

      topic = topic-2
    }
  }
}

我的常规文件:

mapping {
  map eventParameter('js_error_msg') onto 'js_error_msg'
  map eventParameter('js_error_level') onto 'js_error_level'
  map eventParameter('js_url') onto 'js_url'
  map eventParameter('js_line') onto 'js_line'
  map eventParameter('js_column') onto 'js_column'
  map eventParameter('js_is_error_caught') onto 'js_is_error_caught'
}

我的 avro 文件:

{
  "name": "tracking",
  "type": "record",
  "fields": [
    { "name": "js_error_msg", "type": "string" },
    { "name": "js_error_level", "type": "string" },
    { "name": "js_url", "type": "string" },
    { "name": "js_line", "type": "string" },
    { "name": "js_column", "type": "string" },
    { "name": "js_is_error_caught", "type": "string" }
  ]
}
4

0 回答 0