问题标签 [apache-samza]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票
1 回答
419 浏览

exception - Samza 发送消息失败。例外

我在 aws emr 实例上使用 samza,我总是有如下异常,有人可以帮助我吗?:

org.apache.samza.SamzaException:发送消息失败。异常:java.lang.IllegalStateException:生产者关闭后无法发送。在 org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120) 在 org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer. scala:111) 在 org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:86) 在 org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81) 在 org.apache.samza .system.SystemProducers.send(SystemProducers.scala:87) at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala :72) 在 org.apache.samza.storage.kv.SerializedKeyValueStore。

0 投票
1 回答
359 浏览

java - 您可以将序列化消息注入另一个 protobuf 消息吗

我们使用 protobuf 编码消息处理 kafka/samza 作业管道。对于某些数据集,管道可能相当长,我们希望为管道中的每个阶段添加时间戳/id 以监控效率和服务健康状况。

附加信息将被添加到模式中称为接触点的重复字段中。显然,在 java/samza 中解码消息,添加附加消息并再次序列化会产生随消息大小而增加的开销(有些可能会很大,增加反序列化时间),管道的某些部分只是检查消息的过滤器关键,甚至可能根本不需要反序列化,因此这些开销越少越好。

是否可以在不反序列化的情况下将第二条序列化消息注入现有消息,如果是这样,这样做会是非常糟糕的做法(我只能认为会),是否有更好的解决方案不必反序列化/添加/序列化用于监控消息路径/时间流

0 投票
1 回答
542 浏览

apache-kafka - Samza 任务未在一个分区上接收

我的一项 samza 任务有一个令人费解的问题。除了一个分区上的消息外,它可以正常工作。我有 9 个关于该主题的分区。如果我发送 1000 条消息,我只会收到大约 890 条消息。

我已经检查了 kafka-console-consumer 的分区键,我知道我的 samza 作业不会处理这些分区键,并且控制台消费者确实看到了该消息,所以我知道它正在被写入主题,并且至少一个普通消费者可以看看就好了。

我在 samza 上启用了调试日志记录,并且有很多消息org.apache.samza.checkpoint.kafka.KafkaCheckpointManager说:

为 taskName Partition 4 添加检查点 Checkpoint [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}]

分区 4 总是显示 448。分区 0 有类似的日志,但在显示 448 的地方,它是一个稳定增长的数字。

我很乐意分享任何有趣的配置信息来帮助缩小范围,但现在,我对我什至会分享什么感到有点迷惑。

我正在运行ThreadJobFactory

  • samza-kafka_2.10 版本 0.9.1

  • 客户端上的kafka_2.10版本0.8.2.1

  • 卡夫卡经纪人 0.9.0.0

更新

我使用相同的分区键查看了上游 samza 作业,发现上游分区 4 存在问题。使用 kafkacat 检查 samza 检查点主题,我看到分区 4 的检查点没有推进。首先我看到:

然后一分钟后我看到:

该数字没有超过 2556。但是,查看resource.mutation分区 4 上的实际主题,最后一个偏移量的范围与其他偏移量相似,截至目前约为 61000 并且还在增长。

根本没有错误消息或警告消息。它只是停止从分区 4 消费。

0 投票
0 回答
1349 浏览

java - 无法终止 YARN 作业

我有一个简单的 Samza 作业,我将其提交给我们的 YARN 集群。该作业分配一个容器并运行没有任何问题。

然而,当尝试终止作业时,AM 和作业容器都在 NM 上运行,即使 RM 声称作业已成功终止:

从 NM 日志中,我可以看到:

状态永远不会转换出来,FINISHING_CONTAINERS_WAIT我不得不进入kill -9容器进程。

我正在使用 Samza 版本0.10.0和 YARN 版本Hadoop 2.6.0-cdh5.4.9

任何想法?

更新:

经过一番挖掘,我可以看到:

0 投票
1 回答
218 浏览

hadoop-yarn - ContainerRequestState [INFO] 队列中没有更多待处理的请求

我正在使用具有 3 个节点的 MapR (YARN) 集群。我正在尝试在集群上部署 6 个 Samza 作业,以对数据流进行一些处理。所有工作都是正确的。我尝试并行部署 2-3 并且它们工作。但是,当我并行部署所有 6 个 Samza 作业时,我会看到以下日志。任务继续运行并且不产生预期的输出数据流。

在此处输入图像描述

我的 ResourceManager Web 仪表板上的节点状态如下 - 在此处输入图像描述

谁能建议如何解决这个问题。我认为也许应用程序没有足够的资源来并行运行所有这些。我可以尝试什么改变?

0 投票
1 回答
126 浏览

hadoop-yarn - 从运行在纱线集群上的 samza 作业加载属性文件时出现问题

我有一个 samza 作业,我正在尝试使用它在纱线集群上运行

./bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///home/anshu/samzaJob.properties

使用此配置,作业触发并运行良好。

现在工作开始后,我有一些特定于应用程序的配置(以单独的属性文件的形式),我正在尝试使用 apache commons 配置库加载这些配置。为此,我创建了一个 appconfig 文件夹并尝试读取该文件夹中的所有文件

CONFIGURATION_FILE_PATH = System.getProperty("user.dir") + "/config/appconfig";

这在我的本地机器上工作正常,但是当它在纱线集群上运行时,这解决了

/var/lib/hadoop-yarn/data/samza-yarn/usercache/anshu/appcache/application_1462311090906_0973/container_e19_1462311090906_0973_01_000003/config/appconfig

这是不正确的。

我应该如何找到加载文件的正确路径?或者有没有其他方法可以做到这一点?

0 投票
1 回答
106 浏览

java - 如何在 Samza 工作人员上获取应用程序 ID?

我不需要“容器 ID”或“应用尝试 ID”。

在我看到的文档${samza.log.dir}中,我们可以放置log4j 配置,并且此路径包含应用程序 ID。就像是/foo/log/../application_id_123

0 投票
0 回答
83 浏览

mysql - Spark / Samza / Storm 可以撤销过去的提交并重新生成视图吗?

我刚刚看了Turning the database inside-out并注意到 Samza 和 Redux 之间的相似之处:所有状态都由一个不可变对象流组成。

这让我意识到,如果您事后编辑流,理论上您可以根据新的事务列表重新生成所有物化视图,并且实际上“撤消”过去对数据库的更改。

例如,假设我有以下一系列差异:

经过这一系列的更改,我们的数据库如下所示:

现在如果我想撤消数字“3”怎么办?我们的新差异集将是:

我们的数据库:

虽然这在理论上听起来不错,但实际上可以使用 Samza、Storm 或 Spark 来完成吗?任何事务流数据库都可以做到这一点吗?我对用于管理目的的此类功能感兴趣。我有一些网站,客户不小心删除了员工或修改了他们无意修改的记录。过去,我通过创建一个单独的表来解决这个问题,该表记录了对数据库的所有更改,然后当出现问题时,我可以(手动)查看该表,找出他们做错了什么,并(手动)修复数据。

如果我可以只看一个事务流,删除坏的,然后说“重新生成数据库”,那就更酷了

0 投票
1 回答
107 浏览

java - 我可以将 task.commit.ms 设置为每 1ms 吗?

我有一个 Apache-Samza 项目,但我遇到了重复数据的问题。

这是我的检查点配置:

在文档上我们可以阅读:

如果配置了 task.checkpoint.factory,则此属性确定写入检查点的频率。该值是检查点之间的时间,以毫秒为单位。检查点的频率影响故障恢复:如果容器意外失败(例如由于崩溃或机器故障)并重新启动,它将在最后一个检查点恢复处理。自失败容器上的最后一个检查点以来处理的任何消息都会再次处理。更频繁地检查点会减少可能被处理两次的消息数量,但也会使用更多资源。

那么我可以更改task.commit.ms=20000为 250 毫秒或 1 毫秒吗?这是好还是很坏?我有一个非常好的集群。

为什么我需要改变这个,因为这个 Samza(worker) 每周崩溃 1-3 次。现在临时解决方案是每次提交偏移量。


文档参考:

阿帕奇-萨姆萨

Apache-Samza-配置

0 投票
2 回答
1052 浏览

java - hello-samza demo not compiling

I am trying to follow the hello-samza basic setup and cannot get past "Build a Samza Job Package". As I am running off of the latest I try running gradle as specified:

Then I saw that the bootstrap actually runs that so I thought I could just move to the next step:

Which, after a bunch of output, ultimately fails:

I then ran it with the -X switch and see a bunch of these errors:

I'm not sure how to proceed from here. I know I have managed to get previous versions to work for me. Is it possibly related to a difference between linux (my previous env) and OS X (my current env)?