我目前正在尝试将我的应用程序升级到 Spark 3.0.1。对于表创建,我使用 Python-Cassandra 连接器 cassandra-driver 删除并创建一个表。然后我使用 spark-cassandra 连接器将数据框写入表中。仅使用 spark-cassandra 连接器来创建和删除表并没有真正好的替代方法。
在 Spark 2.4 中,drop-create-write 流程没有问题。但是在 Spark 3.0 中,应用程序似乎没有特定的顺序来做这些事情,通常在删除和创建之前尝试编写。我不知道如何确保首先删除和创建表。我知道即使应用程序在写入时出错,也会发生删除和创建,因为当我通过 cqlsh 查询 Cassandra 时,我可以看到表被删除并重新创建。关于 Spark 3.0 中这种行为的任何想法?
注意:因为架构发生了变化,所以需要删除并重新创建这个特定的表,而不是直接覆盖。
根据要求的代码片段:
session = self._get_python_cassandra_session(self.env_conf, self.database)
# build drop table query
drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
session.execute(drop_table_query)
df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
# build create query
create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
# execute table creation
session.execute(create_table_query)
session.shutdown()
# spark-cassandra connection options
copts = _cassandra_cluster_spark_options(self.env_conf)
# set write mode
copts['confirm.truncate'] = overwrite
mode = 'overwrite' if overwrite else 'append'
# write dataframe to cassandra
get_dataframe_writer(df, 'cassandra', keyspace=self.database,
table=tablename, mode=mode, copts=copts).save()