我正在使用带有 Cassandra 2.2.5 列和元存储的 FiloDB 0.4,并尝试使用 Spark Streaming 1.6.1 + Jobserver 0.6.2 将数据插入其中。我使用以下代码插入数据:
messages.foreachRDD(parseAndSaveToFiloDb)
private static Function<JavaPairRDD<String, String>, Void> parseAndSaveToFiloDb = initialRdd -> {
final List<RowWithSchema> parsedMessages = parseMessages(initialRdd.collect());
final JavaRDD<Row> rdd = javaSparkContext.parallelize(createRows(parsedMessages));
final DataFrame dataFrame = sqlContext.createDataFrame(rdd, generateSchema(rawMessages);
dataFrame.write().format("filodb.spark")
.option("database", keyspace)
.option("dataset", dataset)
.option("row_keys", rowKeys)
.option("partition_keys", partitionKeys)
.option("segment_key", segmentKey)
.mode(saveMode).save();
return null;
};
段键为“:string /0”,行键设置为每行唯一的列,分区键设置为所有行的 const 列。换句话说,我所有的测试数据集都转到单个分区上的单个段。当我使用单个单节点 Spark 时,一切正常,我插入了所有数据,但是当我同时运行两个单独的单节点 Spark(不是作为集群)时,我会迷失大约 30-60即使我以几秒为间隔一一发送消息,数据的百分比。我检查了 dataFrame.write() 是否为每条消息执行,因此问题发生在此行之后。当我将段键设置为每行唯一的列时,所有数据都会到达 Cassandra/FiloDB。
请向我建议具有 2 个单独火花的场景的解决方案。