1

我正在使用 Elasticsearch-Hadoop 插件将 Elasticsearch 与 Spark 连接,并且我很难将具有timestamp类型列的数据框写入 Elasticsearch。

问题是当我尝试使用动态/多资源格式来创建每日索引时。

相关文档中我得到的印象是这是可能的,但是,除非我将数据帧类型更改为date.

import pyspark
conf = pyspark.SparkConf()
conf.set('spark.jars', 'elasticsearch-spark-20_2.11-6.1.2.jar')
conf.set('es.nodes', '127.0.0.1:9200')
conf.set('es.read.metadata', 'true')
conf.set('es.nodes.wan.only', 'true')
from datetime import datetime, timedelta
now = datetime.now()
before = now - timedelta(days=1)
after = now + timedelta(days=1)
cols = ['idz', 'name', 'time']
vals = [(0,'maria', before), (1, 'lolis', after)]  
time_df = spark.createDataFrame(vals, cols)

当我尝试写作时,我使用以下内容:

time_df.write.mode('append').format(
    'org.elasticsearch.spark.sql'
).options(
    **{'es.write.operation': 'index' }
).save('xxx-{time|yyyy.MM.dd}/1')

不幸的是,这会导致错误:

.... 引起:java.lang.IllegalArgumentException:无效格式:“2018-03-04 12:36:12.949897”在 org.joda.time.format.DateTimeFormatter.parseDateTime 的“12:36:12.949897”处格式错误(DateTimeFormatter.java:945)

另一方面,如果我在创建数据框时使用日期,则效果很好:

cols = ['idz', 'name', 'time']
vals = [(0,'maria', before.date()), (1, 'lolis', after.date())]  
time_df = spark.createDataFrame(vals, cols)

是否可以使用此方法格式化timestamp要写入每日索引的数据框,而无需保留一date列?月度指数如何?

Pyspark 版本:spark 版本 2.2.1 使用 Scala 版本 2.11.8,OpenJDK 64-Bit Server VM,1.8.0_151

ElasticSearch 版本号 "6.2.2" build_hash "10b1edd" build_date "2018-02-16T19:01:30.685723Z" build_snapshot false lucene_version "7.2.1" minimum_wire_compatibility_version "5.6.0" minimum_index_compatibility_version "5.0.0"

4

0 回答 0