
I am building a data pipeline using Kafka. The data flow is as follows: capture data changes in MongoDB and send them to Elasticsearch.


MongoDB

  • Version 3.6
  • Sharded cluster

Kafka

  • Confluent Platform 4.1.0
  • MongoDB source connector: Debezium 0.7.5
  • Elasticsearch sink connector

Elasticsearch

  • Version 6.1.0

Since I am still testing, all the Kafka-related systems are running on a single server.

  • Start ZooKeeper

    $ bin/zookeeper-server-start etc/kafka/zookeeper.properties
    
  • Start the Kafka broker (bootstrap server)

    $ bin/kafka-server-start etc/kafka/server.properties
    
  • Start the Schema Registry

    $ bin/schema-registry-start etc/schema-registry/schema-registry.properties
    
  • Start the MongoDB source connector

    $ bin/connect-standalone \
      etc/schema-registry/connect-avro-standalone.properties \
      etc/kafka/connect-mongo-source.properties
    
    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    
    $ cat etc/schema-registry/connect-avro-standalone.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8083
    
  • Start the Elasticsearch sink connector

    $ bin/connect-standalone \
      etc/schema-registry/connect-avro-standalone2.properties \
      etc/kafka-connect-elasticsearch/elasticsearch.properties
    
    $ cat etc/kafka-connect-elasticsearch/elasticsearch.properties
    >>>
    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    
    $ cat etc/schema-registry/connect-avro-standalone2.properties
    >>>
    bootstrap.servers=localhost:9092
    key.converter=io.confluent.connect.avro.AvroConverter
    key.converter.schema.registry.url=http://localhost:8081
    value.converter=io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url=http://localhost:8081
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter.schemas.enable=false
    internal.value.converter.schemas.enable=false
    rest.port=8084
    

Everything above works fine. The Kafka connector captures data changes (CDC) and successfully sends them to Elasticsearch through the sink connector. The problem is that I cannot convert the string-typed message data into a structured data type. For example, let's consume the topic data after making some changes in MongoDB.

    $ bin/kafka-avro-console-consumer \
    --bootstrap-server localhost:9092 \
    --topic higee.higee.higee --from-beginning | jq

Then I get the following result:

    "after": null,
      "patch": {
        "string": "{\"_id\" : {\"$oid\" : \"5ad97f982a0f383bb638ecac\"},\"name\" : \"higee\",\"salary\" : 100,\"origin\" : \"South Korea\"}"
      },
      "source": {
        "version": {
          "string": "0.7.5"
        },
        "name": "higee",
        "rs": "172.31.50.13",
        "ns": "higee",
        "sec": 1524214412,
        "ord": 1,
        "h": {
          "long": -2379508538412995600
        },
        "initsync": {
          "boolean": false
        }
      },
      "op": {
        "string": "u"
      },
      "ts_ms": {
        "long": 1524214412159
      }
    }

Then, if I go to Elasticsearch, I get the following result:

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "after": null,
          "patch": """{"_id" : {"$oid" : "5ad97f982a0f383bb638ecac"}, 
                       "name" : "higee", 
                       "salary" : 100,
                       "origin" : "South Korea"}""",
          "source": {
            "version": "0.7.5",
            "name": "higee",
            "rs": "172.31.50.13",
            "ns": "higee",
            "sec": 1524214412,
            "ord": 1,
            "h": -2379508538412995600,
            "initsync": false
          },
          "op": "u",
          "ts_ms": 1524214412159
        }
      }

What I want to achieve is the following:

    {
        "_index": "higee.higee.higee",
        "_type": "kafka-connect",
        "_id": "higee.higee.higee+0+3",
        "_score": 1,
        "_source": {
          "oid" : "5ad97f982a0f383bb638ecac",
          "name" : "higee", 
          "salary" : 100,
          "origin" : "South Korea"
         }"
     }

Some options I have been trying or am still considering are as follows.

  • Logstash

    • Case 1: I don't know how to parse these characters (\u0002, \u0001)

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => json {
              charset => "UTF-8"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
         }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • Result

        {
        "message" => "H\u0002�\u0001{\"_id\" : \
            {\"$oid\" : \"5adafc0e2a0f383bb63910a6\"}, \
             \"name\" : \"higee\", \
             \"salary\" : 101, \
             \"origin\" : \"South Korea\"} \
             \u0002\n0.7.5\nhigee \ 
             \u0018172.31.50.13\u001Ahigee.higee2 \ 
             ��ح\v\u0002\u0002��̗���� \u0002\u0002u\u0002�����X",
        "tags" => [[0] "_jsonparsefailure"]
        }
        
    • Case 2

      • logstash.conf

        input {
          kafka {
            bootstrap_servers => ["localhost:9092"]
            topics => ["higee.higee.higee"]
            auto_offset_reset => "earliest"
            codec => avro {
              schema_uri => "./test.avsc"
            }
          }
        }
        
        filter {
          json {
            source => "message"
          }
        }
        
        output {
          stdout {
            codec => rubydebug
          }
        }
        
      • test.avsc

        {
            "namespace": "example",
            "type": "record",
            "name": "Higee",
            "fields": [
              {"name": "_id", "type": "string"},
              {"name": "name", "type": "string"},
              {"name": "salary",  "type": "int"},
              {"name": "origin", "type": "string"}
            ]
         }
        
      • Result

        An unexpected error occurred! {:error=>#<NoMethodError: 
        undefined method `type_sym' for nil:NilClass>, :backtrace=> 
        ["/home/ec2-user/logstash- 
        6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:224:in `match_schemas'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:280:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:376:in `read_union'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:309:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:384:in `block in read_record'", 
        "org/jruby/RubyArray.java:1734:in `each'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:382:in `read_record'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:310:in `read_data'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/avro- 
        1.8.2/lib/avro/io.rb:275:in `read'", "/home/ec2- 
        user/logstash-6.1.0/vendor/bundle/jruby/2.3.0/gems/ 
        logstash-codec-avro-3.2.3-java/lib/logstash/codecs/ 
        avro.rb:77:in `decode'", "/home/ec2-user/logstash-6.1.0/ 
        vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
        8.0.2/lib/ logstash/inputs/kafka.rb:254:in `block in 
        thread_runner'", "/home/ec2-user/logstash- 
        6.1.0/vendor/bundle/jruby/2.3.0/gems/logstash-input-kafka- 
        8.0.2/lib/logstash/inputs/kafka.rb:253:in `block in 
        thread_runner'"]}
        
  • Python client

    • Consume the topic, do some data manipulation, and produce to a different topic name, so that the Elasticsearch sink connector only consumes well-formatted messages from the Python-manipulated topic
    • kafka-python library: could not decode the message

      from kafka import KafkaConsumer
      
      consumer = KafkaConsumer(
                   'higee.higee.higee',
                   bootstrap_servers='localhost:9092',
                   auto_offset_reset='earliest'
                 )
      
      for message in consumer:
          message.value.decode('utf-8')
      
      >>> 'utf-8' codec can't decode byte 0xe4 in position 6: 
          invalid continuation byte
      
    • confluent_kafka is not compatible with Python 3


Any idea how to jsonify the data in Elasticsearch? The following are the sources I have searched.

Thanks in advance.


Some attempts

1) I changed my connect-mongo-source.properties file as follows to test the transformation.

    $ cat etc/kafka/connect-mongo-source.properties
    >>> 
    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee
    transforms=unwrap
    transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

Below is the error log I got. Since I am not yet used to Kafka, and even less so to the Debezium platform, I could not debug this error.

ERROR WorkerSourceTask{id=mongodb-source-connector-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.json.JsonParseException: JSON reader expected a string but found '0'.
    at org.bson.json.JsonReader.visitBinDataExtendedJson(JsonReader.java:904)
    at org.bson.json.JsonReader.visitExtendedJSON(JsonReader.java:570)
    at org.bson.json.JsonReader.readBsonType(JsonReader.java:145)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:82)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:41)
    at org.bson.codecs.BsonDocumentCodec.readValue(BsonDocumentCodec.java:101)
    at org.bson.codecs.BsonDocumentCodec.decode(BsonDocumentCodec.java:84)
    at org.bson.BsonDocument.parse(BsonDocument.java:62)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:45)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:218)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:194)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2) This time, I changed elasticsearch.properties and did not change connect-mongo-source.properties.

$ cat connect-mongo-source.properties

    name=mongodb-source-connector
    connector.class=io.debezium.connector.mongodb.MongoDbConnector
    mongodb.hosts=''
    initial.sync.max.threads=1
    tasks.max=1
    mongodb.name=higee

$ cat elasticsearch.properties

    name=elasticsearch-sink
    connector.class = io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=higee.higee.higee
    key.ignore=true
    connection.url=''
    type.name=kafka-connect
    transforms=unwrap
    transforms.unwrap.type = io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

I got the following error:

ERROR WorkerSinkTask{id=elasticsearch-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:172)
org.bson.BsonInvalidOperationException: Document does not contain key $set
    at org.bson.BsonDocument.throwIfKeyAbsent(BsonDocument.java:844)
    at org.bson.BsonDocument.getDocument(BsonDocument.java:135)
    at io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope.apply(UnwrapFromMongoDbEnvelope.java:53)
    at org.apache.kafka.connect.runtime.TransformationChain.apply(TransformationChain.java:38)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:480)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:301)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:205)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:173)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

3) Changed test.avsc and ran Logstash. I did not get any error messages, but the result was not what I expected: the origin, salary, and name fields are all empty, even though they were given non-null values. I was even able to read the data correctly through the console consumer.

$ cat test.avsc
>>>
    {
      "type" : "record",
      "name" : "MongoEvent",
      "namespace" : "higee.higee",
      "fields" : [ {
        "name" : "_id",
        "type" : {
          "type" : "record",
          "name" : "HigeeEvent",
          "fields" : [ {
            "name" : "$oid",
            "type" : "string"
          }, {
            "name" : "salary",
            "type" : "long"
          }, {
            "name" : "origin",
            "type" : "string"
          }, {
            "name" : "name",
            "type" : "string"
          } ]
        }
      } ]
    }

$ cat logstash3.conf
>>>
    input {
      kafka {
        bootstrap_servers => ["localhost:9092"]
        topics => ["higee.higee.higee"]
        auto_offset_reset => "earliest"
        codec => avro {
          schema_uri => "./test.avsc"
        }
      }
    }

    output {
      stdout {
       codec => rubydebug
      }
    }

$ bin/logstash -f logstash3.conf
>>>
    {
    "@version" => "1",
    "_id" => {
      "salary" => 0,
      "origin" => "",
      "$oid" => "",
      "name" => ""
    },
    "@timestamp" => 2018-04-25T09:39:07.962Z
    }

3 Answers


Python client

You must use the Avro consumer, otherwise you will get 'utf-8' codec can't decode byte

Even that example won't work, because you still need the Schema Registry to look up the schema.

The prerequisites for Confluent's Python client say it works with Python 3.x.

Nothing is stopping you from using a different client, so not sure why you left it at only trying Python.
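
A minimal sketch of what that looks like, assuming a broker on localhost:9092 and a Schema Registry on localhost:8081 (the same addresses used elsewhere in the question); the group id is a placeholder:

    from confluent_kafka.avro import AvroConsumer

    consumer = AvroConsumer({
        'bootstrap.servers': 'localhost:9092',
        'group.id': 'debug-avro-consumer',        # placeholder group id
        'schema.registry.url': 'http://localhost:8081'
    })
    consumer.subscribe(['higee.higee.higee'])

    msg = consumer.poll(10)                       # wait up to 10 seconds for one message
    if msg is not None and msg.error() is None:
        print(msg.value())                        # already deserialized into a dict
    consumer.close()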

Logstash Avro codec

  1. The JSON codec cannot decode Avro data. I don't think the json filter after the avro input codec works either
  2. Your Avro schema is wrong - you are missing the $oid under _id
  3. There is a difference between "raw Avro" (which includes the schema within the message itself) and Confluent's encoded version (which only contains the schema ID pointing at the registry). Meaning, Logstash doesn't integrate with the Schema Registry... at least not without a plugin (see the sketch below for what that encoding looks like)
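
To make the difference concrete: a message written by the Avro converter with the Schema Registry is not "raw Avro" - it is prefixed with one magic byte (0x00) and a 4-byte big-endian schema ID, and only then does the Avro-encoded payload follow. That prefix is what the plain avro codec (and the json codec) choke on. A minimal sketch of that framing, assuming Python 3 and the raw message bytes already in hand:

    import struct

    def split_confluent_frame(raw_bytes):
        """Split a Confluent-encoded Kafka message into (schema_id, avro_payload)."""
        if len(raw_bytes) < 5 or raw_bytes[0] != 0:
            raise ValueError('not a Confluent Schema Registry encoded message')
        schema_id = struct.unpack('>I', raw_bytes[1:5])[0]   # 4-byte schema ID after the magic byte
        return schema_id, raw_bytes[5:]                      # the rest is the Avro binary body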

Your AVSC should actually look something like this:

{
  "type" : "record",
  "name" : "MongoEvent",
  "namespace" : "higee.higee",
  "fields" : [ {
    "name" : "_id",
    "type" : {
      "type" : "record",
      "name" : "HigeeEvent",
      "fields" : [ {
        "name" : "$oid",
        "type" : "string"
      }, {
        "name" : "salary",
        "type" : "long"
      }, {
        "name" : "origin",
        "type" : "string"
      }, {
        "name" : "name",
        "type" : "string"
      } ]
    }
  } ]
}

However, Avro does not allow names to begin with anything outside the regex [A-Za-z_], so the $oid is going to be a problem.

While I don't recommend it (nor have I actually tried it), one possible way to get the JSON-encoded Avro data from the Avro console consumer into Logstash would be to use the Pipe input plugin:

input {
  pipe {
    codec => json
    command => "/path/to/confluent/bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic higee.higee.higee --from-beginning" 
  }
}

Debezium

Note that the after value is always a string, and that by convention it will contain a JSON representation of the document.

http://debezium.io/docs/connectors/mongodb/

I believe this applies to the patch values as well, but I don't really know Debezium.

Kafka won't parse the JSON in flight without using a Simple Message Transform (SMT). Reading the documentation you linked to, you should probably add these to your Connect source properties:

transforms=unwrap
transforms.unwrap.type=io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope

It is also worth pointing out that field flattening is on the roadmap - DBZ-561

Kafka Connect Elasticsearch

Without using something like Logstash or its JSON Processor, Elasticsearch will not parse and process encoded JSON string objects. Instead, it just indexes them as one whole string body.

If I remember correctly, Connect only applies an Elasticsearch mapping to top-level Avro fields, not nested ones.

In other words, the generated mapping follows this pattern:

"patch": {
    "string": "...some JSON object string here..."
  },

Whereas you actually need something like this - perhaps by manually defining your ES index mapping:

"patch": {
   "properties": {
      "_id": {
        "properties" {
          "$oid" :  { "type": "text" }, 
          "name" :  { "type": "text" },
          "salary":  { "type": "int"  }, 
          "origin": { "type": "text" }
      },

Again, not sure whether the dollar sign is allowed there.
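
A minimal sketch of defining such an index mapping up front through the REST API, assuming Elasticsearch 6.x on localhost:9200 and the index/type names used above; the oid field name (rather than $oid) is illustrative, and the field layout follows the shape of the patch document shown earlier in the question:

    import json
    import requests

    mapping = {
        "mappings": {
            "kafka-connect": {                      # the type.name used by the sink connector
                "properties": {
                    "patch": {
                        "properties": {
                            "_id":    {"properties": {"oid": {"type": "text"}}},
                            "name":   {"type": "text"},
                            "salary": {"type": "integer"},
                            "origin": {"type": "text"}
                        }
                    }
                }
            }
        }
    }

    # Create the index with the mapping before the sink connector writes to it.
    resp = requests.put(
        'http://localhost:9200/higee.higee.higee',
        headers={'Content-Type': 'application/json'},
        data=json.dumps(mapping),
    )
    print(resp.status_code, resp.json())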

Kafka Connect MongoDB Source

If none of the above works, you could try a different connector.

answered 2018-04-21T21:51:31.103

I was able to solve this issue by using a Python Kafka client. Below is the new architecture of my pipeline.

(Pipeline diagram: MongoDB → Debezium source connector → Kafka → Python client, which consumes, transforms, and produces to a new topic → Elasticsearch sink connector → Elasticsearch)

I used Python 2 even though the Confluent documentation says Python 3 is supported. The main reason is that some of the code is in Python 2 syntax. For example... (not exactly this line, but similar syntax)

    except NameError, err:

To use it with Python 3, I would need to convert the line above into:

    except NameError as err:

That said, below is my Python code. Note that this code is only for prototyping and is not meant for production.

Consume messages via the Confluent Consumer

  • Code

    from confluent_kafka.avro import AvroConsumer
    
    c = AvroConsumer({ 
           'bootstrap.servers': '',
           'group.id': 'groupid',
           'schema.registry.url': ''
        })
    
    c.subscribe(['higee.higee.higee'])
    
    x = True
    
    while x:
        msg = c.poll(100)
        if msg:
            message = msg.value()
            print(message)
            x = False
    
    c.close()
    
  • (After updating a document in MongoDB) let's check the message variable

    {u'after': None,
     u'op': u'u',
     u'patch': u'{
         "_id" : {"$oid" : "5adafc0e2a0f383bb63910a6"},
         "name" : "higee",
         "salary" : 100,
         "origin" : "S Korea"}',
     u'source': {
         u'h': 5734791721791032689L,
         u'initsync': False,
         u'name': u'higee',
         u'ns': u'higee.higee',
         u'ord': 1,
         u'rs': u'',
         u'sec': 1524362971,
         u'version': u'0.7.5'},
     u'ts_ms': 1524362971148
     }
    

Manipulate the consumed message

  • Code

    patch = message['patch']
    patch_dict = eval(patch)
    patch_dict.pop('_id')
    
  • Check patch_dict (a json.loads-based alternative is sketched after this list)

    {'name': 'higee', 'origin': 'S Korea', 'salary': 100}
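
A safer sketch than eval() for this step, assuming the same message as above - eval happens to work on this particular patch string, but it fails on JSON literals such as true/false/null and will execute arbitrary code:

    import json

    patch_dict = json.loads(message['patch'])   # parse the Debezium patch JSON string
    patch_dict.pop('_id', None)                 # drop the {"$oid": ...} wrapper, as above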
    

Produce messages via the Confluent Producer

    from confluent_kafka import avro
    from confluent_kafka.avro import AvroProducer

    value_schema_str = """
    {
       "namespace": "higee.higee",
       "name": "MongoEvent",
       "type": "record",
       "fields" : [
           {
               "name" : "name",
               "type" : "string"
           },
           {
              "name" : "origin",
              "type" : "string"
           },
           {
               "name" : "salary",
               "type" : "int32"
           }
       ]
    }
    """
    AvroProducerConf = {
        'bootstrap.servers': '',
        'schema.registry.url': ''
    }

    value_schema = avro.loads(value_schema_str)  # the schema string above (originally loaded from ./user.avsc)
    avroProducer = AvroProducer(
                       AvroProducerConf, 
                       default_value_schema=value_schema
                   )

    avroProducer.produce(topic='python', value=patch_dict)
    avroProducer.flush()
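
As a side note for prototyping, a delivery report makes failed produces visible; a sketch, assuming the confluent_kafka version in use forwards extra keyword arguments such as on_delivery through AvroProducer.produce:

    def delivery_report(err, msg):
        # Called once per produced message from flush()/poll() with the result.
        if err is not None:
            print('Delivery failed: {}'.format(err))
        else:
            print('Delivered to {} [{}] at offset {}'.format(
                msg.topic(), msg.partition(), msg.offset()))

    avroProducer.produce(topic='python', value=patch_dict, on_delivery=delivery_report)
    avroProducer.flush()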

The only thing left is to make the Elasticsearch sink connector respond to the new topic "python" by setting up a configuration in the following format. Everything stays the same except topics.

    name=elasticsearch-sink
    connector.class=io.confluent.connect.elasticsearch.ElasticsearchSinkConnector
    tasks.max=1
    topics=python
    key.ignore=true
    connection.url=''
    type.name=kafka-connect

Then run the Elasticsearch sink connector and check the result in Elasticsearch.

    {
        "_index": "zzzz",
        "_type": "kafka-connect",
        "_id": "zzzz+0+3",
        "_score": 1,
        "_source": {
          "name": "higee",
          "origin": "S Korea",
          "salary": 100
        }
      }
answered 2018-04-22T02:19:29.057

+1 to @cricket_007's suggestion - use the io.debezium.connector.mongodb.transforms.UnwrapFromMongoDbEnvelope single message transform. You can read more about SMTs and their benefits here.

answered 2018-04-22T16:18:42.413