1

我有一个 pyspark 应用程序正在使用来自 Kafka 主题的消息,这些消息由org.apache.kafka.connect.json.JsonConverter. 我正在使用融合的 Kafka JDBC 连接器来执行此操作

问题是,当我使用消息时,ID 列出现在某种编码文本中,例如“ARM=”,而它应该是数字类型。

这是我现在拥有的代码

spark = SparkSession.builder.appName("my app").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')
ssc = StreamingContext(sc, 5)

kafka_params = {
    "bootstrap.servers": "kafkahost:9092",
    "group.id": "Deserialize"
}

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params)
kafka_stream.foreachRDD(lambda rdd: rdd.foreach(lambda x: print(x))

ssc.start()
ssc.awaitTermination()

我知道 createDirectStream 有一个我可以设置的 valueDecoder 参数,问题是我不知道如何使用它进行解码。我也事先知道架构,因此如果需要,我将能够创建一个。

作为参考,这是我打印出 rdd.foreach 时得到的 JSON

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "bytes",
        "optional": False,
        "name": "org.apache.kafka.connect.data.Decimal",
        "version": 1,
        "parameters": {
          "scale": "0"
        },
        "field": "ID"
      },
      {
        "type": "string",
        "optional": True,
        "field": "COLUMN1"
      }
    ],
    "optional": False
  },
  "payload": {
    "ID": "AOo=",
    "COLUMN1": "some string"
  }
}
4

2 回答 2

2

因此,正如 cricket_007 所提到的,在您的融合 Kafka 配置中,您必须将设置设置为 this value.converter.schema.enable=false。这将摆脱 Schema 字段,只留下有效负载 json。现在由于某种原因,我遇到了一个问题,我的所有数字列都将以这种奇怪的格式进行编码AOo=。现在,当使用 Json 序列化您的数据时,confluent 将使用 base64 转换您的数字列,但真正的问题甚至在此之前。由于某种原因,我所有的数字列都被转换为字节。不确定它为什么这样做,但它与 Confluent 处理 Oracle 数据库的方式有关。无论如何,解决这个问题的方法是在你的createDirectStream比如

kafka_stream = KafkaUtils.createDirectStream(ssc, ['mytopic'], kafka_params, valueDecoder=decoder)

在您的解码器方法中,您必须从 UTF-8 解码您的消息,解析 json,然后从 base64 解码您的数字列,然后从像这样的字节解码

def decoder(s):
    if s is None:
        return None

    loaded_json = json.loads(s.decode('utf-8'))
    loaded_json["ID"] = int.from_bytes(base64.b64decode(loaded_json['ID']), "big")
    return loaded_json
于 2019-03-22T17:48:13.450 回答
1

在您的 Connect 配置中,您可以设置value.converter.schema.enable=false,然后您只会获得该 JSON 记录的“有效负载”数据。

从那里,我假设您将能够根据在 PySpark 中读取流式 JSON 的任何其他示例来处理消息。

否则,由于您没有使用结构化流,因此没有可供您定义的模式。相反,您至少必须做类似的事情来解析记录

rdd.map(lambda x: json.loads(x))\
    .map(lambda x: x['payload'])\
    .foreach(lambda x: print(x))
于 2019-03-21T19:43:38.240 回答