0

大家好 !

我正在尝试将我的结构化流数据帧发送到我的 kafka 主题之一,detection.

这是结构化流数据帧的架构:

 root
 |-- timestamp: timestamp (nullable = true)
 |-- Sigma: string (nullable = true)
 |-- time: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- SourceComputer: string (nullable = true)
 |-- SourcePort: string (nullable = true)
 |-- DestinationComputer: string (nullable = true)
 |-- DestinationPort: string (nullable = false)
 |-- protocol: string (nullable = true)
 |-- packetCount: string (nullable = true)
 |-- byteCount: string (nullable = true)

但后来我尝试使用这种方法发送数据帧:

dfwriter=df \
  .selectExpr("CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("checkpointLocation", "/Documents/checkpoint/logs") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("failOnDataLoss", "false") \
  .option("topic", detection) \
  .start() 

然后我得到了错误:

pyspark.sql.utils.AnalysisException:无法解析“ value”给定输入列:[DestinationComputer、DestinationPort、Sigma、SourceComputer、SourcePort、byteCount、duration、packetCount、processName、protocol、time、timestamp];第 1 行第 5 行;

如果我发送一个带有value它工作的列的数据框,我会收到关于我的 kafka 主题消费者的数据。

任何想法发送我的所有列的数据框?

谢谢 !

4

1 回答 1

0

value正如错误所说,您的数据框没有列。

您需要“嵌入”列下的所有列value StructType,然后使用类似的函数to_json,而不是CAST( .. AS STRING)

在 Pyspark 中,这就像struct(to_json(struct($"*")).as("value")在选择查询中

类似的问题 -将 spark 数据帧的所有列转换为 json 格式,然后将 json 格式的数据作为列包含在另一个/父数据帧中

于 2021-11-02T15:08:04.487 回答