
I followed this tutorial: https://debezium.io/docs/tutorial-for-0-2/. My CDC pipeline works fine for MySQL events (create, update, delete).

I am trying to consume these Kafka events from PySpark using Python, but my code still does not receive any events.

The code is below:

import os

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

os.environ["SPARK_HOME"] = "/usr/local/spark"
os.environ["PYSPARK_PYTHON"] = "/home/azda/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/home/azda/anaconda3/bin/python"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar pyspark-shell'

if __name__ == '__main__':

     conf = SparkConf().setAppName("DBWatcher").setMaster("local[3]")

     sc = SparkContext(conf=conf)
     sc.setLogLevel("INFO")

     ssc = StreamingContext(sc, 20)

     # Already tried the ZooKeeper port :2181 here as well
     kafkaStream = KafkaUtils.createStream(ssc, '10.90.29.24:9092', 'spark-streaming', {'mysql-server-1.inventory.customers': 1})
     print('contexts =================== {} {}')
     lines = kafkaStream.map(lambda x: x[1])
     lines.pprint()

     ssc.start()
     ssc.awaitTermination()

Running this code, I get the following error:

2018-11-14 16:22:39 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.AbstractMethodError
    at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99)
    at org.apache.spark.streaming.kafka.KafkaReceiver.initializeLogIfNecessary(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.log(Logging.scala:46)
    at org.apache.spark.streaming.kafka.KafkaReceiver.log(KafkaInputDStream.scala:68)
    at org.apache.spark.internal.Logging$class.logInfo(Logging.scala:54)
    at org.apache.spark.streaming.kafka.KafkaReceiver.logInfo(KafkaInputDStream.scala:68)
    at org.apache.spark.streaming.kafka.KafkaReceiver.onStart(KafkaInputDStream.scala:90)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.startReceiver(ReceiverSupervisor.scala:149)
    at org.apache.spark.streaming.receiver.ReceiverSupervisor.start(ReceiverSupervisor.scala:131)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:600)
    at org.apache.spark.streaming.scheduler.ReceiverTracker$ReceiverTrackerEndpoint$$anonfun$9.apply(ReceiverTracker.scala:590)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185)
    at org.apache.spark.SparkContext$$anonfun$34.apply(SparkContext.scala:2185)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2018-11-14 16:22:39 WARN TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AbstractMethodError (same stack trace as above)
2018-11-14 16:22:39 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 1 times; aborting job
2018-11-14 16:22:39 INFO TaskSchedulerImpl:54 - Removed TaskSet 0.0, whose tasks have all completed, from pool
2018-11-14 16:22:39 INFO TaskSchedulerImpl:54 - Cancelling stage 0
2018-11-14 16:22:39 INFO DAGScheduler:54 - ResultStage 0 (start at NativeMethodAccessorImpl.java:0) failed in 0.438 s due to Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.AbstractMethodError at org.apache.spark.internal.Logging$class.initializeLogIfNecessary(Logging.scala:99) ...
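For context (an assumption on my part, not something the log states outright): a `java.lang.AbstractMethodError` at startup usually means a binary-incompatible jar is on the classpath. The jar in `PYSPARK_SUBMIT_ARGS`, `spark-streaming-kafka-0-8-assembly_2.10-2.2.0.jar`, is built for Scala 2.10, while stock Spark 2.2 distributions are built against Scala 2.11, so one sketch of a fix is to point at the `_2.11` build of the same assembly:

```shell
# Sketch, assuming Spark 2.2 built against Scala 2.11: the Kafka assembly's
# Scala suffix (_2.11) must match the Spark build, or receiver startup fails
# with AbstractMethodError. Path and jar placement are hypothetical.
export PYSPARK_SUBMIT_ARGS='--jars /usr/local/spark/jars/spark-streaming-kafka-0-8-assembly_2.11-2.2.0.jar pyspark-shell'
```

The Scala version of a Spark build can be checked in `spark-shell` (the welcome banner prints it), and the `_2.11` assembly artifact is published under the same Maven coordinates as the `_2.10` one.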

Any suggestions on how to consume the stream data? Regards.

