5

我设计了一个简单的工作来从 MySQL 读取数据并将其保存在 Spark 的 Elasticsearch 中。

这是代码:

JavaSparkContext sc = new JavaSparkContext(
        new SparkConf().setAppName("MySQLtoEs")
                .set("es.index.auto.create", "true")
                .set("es.nodes", "127.0.0.1:9200")
                .set("es.mapping.id", "id")
                .set("spark.serializer", KryoSerializer.class.getName()));

SQLContext sqlContext = new SQLContext(sc);

// Data source options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "OFFERS");
options.put("partitionColumn", "id");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

// Load MySQL query result as DataFrame
LOGGER.info("Loading DataFrame");
DataFrame jdbcDF = sqlContext.load("jdbc", options);
DataFrame df = jdbcDF.select("id", "title", "description",
        "merchantId", "price", "keywords", "brandId", "categoryId");
df.show();
LOGGER.info("df.count : " + df.count());
EsSparkSQL.saveToEs(df, "offers/product");

可以看到代码非常简单。它将数据读入 DataFrame,选择一些列,然后count在 Dataframe 上执行 a 作为基本操作。到目前为止一切正常。

然后它尝试将数据保存到 Elasticsearch 中,但它失败了,因为它无法处理某些类型。您可以在此处查看错误日志。

我不确定为什么它不能处理那种类型。有谁知道为什么会这样?

我正在使用 Apache Spark 1.5.0、Elasticsearch 1.4.4 和 elaticsearch-hadoop 2.1.1

编辑:

  • 我已经使用示例数据集以及源代码更新了要点链接。
  • 我还尝试使用@costin 在邮件列表中提到的 elasticsearch -hadoop开发版本。
4

1 回答 1

15

这个问题的答案很棘手,但多亏了samklr,我设法弄清楚了问题所在。

尽管如此,该解决方案并不简单,并且可能会考虑一些“不必要的”转换。

首先让我们谈谈序列化

在 Spark 数据序列化和函数序列化中,序列化有两个方面需要考虑。在这种情况下,它是关于数据序列化和反序列化的。

从 Spark 的角度来看,唯一需要做的就是设置序列化 - Spark 默认依赖于 Java 序列化,这很方便但效率相当低。这就是 Hadoop 本身引入了自己的序列化机制和自己的类型的原因——即Writables. 因此,Spark 无法理解开箱即用的情况,InputFormat并且OutputFormats需要返回。Writables

使用 elasticsearch-spark 连接器,必须启用一种不同的序列化 (Kryo),它会自动处理转换,并且非常有效。

conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

即使 Kryo 不要求类实现要序列化的特定接口,这意味着 POJO 可以在 RDD 中使用,除了启用 Kryo 序列化之外,无需任何进一步的工作。

也就是说,@samklr 向我指出 Kryo 需要在使用它们之前注册类。

这是因为 Kryo 写入了对正在序列化的对象的类的引用(为每个写入的对象写入一个引用),如果该类已注册,则它只是一个整数标识符,否则是完整的类名。Spark 代表您注册 Scala 类和许多其他框架类(如 Avro Generic 或 Thrift 类)。

使用 Kryo 注册课程很简单。创建 KryoRegistrator 的子类,并重写该registerClasses()方法:

public class MyKryoRegistrator implements KryoRegistrator, Serializable {
    @Override
    public void registerClasses(Kryo kryo) {
        // Product POJO associated to a product Row from the DataFrame            
        kryo.register(Product.class); 
    }
}

最后,在您的驱动程序中,将 spark.kryo.registrator 属性设置为您的 KryoRegistrator 实现的完全限定类名:

conf.set("spark.kryo.registrator", "MyKryoRegistrator")

其次,即使设置了 Kryo 序列化程序并注册了类,对 Spark 1.5 进行了更改,并且由于某种原因,Elasticsearch 无法反序列化Dataframe,因为它无法SchemaType将 Dataframe 推断到连接器中。

所以我不得不将 Dataframe 转换为 JavaRDD

JavaRDD<Product> products = df.javaRDD().map(new Function<Row, Product>() {
    public Product call(Row row) throws Exception {
        long id = row.getLong(0);
        String title = row.getString(1);
        String description = row.getString(2);
        int merchantId = row.getInt(3);
        double price = row.getDecimal(4).doubleValue();
        String keywords = row.getString(5);
        long brandId = row.getLong(6);
        int categoryId = row.getInt(7);
        return new Product(id, title, description, merchantId, price, keywords, brandId, categoryId);
    }
});

现在数据已准备好写入 elasticsearch :

JavaEsSpark.saveToEs(products, "test/test");

参考:

  • Elasticsearch 的 Apache Spark 支持文档
  • Hadoop 权威指南,第 19 章。Spark,编辑。4 - 汤姆怀特。
  • 用户samklr
于 2015-10-09T15:42:51.420 回答