1

我正在尝试运行JavaKinesisWordCountASL示例。

该示例似乎连接到我的 Kinesis Stream 并从流中获取数据(如下面的日志所示)。但是,Sparks 不会调用示例中传递给 unionStreams.flatMap 方法的调用函数,也不会打印任何字数。

我尝试使用 Java 8 和 Java 7 运行。我在 ubuntu 实例上运行它。相同的示例适用于我的 macbook。

ec2.internal/10.80.91.13:39149],1 条消息待处理 14/11/15 01:59:42 INFO network.ConnectionManager:接受来自 [ip-10-80-91-13.ec2.internal/10.80.91.13 的连接:56700] 14/11/15 01:59:42 WARN storage.BlockManager: Block input-0-1416016781800 已经存在于这台机器上;不重新添加它 14/11/15 01:59:42 INFO receiver.BlockGenerator: Pushed block input-0-1416016781800 14/11/15 01:59:43 INFO storage.MemoryStore: ensureFreeSpace(256) 调用 curMem= 3776,maxMem=938244833 14/11/15 01:59:43 INFO storage.MemoryStore:块输入-0-1416016782800 作为值存储在内存中(估计大小 256.0 B,可用 894.8 MB)14/11/15 01:59: 43 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-0-1416016782800(大小:256.0 B,免费:894.8 MB)2015 年 14 月 11 日 01:59: 43 INFO storage.BlockManagerMaster: 块 input-0-1416016782800 的更新信息 14/11/15 01:59:43 WARN storage.BlockManager: Block input-0-1416016782800 已经存在于这台机器上;不重新添加它 14/11/15 01:59:43 INFO receiver.BlockGenerator:推送块 input-0-1416016782800 14/11/15 01:59:44 INFO scheduler.ReceiverTracker:流 0 收到 2 个块 14/11 /15 01:59:44 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:44 INFO scheduler.JobScheduler:为时间 1416016784000 ms 添加作业 14/11/15 01:59:46 INFO 调度程序.ReceiverTracker:流 0 收到 0 个块 14/11/15 01:59:46 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:46 INFO scheduler.JobScheduler:为时间 1416016786000 ms 14 添加作业/11/15 01:59:46 INFO impl.CWPublisherRunnable:成功发布 17 个数据。14/11/15 01:59:46 INFO storage.MemoryStore: ensureFreeSpace(248) 调用 curMem=4032, maxMem=938244833 14/11/15 01:59:46 INFO storage.MemoryStore: 块 input-1-1416016786000 存储为内存中的值(估计大小 248.0 B,可用 894.8 MB)14 /11/15 01:59:46 INFO storage.BlockManagerInfo:在 ip-10-80-91-13.ec2.internal:39149 的内存中添加了 input-1-1416016786000(大小:248.0 B,免费:894.8 MB)14 /11/15 01:59:46 INFO storage.BlockManagerMaster:块 input-1-1416016786000 的更新信息 14/11/15 01:59:46 WARN storage.BlockManager:块 input-1-1416016786000 已经存在于这台机器上;不重新添加它 14/11/15 01:59:46 INFO receiver.BlockGenerator:推送块 input-1-1416016786000 14/11/15 01:59:46 INFO impl.CWPublisherRunnable:成功发布了 14 个数据。14/11/15 01:59:48 INFO scheduler.ReceiverTracker:流 0 收到 0 个块 14/11/15 01:59:48 INFO 存储。MemoryStore: ensureFreeSpace(264) 调用 curMem=4280, maxMem=938244833 14/11/15 01:59:48 INFO scheduler.ReceiverTracker: Stream 1 收到 1 个块 14/11/15 01:59:48 INFO storage.MemoryStore:块 input-0-1416016787800 存储为内存中的值(估计大小 264.0 B,可用 894.8 MB)14/11/15 01:59:48 INFO storage.BlockManagerInfo:在 ip-10-80 的内存中添加了 input-0-1416016787800 -91-13.ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:48 INFO storage.BlockManagerMaster:块输入的更新信息-0-1416016787800 14/11/15 01:59:48 INFO scheduler.JobScheduler:为时间 1416016788000 毫秒添加了作业 14/11/15 01:59:48 WARN storage.BlockManager:块输入 0-1416016787800 已经存在于这台机器上;不重新添加它 14/11/15 01:59:48 INFO receiver.BlockGenerator: 推送块 input-0-1416016787800 14/11/15 01:59: 50 INFO scheduler.ReceiverTracker:流 0 收到 1 个块 14/11/15 01:59:50 INFO scheduler.ReceiverTracker:流 1 收到 0 个块 14/11/15 01:59:50 INFO scheduler.JobScheduler:为时间添加作业1416016790000 毫秒 14/11/15 01:59:51 INFO storage.MemoryStore: ensureFreeSpace(264) 调用 curMem=4544, maxMem=938244833 14/11/15 01:59:51 INFO storage.MemoryStore: 块输入-0- 1416016790800 作为值存储在内存中(估计大小 264.0 B,可用 894.8 MB)14/11/15 01:59:51 INFO storage.BlockManagerInfo:在 ip-10-80-91-13 的内存中添加了 input-0-1416016790800。 ec2.internal:39149(大小:264.0 B,免费:894.8 MB)14/11/15 01:59:51 INFO storage.BlockManagerMaster:更新块输入信息 0-1416016790800 14/11/15 01:59:51 WARN storage.BlockManager: 块 input-0-1416016790800 已经存在于这台机器上;没有重新添加它 14/11/15 01:59:

4

2 回答 2

4

这可能与您获得了多少工作线程有关。当我使用 --master local[2] 运行应用程序时,我遇到了同样的问题。我花了很多时间寻找答案,但一无所获。只是出于好奇,我改为 --master local[4] 并且它起作用了。我不知道根本原因。也许更熟悉 Spark 的人可以启发我们。

注意:就我而言,我的 Kinesis 流有两个分片。因此,该应用程序创建了两个输入流,每个分片一个。

于 2015-01-15T23:12:54.910 回答
2

感谢@user3594557 的提示。

https://spark.apache.org/docs/1.1.0/streaming-programming-guide.html#input-dstreams有两个重要说明

如果分配给应用程序的核数小于或等于输入 DStreams/接收器的数量,则系统将接收数据,但无法处理它们。

在本地运行时,如果你的master URL设置为“local”,那么只有一个core可以运行任务。这对于即使只有一个输入 DStream(文件流还可以)的程序来说是不够的,因为接收器将占用该核心并且将没有核心来处理数据。

于 2015-02-17T13:02:34.047 回答