0

我正在使用带有 Cassandra 2.2.5 列和元存储的 FiloDB 0.4,并尝试使用 Spark Streaming 1.6.1 + Jobserver 0.6.2 将数据插入其中。我使用以下代码插入数据:

messages.foreachRDD(parseAndSaveToFiloDb)

private static Function<JavaPairRDD<String, String>, Void> parseAndSaveToFiloDb = initialRdd -> {
        final List<RowWithSchema> parsedMessages = parseMessages(initialRdd.collect());
        final JavaRDD<Row> rdd = javaSparkContext.parallelize(createRows(parsedMessages));
        final DataFrame dataFrame = sqlContext.createDataFrame(rdd, generateSchema(rawMessages);

        dataFrame.write().format("filodb.spark")
                .option("database", keyspace)
                .option("dataset", dataset)
                .option("row_keys", rowKeys)
                .option("partition_keys", partitionKeys)
                .option("segment_key", segmentKey)
                .mode(saveMode).save();
        return null;
    };

段键为“:string /0”,行键设置为每行唯一的列,分区键设置为所有行的 const 列。换句话说,我所有的测试数据集都转到单个分区上的单个段。当我使用单个单节点 Spark 时,一切正常,我插入了所有数据,但是当我同时运行两个单独的单节点 Spark(不是作为集群)时,我会迷失大约 30-60即使我以几秒为间隔一一发送消息,数据的百分比。我检查了 dataFrame.write() 是否为每条消息执行,因此问题发生在此行之后。当我将段键设置为每行唯一的列时,所有数据都会到达 Cassandra/FiloDB。

请向我建议具有 2 个单独火花的场景的解决方案。

4

1 回答 1

1

@psyduck,这很可能是因为每个分区的数据一次只能在一个节点上摄取——对于 0.4 版本。因此,要坚持使用当前版本,您需要将数据划分为多个分区,然后确保每个工作人员只获得一个分区。实现上述目标的最简单方法是按分区键对数据进行排序。

不过,我强烈建议您迁移到最新版本 - master (Spark 2.x / Scala 2.11) 或 spark1.6 分支 (spark 1.6 / Scala 2.10)。最新版本有许多 0.4 中没有的更改可以解决您的问题:

  • 使用 Akka Cluster 自动将您的数据路由到正确的摄取节点。在这种情况下,使用相同的模型,您的数据将全部进入正确的节点并确保没有数据丢失
  • 基于 TimeUUID 的 chunkID,因此即使多个工作人员(在脑裂的情况下)以某种方式写入同一个分区,也可以避免数据丢失
  • 一种新的“少段”数据模型,因此您无需定义任何段键,读取和写入效率更高

随时联系我们的邮件列表https://groups.google.com/forum/#!forum/filodb-discuss

于 2017-01-26T21:23:42.723 回答