I have written the code below to connect to Kinesis from Spark Streaming, but no data is being received.

val kinesisStream = KinesisUtils.createStream(ssc, appName, streamName, endpointUrl, regionName, InitialPositionInStream.LATEST, batchInterval, StorageLevel.MEMORY_AND_DISK_2)

kinesisStream.print() // nothing is printed here

// map, not flatMap: flatMap over a String would emit its individual characters
val data = kinesisStream.map(byteArray => new String(byteArray))

data.foreachRDD { rdd =>
  val records = rdd.collect() // collect once and reuse the result
  println("data==" + records.length) // no data here either
  //rdd.saveAsTextFile("file:///home/myHome/Code/sample/somedata.txt")
}

I also tried writing to S3 and to the local file system. In both cases the output is a folder named after the file, and inside it I see only a zero-byte _SUCCESS file.

By the way, I am able to write to the same Kinesis stream and read data from it in Java.

What is the issue here?

1 Answer

I found the solution to this problem.

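The code was able to pull data from Kinesis. Along with the data, however, it also generated many zero-byte part files. Because this is a streaming application, part files are written at every batch interval, so if no data arrives within an interval, the output for that interval is a zero-byte file.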

I added a check in the DF code to skip empty part files, so that the DF writes only part files that contain data.

We started getting data after this change.
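
The check described above might look roughly like the following. This is only a sketch, not the exact code I used: it works on a plain `DStream[String]` rather than a DF, and the names `NonEmptyBatchWriter`, `batchPath`, `saveNonEmpty` and `outputDir` are made up for illustration. It assumes Spark 1.3 or later, where `RDD.isEmpty()` is available.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.Time
import org.apache.spark.streaming.dstream.DStream

object NonEmptyBatchWriter {
  // Hypothetical helper: one output directory per batch, named by batch time.
  def batchPath(outputDir: String, batchTimeMs: Long): String =
    s"$outputDir/batch-$batchTimeMs"

  // Write a batch only when it holds at least one record, so that
  // intervals without data do not leave zero-byte output behind.
  def saveNonEmpty(records: DStream[String], outputDir: String): Unit =
    records.foreachRDD { (rdd: RDD[String], time: Time) =>
      if (!rdd.isEmpty()) {
        rdd.saveAsTextFile(batchPath(outputDir, time.milliseconds))
      }
    }
}
```

With the stream from the question this could be called as `NonEmptyBatchWriter.saveNonEmpty(data, "file:///some/output/dir")`, where the path is a placeholder. Note that a non-empty batch can still contain empty partitions, and each of those still produces a zero-byte part file.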

Answered 2017-03-14T11:17:53.810