I need to expose some temp tables on Spark using Thrift. This is the basic code I am running on my cluster:

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from py4j.java_gateway import java_import

# Hive support is enabled because the Thrift server is part of Spark's Hive module.
spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .getOrCreate()

sc = spark.sparkContext

java_import(sc._gateway.jvm, "")

# Build a small DataFrame and register it as a temp view.
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')

# Start the Thrift server in the JVM, passing it the session that backs this PySpark session.
# startWithContext takes the JVM-side SQLContext, reached here through the session's Java object.
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(
    spark._jsparkSession.sqlContext())

./bin/spark-submit --conf spark.executor.cores=1  --master spark:// /home/spark/test.py



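I am setting spark.sql.hive.thriftServer.singleSession to True to make sure the server reads from the same session. I don't see any errors. The job simply starts, runs through all the logic, and finishes instead of staying up, so I never get the chance to access the temp tables over JDBC from beeline or another SQL client.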



Your script terminates because there is nothing left for it to do once the Thrift server has started. Just keep it alive with a sleep:

import time
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, Row
from py4j.java_gateway import java_import

# Hive support is enabled because the Thrift server is part of Spark's Hive module.
spark = SparkSession \
    .builder \
    .appName('the_test') \
    .enableHiveSupport() \
    .config('spark.sql.hive.thriftServer.singleSession', True) \
    .config("hive.server2.thrift.port", "20001") \
    .getOrCreate()

sc = spark.sparkContext

java_import(sc._gateway.jvm, "")

# Build a small DataFrame and register it as a temp view.
l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
rdd = sc.parallelize(l)
people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
people = people.toDF().cache()
people.createOrReplaceTempView('peebs')

# Start the Thrift server in the JVM, passing it the session that backs this PySpark session.
# startWithContext takes the JVM-side SQLContext, reached here through the session's Java object.
sc._gateway.jvm.org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.startWithContext(
    spark._jsparkSession.sqlContext())

# This keeps the driver, and with it the Thrift server, running.
while True:
    time.sleep(5)
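
If you would rather shut the job down cleanly than kill the process, one option is to replace the bare sleep loop with a wait on an event that a signal handler sets. The following is only a minimal sketch of that idea, not part of the original script: `wait_until_stopped` and `install_signal_handlers` are hypothetical helper names, and the 5-second poll interval is an arbitrary choice.

```python
import signal
import threading


def wait_until_stopped(stop_event, poll_seconds=5.0, on_stop=None):
    """Block the calling thread until stop_event is set.

    Waking up every poll_seconds keeps the main thread responsive to
    signals. on_stop, if given, runs once after the event is set.
    """
    while not stop_event.wait(poll_seconds):
        pass
    if on_stop is not None:
        on_stop()


def install_signal_handlers(stop_event):
    """Set stop_event on SIGINT or SIGTERM (must be called from the main thread)."""
    for sig in (signal.SIGINT, signal.SIGTERM):
        signal.signal(sig, lambda signum, frame: stop_event.set())


# Assumed usage at the end of the driver script, in place of `while True`:
#
#     stop = threading.Event()
#     install_signal_handlers(stop)
#     wait_until_stopped(stop, on_stop=spark.stop)
```

With this in place, Ctrl-C or a plain `kill` on the driver lets `spark.stop()` run before the process exits, while the Thrift server stays reachable for as long as the event is unset.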
