问题标签 [apache-samza]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
exception - Samza 发送消息失败。例外
我在 aws emr 实例上使用 samza,我总是有如下异常,有人可以帮助我吗?:
org.apache.samza.SamzaException:发送消息失败。异常:java.lang.IllegalStateException:生产者关闭后无法发送。在 org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer.scala:120) 在 org.apache.samza.system.kafka.KafkaSystemProducer$$anonfun$send$5.apply(KafkaSystemProducer. scala:111) 在 org.apache.samza.system.kafka.KafkaSystemProducer.send(KafkaSystemProducer.scala:86) 在 org.apache.samza.util.ExponentialSleepStrategy.run(ExponentialSleepStrategy.scala:81) 在 org.apache.samza .system.SystemProducers.send(SystemProducers.scala:87) at org.apache.samza.task.TaskInstanceCollector.send(TaskInstanceCollector.scala:61) at org.apache.samza.storage.kv.LoggedStore.putAll(LoggedStore.scala :72) 在 org.apache.samza.storage.kv.SerializedKeyValueStore。
java - 您可以将序列化消息注入另一个 protobuf 消息吗
我们使用 protobuf 编码消息处理 kafka/samza 作业管道。对于某些数据集,管道可能相当长,我们希望为管道中的每个阶段添加时间戳/id 以监控效率和服务健康状况。
附加信息将被添加到模式中称为接触点的重复字段中。显然,在 java/samza 中解码消息,添加附加消息并再次序列化会产生随消息大小而增加的开销(有些可能会很大,增加反序列化时间),管道的某些部分只是检查消息的过滤器关键,甚至可能根本不需要反序列化,因此这些开销越少越好。
是否可以在不反序列化的情况下将第二条序列化消息注入现有消息,如果是这样,这样做会是非常糟糕的做法(我只能认为会),是否有更好的解决方案不必反序列化/添加/序列化用于监控消息路径/时间流
apache-kafka - Samza 任务未在一个分区上接收
我的一项 samza 任务有一个令人费解的问题。除了一个分区上的消息外,它可以正常工作。我有 9 个关于该主题的分区。如果我发送 1000 条消息,我只会收到大约 890 条消息。
我已经检查了 kafka-console-consumer 的分区键,我知道我的 samza 作业不会处理这些分区键,并且控制台消费者确实看到了该消息,所以我知道它正在被写入主题,并且至少一个普通消费者可以看看就好了。
我在 samza 上启用了调试日志记录,并且有很多消息org.apache.samza.checkpoint.kafka.KafkaCheckpointManager
说:
为 taskName Partition 4 添加检查点 Checkpoint [offsets={SystemStreamPartition [kafka, com.mycompany.indexing.document, 4]=448}]
分区 4 总是显示 448。分区 0 有类似的日志,但在显示 448 的地方,它是一个稳定增长的数字。
我很乐意分享任何有趣的配置信息来帮助缩小范围,但现在,我对我什至会分享什么感到有点迷惑。
我正在运行ThreadJobFactory
:
samza-kafka_2.10 版本 0.9.1
客户端上的kafka_2.10版本0.8.2.1
卡夫卡经纪人 0.9.0.0
更新
我使用相同的分区键查看了上游 samza 作业,发现上游分区 4 存在问题。使用 kafkacat 检查 samza 检查点主题,我看到分区 4 的检查点没有推进。首先我看到:
然后一分钟后我看到:
该数字没有超过 2556。但是,查看resource.mutation
分区 4 上的实际主题,最后一个偏移量的范围与其他偏移量相似,截至目前约为 61000 并且还在增长。
根本没有错误消息或警告消息。它只是停止从分区 4 消费。
java - 无法终止 YARN 作业
我有一个简单的 Samza 作业,我将其提交给我们的 YARN 集群。该作业分配一个容器并运行没有任何问题。
然而,当尝试终止作业时,AM 和作业容器都在 NM 上运行,即使 RM 声称作业已成功终止:
从 NM 日志中,我可以看到:
状态永远不会转换出来,FINISHING_CONTAINERS_WAIT
我不得不进入kill -9
容器进程。
我正在使用 Samza 版本0.10.0
和 YARN 版本Hadoop 2.6.0-cdh5.4.9
。
任何想法?
更新:
经过一番挖掘,我可以看到:
hadoop-yarn - 从运行在纱线集群上的 samza 作业加载属性文件时出现问题
我有一个 samza 作业,我正在尝试使用它在纱线集群上运行
./bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file:///home/anshu/samzaJob.properties
使用此配置,作业触发并运行良好。
现在工作开始后,我有一些特定于应用程序的配置(以单独的属性文件的形式),我正在尝试使用 apache commons 配置库加载这些配置。为此,我创建了一个 appconfig 文件夹并尝试读取该文件夹中的所有文件
CONFIGURATION_FILE_PATH = System.getProperty("user.dir") + "/config/appconfig";
这在我的本地机器上工作正常,但是当它在纱线集群上运行时,这解决了
/var/lib/hadoop-yarn/data/samza-yarn/usercache/anshu/appcache/application_1462311090906_0973/container_e19_1462311090906_0973_01_000003/config/appconfig
这是不正确的。
我应该如何找到加载文件的正确路径?或者有没有其他方法可以做到这一点?
java - 如何在 Samza 工作人员上获取应用程序 ID?
我不需要“容器 ID”或“应用尝试 ID”。
在我看到的文档${samza.log.dir}
中,我们可以放置log4j 配置,并且此路径包含应用程序 ID。就像是/foo/log/../application_id_123
mysql - Spark / Samza / Storm 可以撤销过去的提交并重新生成视图吗?
我刚刚看了Turning the database inside-out并注意到 Samza 和 Redux 之间的相似之处:所有状态都由一个不可变对象流组成。
这让我意识到,如果您事后编辑流,理论上您可以根据新的事务列表重新生成所有物化视图,并且实际上“撤消”过去对数据库的更改。
例如,假设我有以下一系列差异:
经过这一系列的更改,我们的数据库如下所示:
现在如果我想撤消数字“3”怎么办?我们的新差异集将是:
我们的数据库:
虽然这在理论上听起来不错,但实际上可以使用 Samza、Storm 或 Spark 来完成吗?任何事务流数据库都可以做到这一点吗?我对用于管理目的的此类功能感兴趣。我有一些网站,客户不小心删除了员工或修改了他们无意修改的记录。过去,我通过创建一个单独的表来解决这个问题,该表记录了对数据库的所有更改,然后当出现问题时,我可以(手动)查看该表,找出他们做错了什么,并(手动)修复数据。
如果我可以只看一个事务流,删除坏的,然后说“重新生成数据库”,那就更酷了
java - 我可以将 task.commit.ms 设置为每 1ms 吗?
我有一个 Apache-Samza 项目,但我遇到了重复数据的问题。
这是我的检查点配置:
在文档上我们可以阅读:
如果配置了 task.checkpoint.factory,则此属性确定写入检查点的频率。该值是检查点之间的时间,以毫秒为单位。检查点的频率影响故障恢复:如果容器意外失败(例如由于崩溃或机器故障)并重新启动,它将在最后一个检查点恢复处理。自失败容器上的最后一个检查点以来处理的任何消息都会再次处理。更频繁地检查点会减少可能被处理两次的消息数量,但也会使用更多资源。
那么我可以更改task.commit.ms=20000
为 250 毫秒或 1 毫秒吗?这是好还是很坏?我有一个非常好的集群。
为什么我需要改变这个,因为这个 Samza(worker) 每周崩溃 1-3 次。现在临时解决方案是每次提交偏移量。
文档参考:
java - hello-samza demo not compiling
I am trying to follow the hello-samza basic setup and cannot get past "Build a Samza Job Package". As I am running off of the latest I try running gradle as specified:
Then I saw that the bootstrap actually runs that so I thought I could just move to the next step:
Which, after a bunch of output, ultimately fails:
I then ran it with the -X switch and see a bunch of these errors:
I'm not sure how to proceed from here. I know I have managed to get previous versions to work for me. Is it possibly related to a difference between linux (my previous env) and OS X (my current env)?