0

spark-streaming-kafka-0-8_2.11:2.3.0:(我的旧逻辑

def process_rdd(ts, rdd):
    print rdd.count()
    # The first stage in this pipe is to check rdd count
    if rdd.count():
        try:
            # The second stage in this pipe is to collect the rdd value
            lines = rdd.map(lambda x: x[1]).collect()
            # The third stage in this pipe is to iterate the rdd one by one
            for x in lines:
                # The fourth stage in this pipe is to convert into json
                x = json.loads(x)
                # The fifth stage in this pipe is to chose operations
                if x["database"] == "test":
                    portal_db_name = "portal"
                if x["database"] == "test_db":
                    portal_db_name = "new_portal"
if __name__ == "__main__":              
    try:
        # kafka_streams =KafkaUtils.createStream(ssc, zkQuorum, "kafka-spark-streaming-mongodb-test", {topic: 1})
        kafkaStream = KafkaUtils.createStream(
            ssc, kafkabrokerhost, 'spark-streaming', {topicname: 1})

        kafkaStream.foreachRDD(process_rdd)
    except Exception as e:
        logging.error("message")
        print(e)
    ssc.start()
    ssc.awaitTermination()
    ssc.stop()

如何实现相同的功能 spark-streaming-kafka-0-10_2.12:

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
KAFKA_TOPIC = "test1"
KAFKA_SERVER = "localhost:6667"

def process_rdd(ts, rdd):
    print("siva")
    print("hi--------", rdd.count())
    print(rdd.collect())

# creating an instance of SparkSession
spark_session = SparkSession \
    .builder \
    .appName("Python Spark create RDD") \
    .getOrCreate()

# Subscribe to 1 topic
df = spark_session \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_SERVER) \
    .option("subscribe", KAFKA_TOPIC) \
    .load()

问题:

  1. 如何转换成foreachRDD spark-streaming-kafka-0-10_2.12?

  2. 如何实现以下功能

    行 = rdd.map(lambda x: x[1]).collect()

我得到以下错误

Traceback (most recent call last):
  File "/home/valor-pgadmin-001/cdc/drive/test.py", line 41, in <module>
    rdd = df.rdd
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 85, in rdd
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1305, in __call__
  File "/usr/local/lib/python3.6/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
kafka
4

0 回答 0