spark-streaming-kafka-0-8_2.11:2.3.0 (my old logic):
import json
import logging

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils


def process_rdd(ts, rdd):
    print(rdd.count())
    # First stage: check the RDD count
    if rdd.count():
        try:
            # Second stage: collect the RDD values
            lines = rdd.map(lambda x: x[1]).collect()
            # Third stage: iterate over the records one by one
            for x in lines:
                # Fourth stage: parse each record as JSON
                x = json.loads(x)
                # Fifth stage: choose the operation based on the source database
                if x["database"] == "test":
                    portal_db_name = "portal"
                if x["database"] == "test_db":
                    portal_db_name = "new_portal"
        except Exception as e:
            logging.error(e)
if __name__ == "__main__":
    sc = SparkContext(appName="kafka-spark-streaming")
    ssc = StreamingContext(sc, 10)  # 10-second batch interval (placeholder)
    kafkabrokerhost = "localhost:2181"  # ZooKeeper quorum for the 0-8 receiver (placeholder)
    topicname = "test1"  # placeholder topic name
    try:
        # kafka_streams = KafkaUtils.createStream(ssc, zkQuorum, "kafka-spark-streaming-mongodb-test", {topic: 1})
        kafkaStream = KafkaUtils.createStream(
            ssc, kafkabrokerhost, 'spark-streaming', {topicname: 1})
        kafkaStream.foreachRDD(process_rdd)
    except Exception as e:
        logging.error("message")
        print(e)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()
How do I implement the same logic with spark-streaming-kafka-0-10_2.12? This is what I have so far:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

KAFKA_TOPIC = "test1"
KAFKA_SERVER = "localhost:6667"


def process_rdd(ts, rdd):
    print("siva")
    print("hi--------", rdd.count())
    print(rdd.collect())


# Create an instance of SparkSession
spark_session = SparkSession \
    .builder \
    .appName("Python Spark create RDD") \
    .getOrCreate()

# Subscribe to 1 topic
df = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_SERVER) \
    .option("subscribe", KAFKA_TOPIC) \
    .load()
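
So far the only way I can get this stream to run at all is to start it with writeStream and a sink. A minimal sketch, assuming the built-in console sink (the CAST is my addition, since the Kafka source delivers key and value as binary):

# Sketch: start the streaming query with the console sink
query = df \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("console") \
    .option("truncate", "false") \
    .start()
query.awaitTermination()

This only dumps micro-batches to the console, though; it does not give me the per-record control I had in process_rdd.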
Questions:
How do I convert the foreachRDD logic to spark-streaming-kafka-0-10_2.12?
How do I implement the equivalent of the following?
lines = rdd.map(lambda x: x[1]).collect()
When I try to drop down to the RDD with rdd = df.rdd, I get the following error:
Traceback (most recent call last):
File "/home/valor-pgadmin-001/cdc/drive/test.py", line 41, in <module>
rdd = df.rdd
File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 85, in rdd
File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
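
From the Structured Streaming programming guide I suspect foreachBatch is the closest replacement for foreachRDD, since it hands each micro-batch to a callback as a regular (non-streaming) DataFrame. A minimal sketch under that assumption; process_batch is my hypothetical port of process_rdd and is untested:

import json

# Hypothetical port of process_rdd: each micro-batch arrives as a plain DataFrame
def process_batch(batch_df, batch_id):
    # value is binary in the Kafka source, so cast it to a string first
    rows = batch_df.selectExpr("CAST(value AS STRING) AS value").collect()
    for row in rows:
        x = json.loads(row.value)
        if x["database"] == "test":
            portal_db_name = "portal"
        if x["database"] == "test_db":
            portal_db_name = "new_portal"

query = df.writeStream.foreachBatch(process_batch).start()
query.awaitTermination()

Is foreachBatch the right approach here, or is there a more direct equivalent of rdd.map(lambda x: x[1]).collect()?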