我有一个 Apache Apex 应用程序 DAG,它从队列中读取 RabbitMQ 消息。我应该使用哪个 Apache Apex Malhar 运算符?有几个运算符,但不清楚使用哪一个以及如何使用它。
3 回答
这是主要的操作符代码,其中元组类型是泛型参数,emitTuple() 是子类需要实现的抽象方法。
AbstractSinglePortRabbitMQInputOperator 是一个简单的子类,它提供单个输出端口并使用另一个抽象方法 getTuple() 来实现 emitTuple(),该方法需要在其子类中实现。
Sanjay 指出的测试显示了如何使用这些类。
我在找出如何将消息从 RabbitMQ 读取到 Apache Apex 时也遇到了问题。在提供的 Sanjay 答案链接(https://stackoverflow.com/a/42210636/2350644)的帮助下,我终于设法让它运行起来。以下是它如何协同工作:
1. 设置 RabbitMQ 服务器
这里描述了很多安装 RabbitMQ 的方法:https
://www.rabbitmq.com/download.html 对我来说最简单的方法是使用docker
(参见:https ://store.docker.com/images/rabbitmq )
docker pull rabbitmq
docker run -d --hostname my-rabbit --name some-rabbit -p 5672:5672 -p 15672:15672 rabbitmq:3-management
要检查 RabbitMQ 是否正常工作,请打开浏览器并导航到:http://localhost:15672/
。您应该看到 RabbitMQ 的管理页面。
2.编写生产者程序
要将消息发送到队列,您可以编写一个简单的 JAVA 程序,如下所示:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.ArrayList;
public class Send {
private final static String EXCHANGE = "myExchange";
public static void main(String[] args) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE, BuiltinExchangeType.FANOUT);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE, "");
List<String> messages = Arrays.asList("Hello", "World", "!");
for (String msg : messages) {
channel.basicPublish(EXCHANGE, "", null, msg.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + msg + "'");
}
channel.close();
connection.close();
}
}
如果您执行 JAVA 程序,您应该会在 RabbitMQ 的管理 UI 中看到一些输出。
3. 实现一个示例 Apex 应用程序
3.1 引导示例 apex 应用程序
按照官方顶点文档http://docs.datatorrent.com/beginner/
3.2 向 pom.xml 添加额外的依赖
要使用通过malhar
添加以下依赖项提供的类:
<dependency>
<groupId>org.apache.apex</groupId>
<artifactId>malhar-contrib</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>4.2.0</version>
</dependency>
3.3 创建消费者
我们首先需要创建一个InputOperator
使用来自 RabbitMQ 的可用代码的消息apex-malhar
。
import com.datatorrent.contrib.rabbitmq.AbstractSinglePortRabbitMQInputOperator;
public class MyRabbitMQInputOperator extends AbstractSinglePortRabbitMQInputOperator<String> {
@Override
public String getTuple(byte[] message) {
return new String(message);
}
}
您只需要覆盖该getTuple()
方法。在这种情况下,我们只是返回从 RabbitMQ 接收到的消息。
3.4 设置 Apex DAG
为了测试应用程序,我们只需添加一个InputOperator
(MyRabbitMQInputOperator
我们之前实现的)使用来自 RabbitMQ 的数据和一个ConsoleOutputOperator
打印接收到的消息的应用程序。
import com.rabbitmq.client.BuiltinExchangeType;
import org.apache.hadoop.conf.Configuration;
import com.datatorrent.api.annotation.ApplicationAnnotation;
import com.datatorrent.api.StreamingApplication;
import com.datatorrent.api.DAG;
import com.datatorrent.api.DAG.Locality;
import com.datatorrent.lib.io.ConsoleOutputOperator;
@ApplicationAnnotation(name="MyFirstApplication")
public class Application implements StreamingApplication
{
private final static String EXCHANGE = "myExchange";
@Override
public void populateDAG(DAG dag, Configuration conf)
{
MyRabbitMQInputOperator consumer = dag.addOperator("Consumer", new MyRabbitMQInputOperator());
consumer.setHost("localhost");
consumer.setExchange(EXCHANGE);
consumer.setExchangeType(BuiltinExchangeType.FANOUT.getType());
ConsoleOutputOperator cons = dag.addOperator("console", new ConsoleOutputOperator());
dag.addStream("myStream", consumer.outputPort, cons.input).setLocality(Locality.CONTAINER_LOCAL);
}
}
3.5 测试应用
为了简单地测试创建的应用程序,我们可以编写一个 UnitTest,因此无需设置 Hadoop/YARN 集群。在引导应用程序中已经有一个 UnitTest ApplicationTest.java
,我们可以使用它:
import java.io.IOException;
import javax.validation.ConstraintViolationException;
import org.junit.Assert;
import org.apache.hadoop.conf.Configuration;
import org.junit.Test;
import com.datatorrent.api.LocalMode;
/**
* Test the DAG declaration in local mode.
*/
public class ApplicationTest {
@Test
public void testApplication() throws IOException, Exception {
try {
LocalMode lma = LocalMode.newInstance();
Configuration conf = new Configuration(true);
//conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
lma.prepareDAG(new Application(), conf);
LocalMode.Controller lc = lma.getController();
lc.run(10000); // runs for 10 seconds and quits
} catch (ConstraintViolationException e) {
Assert.fail("constraint violations: " + e.getConstraintViolations());
}
}
}
由于我们不需要此应用程序的任何属性,因此此文件中唯一更改的是取消注释该行:
conf.addResource(this.getClass().getResourceAsStream("/META-INF/properties.xml"));
如果您ApplicationTest.java
使用 Producer 程序执行并发送消息到 RabbitMQ,如 中所述2.
,测试应该输出所有消息。您可能需要增加测试时间才能看到所有消息(目前设置为 10 秒)。