9

所以我有一个 Python Stream-sourced DataFrame ,其中包含我想使用spark-cassandra-connectordf放入 Cassandra 表中的所有数据。我尝试过两种方式:

df.write \
    .format("org.apache.spark.sql.cassandra") \
    .mode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .save() 

query = df.writeStream \
    .format("org.apache.spark.sql.cassandra") \
    .outputMode('append') \
    .options(table="myTable",keyspace="myKeySpace") \
    .start()

query.awaitTermination()

但是,我不断收到此错误:

pyspark.sql.utils.AnalysisException: "'write' can not be called on streaming Dataset/DataFrame;

java.lang.UnsupportedOperationException: Data source org.apache.spark.sql.cassandra does not support streamed writing.

无论如何我可以将我的 Streamed DataFrame 发送到我的 Cassandra 表中吗?

4

2 回答 2

6

目前Sink在 Spark Cassandra 连接器中没有 Cassandra 的流式传输。您将需要实现自己的Sink或等待它变得可用。

如果您使用的是 Scala 或 Java,则可以使用foreachoperator 并按照Using ForeachForeachWriter中的描述使用 a 。

于 2017-07-15T02:21:52.273 回答
5

我知道它是一个旧帖子,更新它以供将来参考。

您可以从流数据中将其作为批处理进行处理。像下面

def writeToCassandra(writeDF, epochId):
 writeDF.write \
    .format("org.apache.spark.sql.cassandra") \
    .options(table="table_name", keyspace="keyspacename")\
    .mode("append") \
    .save()

query = sdf3.writeStream \
.trigger(processingTime="10 seconds") \
.outputMode("update") \
.foreachBatch(writeToCassandra) \
.start()
于 2019-11-05T22:38:22.860 回答