最近我正在尝试在 Samza 框架上做一些流处理工作。我已经成功部署了 hello-samza 示例。但是,当我尝试编写自己的工作时,我不知道从哪里开始工作。
我已阅读此文档,但我仍然无法理解重点。所以任何人都可以帮助我:
- 我的代码的架构是什么(源代码、lib 代码和配置)。
- 我的代码将推送到哪个目录。
- 我需要做哪些其他工作才能让我的代码运行。
你的建议对我很有帮助,非常感谢!
最近我正在尝试在 Samza 框架上做一些流处理工作。我已经成功部署了 hello-samza 示例。但是,当我尝试编写自己的工作时,我不知道从哪里开始工作。
我已阅读此文档,但我仍然无法理解重点。所以任何人都可以帮助我:
你的建议对我很有帮助,非常感谢!
建立自己的工作非常简单。首先打招呼samza:
git clone https://git.apache.org/samza-hello-samza.git hello-samza
下一步是通过以下命令设置系统:
bin/grid bootstrap
请确保一切顺利jps
下一步是从 pom.xml 中删除 apache-rat-plugin,而不是在 hello-samza 中构建项目。
当您删除时,您可以在 src 文件夹(MyTask.java)中添加一个 java 文件 Job,并且在 config 目录(My.Task.properties)中添加一个 .properties 文件
这是一个空样本作业(MyTask.java)。
package com.samza;
public class MyTask implements StreamTask {
private static final SystemStream OUTPUT_STREAM = new SystemStream("kafka","topicOut");
public void process (IncomingMessageEnvelope envelope, MessageCollector collector,
TaskCoordinator coordinator) throws Exception {
// Do something useful
}
}
不要忘记实现一个 .properties 文件。
如果您有非错误代码,请使用 maven 构建,例如:
mvn clean package
mkdir -p deploy/samza
tar -xvf ./samza-job-package/target/samza-job-package-0.10.0-dist.tar.gz -C deploy/samza
之后,您的服务器已启动(如果不是,您可以启动./bin/grid start all)
您可以通过以下方式部署您的作业deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/MyTask.properties
并通过 kafka client-consumer 消费结果
deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic outTopic
我通过一个 Maven Eclipse 项目创建了 Samza 作业。版本 0.9.2 的依赖项在 pom.xml 文件中加载了这个内容(我有一些版本问题,所以你可能有一些工作在那里):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.acio.samza</groupId>
<artifactId>samzafroga</artifactId>
<version>0.0.1</version>
<name>samzafroga</name>
<dependencies>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-log4j</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-shell</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-yarn_2.10</artifactId>
<exclusions>
<exclusion>
<artifactId>jdk.tools</artifactId>
<groupId>jdk.tools</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
<artifactId>irclib</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-server</artifactId>
<version>${jettyVersion}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-webapp</artifactId>
<version>${jettyVersion}</version>
</dependency>
</dependencies>
<properties>
<!-- maven specific properties -->
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<samza.version>0.8.0</samza.version>
<jettyVersion>7.6.16.v20140903</jettyVersion>
</properties>
<repositories>
<repository>
<id>apache-releases</id>
<url>https://repository.apache.org/content/groups/public</url>
</repository>
<repository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>https://oss.sonatype.org/content/groups/scala-tools</url>
</repository>
</repositories>
<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>
<build>
<pluginManagement>
<plugins>
<plugin>
<groupId>org.apache.rat</groupId>
<artifactId>apache-rat-plugin</artifactId>
<version>0.9</version>
<configuration>
<excludes>
<exclude>*.patch</exclude>
<exclude>**/target/**</exclude>
<exclude>*.json</exclude>
<exclude>.vagrant/**</exclude>
<exclude>.git/**</exclude>
<exclude>*.md</exclude>
<exclude>docs/**</exclude>
<exclude>config/**</exclude>
<exclude>bin/**</exclude>
<exclude>.gitignore</exclude>
<exclude>**/.cache/**</exclude>
<exclude>deploy/**</exclude>
<exclude>**/.project</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<!-- plugin to build the tar.gz file filled with examples -->
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<configuration>
<descriptors>
<descriptor>src/assembly/bin.xml</descriptor>
</descriptors>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-log4j</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-shell</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-yarn_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.10</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.0</version>
</dependency>
<dependency>
<groupId>org.schwering</groupId>
<artifactId>irclib</artifactId>
<version>1.10</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.6.2</version>
</dependency>
<dependency>
<groupId>org.codehaus.jackson</groupId>
<artifactId>jackson-jaxrs</artifactId>
<version>1.8.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.6.0</version>
</dependency>
</dependencies>
</dependencyManagement>
</project>
工作的基本代码是这样的:
package xxxx;
import java.util.Map;
import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;
public class Redirect implements StreamTask {
private final SystemStream OUTPUT_STREAM = new SystemStream("kafka", "samzaout");
public void process(IncomingMessageEnvelope envelope,
MessageCollector collector,
TaskCoordinator coordinator)
{
String msg = (String)envelope.getMessage();
// Transformation
String outmsg = "xxx-" + msg + "-xxx";
collector.send(new OutgoingMessageEnvelope(OUTPUT_STREAM, outmsg));
}
}
编译完成后,您需要将其分组到一个 jar 文件中,并将其放置在所有 samza 节点、web 或 hdfs 都可以访问的位置。
从您必须创建的属性文件中引用它以启动它。在项目网页中查找示例。
如果您按照 Hello Samza 说明进行操作,您将在本地计算机上运行一个功能齐全的 Zookeeper、Kafka 和 Yarn/Samza 集群。通过该项目,您可以运行与维基百科提要相关的任务来测试事物。
然而,和你一样,我在为新任务制定正确的目录结构和构建设置时遇到了一些麻烦(没有集群管理的东西)。因此,我通过删除 hello-samza 之外的新任务不需要的所有内容来创建hello-samza-base。我在自述文件中包含了关于构建新任务的说明。
就部署而言,这有点复杂。阅读有关创建 Zookeeper、Kafka 和 Yarn 集群的信息。
多阅读该文档,多看 hello-samza 示例,如果您将它部署到 YARN,请多阅读有关它的信息。您正在寻找的所有答案都在那里。
hello-samza 中有三个工作。选择一个并按照它,配置,启动脚本等。
这是来自 hello-samza 页面的如何启动 wikipedia-feed 作业
deploy/samza/bin/run-job.sh --config-factory=org.apache.samza.config.factories.PropertiesConfigFactory --config-path=file://$PWD/deploy/samza/config/wikipedia-feed.properties
属性文件显示编译的作业/任务代码在哪里。wikipedia-feed 作业/任务的源代码在这里:
只需修改此工作,或复制和修改,即可开始您的工作。