1

我正在尝试通过 Thrift 在 pyspark 上创建一个临时表。我的最终目标是能够使用 JDBC 从 DBeaver 等数据库客户端访问它。

我首先使用直线进行测试。

这就是我正在做的事情。

  1. 使用 docker 在我自己的机器上用一个工人启动了一个集群并添加spark.sql.hive.thriftServer.singleSession truespark-defaults.conf
  2. 启动 Pyspark shell(为了测试)并运行以下代码:

    from pyspark.sql import Row l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)] rdd = sc.parallelize(l) people = rdd.map(lambda x: Row(name=x[0], age=int(x[1]))) people = people.toDF().cache() peebs = people.createOrReplaceTempView('peebs') result = sqlContext.sql('select * from peebs')

    到目前为止一切顺利,一切正常。

  3. 在另一个终端上,我初始化 spark thrift 服务器: ./sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001 --conf spark.executor.cores=1 --master spark://172.18.0.2:7077

    服务器似乎正常启动,我可以看到在我的 spark 集群主 UI 上运行的 pyspark 和 thrift 服务器作业。

  4. 然后我使用直线连接到集群

    ./bin/beeline beeline> !connect jdbc:hive2://172.18.0.2:10001

    这就是我得到的

    连接到 jdbc:hive2://172.18.0.2:10001
    Enter username for jdbc:hive2://172.18.0.2:10001:
    Enter password for jdbc:hive2://172.18.0.2:10001:
    2019-06-29 20: 14:25 INFO Utils:310 - 提供的权限:172.18.0.2:10001
    2019-06-29 20:14:25 INFO Utils:397 - 已解决的权限:172.18.0.2:10001
    2019-06-29 20:14:25 INFO HiveConnection:203 - 将尝试使用 JDBC Uri 打开客户端传输:jdbc:hive2://172.18.0.2:10001
    连接到:Spark SQL(版本 2.3.3)
    驱动程序:Hive JDBC(版本 1.2.1.spark2)
    事务隔离: TRANSACTION_REPEATABLE_READ

    似乎还可以。

  5. 当我列出时,show tables;我什么都看不到。

我想强调的两件有趣的事情是:

  1. 当我启动 pyspark 时,我收到这些警告

    WARN ObjectStore:6666 - 在 Metastore 中找不到版本信息。hive.metastore.schema.verification 未启用,因此记录架构版本 1.2.0

    WARN ObjectStore:568 - 无法获取数据库默认值,返回 NoSuchObjectException

    WARN ObjectStore:568 - 无法获取数据库 global_temp,返回 NoSuchObjectException

  2. 当我启动节俭服务器时,我得到了这些:

    来自 spark://172.18.0.2:7077
    ssh 的 rsync:无法解析主机名 spark:名称或服务未知
    rsync:连接意外关闭(到目前为止已收到 0 个字节) [Receiver]
    rsync 错误:io 出现无法解释的错误(代码 255) .c(235) [Receiver=3.1.2]
    启动 org.apache.spark.sql.hive.thriftserver.HiveThriftServer2,登录到 ...

我经历了几个帖子和讨论。我看到有人说我们不能通过 thrift 公开临时表,除非您从同一代码中启动服务器。如果这是真的,我怎么能在 python (pyspark) 中做到这一点?

谢谢

4

3 回答 3

1

如果有人需要在 Spark Streaming 中执行此操作,我可以让它像这样工作。

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('restlogs_qlik') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

#Order matters! 
java_import(sc._gateway.jvm, "")
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

#Define schema of json
schema = StructType().add("partnerid", "string").add("sessionid", "string").add("functionname", "string").add("functionreturnstatus", "string")

#load data into spark-structured streaming
df = spark \
      .readStream \
      .format("kafka") \
      .option("kafka.bootstrap.servers", "localhost:9092") \
      .option("subscribe", "rest_logs") \
      .load() \
      .select(from_json(col("value").cast("string"), schema).alias("parsed_value"))

#Print output
query = df.writeStream \
            .outputMode("append") \
            .format("memory") \
            .queryName("view_figures") \
            .start()

query.awaitTermination();

启动后就可以通过 JDBC 和蜂箱进行测试了。我无法理解的是我必须在同一个脚本中启动 Thrift 服务器。这是启动脚本的方法。

    spark-submit --master local[2] \
--conf "spark.driver.extraClassPath=D:\\Libraries\\m2_repository\\org\\apache\\kafka\\kafka-clients\\2.0.0\\kafka-clients-2.0.0.jar" \
--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1 \
"C:\\Temp\\spark_kafka.py"

希望这可以帮助某人。顺便说一句,我处于初步研究阶段,所以不要评判我。

于 2019-10-18T09:42:50.817 回答
0

在做了几次测试之后,我能够想出一个对我有用的简单(无身份验证)代码。

重要的是要注意,如果您想通过 JDBC 使临时表可用,您需要在同一个 JVM(同一个 spark 作业)中启动 thrift 服务器,并确保代码挂起,以便应用程序在集群中保持运行。

按照我创建的工作示例代码供参考:

import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from py4j.java_gateway import java_import

spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport()\
    .config('spark.sql.hive.thriftServer.singleSession', True)\
    .config('hive.server2.thrift.port', '10001') \
    .getOrCreate()

sc=spark.sparkContext
sc.setLogLevel('INFO')

java_import(sc._gateway.jvm, "")


from pyspark.sql import Row
l = [('John', 20), ('Heather', 34), ('Sam', 23), ('Danny', 36)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
peebs = people.createOrReplaceTempView('peebs')

sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(spark._jwrapped)

while True:
    time.sleep(10)

我只是在我的 spark-submit 中使用了上面的 .py,我能够通过 beeline 通过 JDBC 连接,并使用 Hive JDBC 驱动程序使用 DBeaver。

于 2019-07-09T14:27:00.820 回答
0

createOrReplaceTempView创建一个内存表。Spark thrift 服务器需要在我们创建内存表的同一驱动程序 JVM 上启动。
在上面的示例中,创建表的驱动程序和运行 STS(Spark Thrift 服务器)的驱动程序是不同的。
两个选项1.在启动 STS 的同一 JVM 中
创建表。 2. 使用后备元存储,并使用创建表,以便可以独立于 JVM 访问表(实际上没有任何 Spark 驱动程序。 createOrReplaceTempView
org.apache.spark.sql.DataFrameWriter#saveAsTable

关于错误:
1.与客户端和服务器元存储版本有关。
2.似乎一些rsync脚本试图解码spark:\\url
两者似乎都与问题无关。

于 2019-06-30T10:48:48.970 回答