我正在尝试通过 spark 将数据传输到 kinesis firehose :
我收到类似 UnpicklingError 的错误:NEWOBJ Class argument has NULL tp_new
我正在使用 python 版本:2.10(Anaconda 版本)Spark - 2.4.6-hadoop 版本。我正在使用 Boto3 - 将数据推送到 Amazon Kinesis。
当我尝试使用 python 脚本使用示例数据将数据推送到 kinesis 时 - 它运行良好,我得到如下所示的结果输出..
In [21]: aws_client.put_record_batch(DeliveryStreamName='streamname',Records=Records)
And I am getting a success message that it is pushed to kinesis :
Out[16]:
{u'FailedPutCount': 0,
u'RequestResponses': [{u'RecordId': u'gfdgfdgdtgrt4rt4trfg'},
{u'RecordId': u'hggfhhy7686ghytryhfgfdgdgtrged'}],
'ResponseMetadata': {'HTTPStatusCode': 200,
'RequestId': 'u'ytutujguiuuuiiuuu'}}
http://boto3.readthedocs.org/en/latest/guide/configuration.html http://boto3.readthedocs.org/en/latest/reference/services/kinesis.html#client
同样,如果我尝试使用 spark - (进行处理以丰富来自 twitter 的推文并推送到 kinesis)..
sc = SparkContext(appName="PythonStreamingKafkaWordCount")
ssc = StreamingContext(sc, 1)
zkQuorum, topic = sys.argv[1:]
kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", {topic: 1})
lines = kvs.map(lambda x: x[1])
enrichedData = lines.flatMap(f1)
enrichedData.foreachRDD(lambda rdd: rdd.foreachPartition(sendToKinesis))
# simplejson.l1oad()
ssc.start()
ssc.awaitTermination()
我收到类似 UnpicklingError 的错误:NEWOBJ Class argument has NULL tp_new
谁能帮我弄清楚为什么它不能与 Spark 一起使用?