1

我目前正在尝试将我的应用程序升级到 Spark 3.0.1。对于表创建,我使用 Python-Cassandra 连接器 cassandra-driver 删除并创建一个表。然后我使用 spark-cassandra 连接器将数据框写入表中。仅使用 spark-cassandra 连接器来创建和删除表并没有真正好的替代方法。

在 Spark 2.4 中,drop-create-write 流程没有问题。但是在 Spark 3.0 中,应用程序似乎没有特定的顺序来做这些事情,通常在删除和创建之前尝试编写。我不知道如何确保首先删除和创建表。我知道即使应用程序在写入时出错,也会发生删除和创建,因为当我通过 cqlsh 查询 Cassandra 时,我可以看到表被删除并重新创建。关于 Spark 3.0 中这种行为的任何想法?

注意:因为架构发生了变化,所以需要删除并重新创建这个特定的表,而不是直接覆盖。

根据要求的代码片段:

        session = self._get_python_cassandra_session(self.env_conf, self.database)
        # build drop table query
        drop_table_query = 'DROP TABLE IF EXISTS {}.{}'.format(self.database, tablename)
        session.execute(drop_table_query)

        df, table_columns, table_keys = self._create_table_metadata(df, keys=keys)
        # build create query
        create_table_query = 'CREATE TABLE IF NOT EXISTS {}.{} ({} PRIMARY KEY({}), );'.format(self.database, tablename, table_columns, table_keys)
        # execute table creation
        session.execute(create_table_query)
        session.shutdown()


        # spark-cassandra connection options
        copts = _cassandra_cluster_spark_options(self.env_conf)
        # set write mode
        copts['confirm.truncate'] = overwrite
        mode = 'overwrite' if overwrite else 'append'
        # write dataframe to cassandra
        get_dataframe_writer(df, 'cassandra', keyspace=self.database, 
        table=tablename, mode=mode, copts=copts).save()
4

2 回答 2

0

我最终构建了一个 time.sleep(5) 延迟和 100 秒超时,以定期 ping Cassandra 的表,然后在找到表时写入。

于 2020-10-27T14:19:31.313 回答
0

在 Spark Cassandra 连接器 3.0+ 中,您可以使用新功能 - 通过 Catalogs API 操作键空间和表。您可以使用 Spark SQL 创建/更改/删除键空间和表。例如,您可以使用以下命令在 Cassandra 中创建一个表:

CREATE TABLE casscatalog.ksname.table_name (
  key_1 Int, 
  key_2 Int, 
  key_3 Int, 
  cc1 STRING, 
  cc2 String, 
  cc3 String, 
  value String) 
USING cassandra
PARTITIONED BY (key_1, key_2, key_3)
TBLPROPERTIES (
    clustering_key='cc1.asc, cc2.desc, cc3.asc',
    compaction='{class=SizeTieredCompactionStrategy,bucket_high=1001}'
)

正如您在此处看到的,您可以指定相当复杂的主键,还可以指定表选项。该casscatalog部分是链接到特定 Cassandra 集群的前缀(您可以同时使用多个) - 它在您启动 Spark 作业时指定,例如:

spark-shell --packages com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.0.0 \
  --conf spark.sql.catalog.casscatalog=com.datastax.spark.connector.datasource.CassandraCatalog

更多示例可以在文档中找到:

于 2020-10-30T08:32:12.763 回答