我设计了一个简单的工作来从 MySQL 读取数据并将其保存在 Spark 的 Elasticsearch 中。
这是代码:
JavaSparkContext sc = new JavaSparkContext(
new SparkConf().setAppName("MySQLtoEs")
.set("es.index.auto.create", "true")
.set("es.nodes", "127.0.0.1:9200")
.set("es.mapping.id", "id")
.set("spark.serializer", KryoSerializer.class.getName()));
SQLContext sqlContext = new SQLContext(sc);
// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");
// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
"merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");
可以看到代码非常简单。它将数据读入 DataFrame,选择一些列,然后count
在 Dataframe 上执行 a 作为基本操作。到目前为止一切正常。
然后它尝试将数据保存到 Elasticsearch 中,但它失败了,因为它无法处理某些类型。您可以在此处查看错误日志。
我不确定为什么它不能处理那种类型。有谁知道为什么会这样?
我正在使用 Apache Spark 1.5.0、Elasticsearch 1.4.4 和 elaticsearch-hadoop 2.1.1
编辑:
- 我已经使用示例数据集以及源代码更新了要点链接。
- 我还尝试使用@costin 在邮件列表中提到的 elasticsearch -hadoop开发版本。