
I want to fetch two sets of data from Elasticsearch:

one filtered with a query, and one without any filter.
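(The contents of `query` are never shown below; `es.query` accepts an Elasticsearch query-DSL document serialized as a JSON string. A purely hypothetical example of building such a string, with made-up field names:)

```python
import json

# Hypothetical example only: the actual query used in the question is not
# shown. es.query can take a query-DSL document as a JSON string.
query = json.dumps({
    "query": {
        "term": {"status": "active"}  # "status"/"active" are placeholders
    }
})
```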

 # with query
 session = get_spark_session(query=query)

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()

 df.show()  # empty result

 # without query
 session = get_spark_session()

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()

 df.show()  # empty result


 import pyspark
 from pyspark.sql import SparkSession


 def get_spark_session(query=None, excludes=()):
     conf = pyspark.SparkConf()
     conf.set("spark.driver.allowMultipleContexts", "true")
     conf.set("es.index.auto.create", "true")
     conf.set("es.nodes.discovery", "true")
     conf.set("es.scroll.size", 10000)
     # es.read.field.exclude expects a comma-separated string, not a list
     conf.set("es.read.field.exclude", ",".join(excludes))
     conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
     if query:
         conf.set("es.query", query)

     return SparkSession.builder.config(conf=conf).getOrCreate()

The question is whether the session is being reused.

When I run the filtered query first and then the non-filtered one, both return empty results.

But when I run the non-filtered query first, it shows some results, and the subsequent filtered query returns an empty result.
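That order dependence is consistent with `getOrCreate` handing back the already-active session: options set in the new builder are applied to it, but keys the new call does not set (such as a previously set `es.query`) keep their old values. A minimal sketch of that pattern in plain Python (no Spark; the class is illustrative only):

```python
class Session:
    """Toy stand-in for SparkSession, illustrating getOrCreate reuse."""
    _active = None

    def __init__(self, conf):
        self.conf = dict(conf)

    @classmethod
    def get_or_create(cls, conf):
        if cls._active is None:
            cls._active = cls(conf)
        else:
            # new keys are applied, but keys absent from `conf` survive
            cls._active.conf.update(conf)
        return cls._active

first = Session.get_or_create({"es.query": '{"match_all": {}}'})
second = Session.get_or_create({})   # no es.query in this call...
assert second is first
assert "es.query" in second.conf     # ...so the stale query survives
```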

 # below, I reverse the order
 # without query
 session = get_spark_session()

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()

 df.show()  # some result

 # with query
 session = get_spark_session(query=query)

 df = session.read.option(
     "es.resource", "analytics-prod-2019.08.02"
 ).format("org.elasticsearch.spark.sql").load()

 df.show()  # empty result

** EDIT

So I can get the desired result as follows:

def get_spark_session(query=None, excludes=()):
    conf = pyspark.SparkConf()
    conf.set("spark.driver.allowMultipleContexts", "true")
    conf.set("es.index.auto.create", "true")
    conf.set("es.nodes.discovery", "true")
    conf.set("es.scroll.size", 10000)
    conf.set("es.read.field.exclude", ",".join(excludes))
    conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
    if query:
        conf.set("es.query", query)
    else:
        conf.set("es.query", "")  # unset the query on the reused session

    return SparkSession.builder.config(conf=conf).getOrCreate()

1 Answer


SparkSession.builder gets an existing SparkSession or, if there is none, creates a new one based on the options set in the builder. In your case the Spark config is being reused, so the `es.query` from the first call is still in effect. Clearing "es.query" from the config should fix this:

 import pyspark
 from pyspark.sql import SparkSession


 def get_spark_session(query=None, excludes=()):
     conf = pyspark.SparkConf()
     conf.set("spark.driver.allowMultipleContexts", "true")
     conf.set("es.index.auto.create", "true")
     conf.set("es.nodes.discovery", "true")
     conf.set("es.scroll.size", 10000)
     conf.set("es.read.field.exclude", ",".join(excludes))
     conf.set("spark.driver.extraClassPath", "/usr/local/elasticsearch-hadoop/dist/elasticsearch-spark-20_2.11-6.6.2.jar")
     # pyspark.SparkConf has no unset()/remove(), so always set es.query:
     # an empty value overwrites any stale query on the reused session
     conf.set("es.query", query if query else "")

     return SparkSession.builder.config(conf=conf).getOrCreate()
answered 2019-08-21T03:22:22.640