我正在尝试从 Kafka 主题读取数据并将其写入 HDFS 文件系统。我使用来自 [https://github.com/apache/apex-malhar/tree/master/examples/kafka] 的 Apex Malhar 示例构建我的项目。不幸的是,在设置了 Kafka 属性和 Hadoop 配置之后,我的 HDFS 2.6.0 系统中没有创建任何数据。PS:控制台没有显示任何错误,一切似乎都正常。
这是我的应用程序所使用的代码:
/**
 * Command-line entry point that starts a {@code Consumer} and then runs
 * {@code ApplicationTest#testApplication()}.
 */
public class TestConsumer {

  /**
   * Starts a {@code Consumer} for {@code KafkaProperties.TOPIC}, then invokes
   * the application test. Any exception raised by the test is printed to the
   * standard error stream instead of being propagated.
   *
   * @param args command-line arguments (not used)
   */
  public static void main(String[] args) {
    // The consumer is started before the application is launched.
    final Consumer topicConsumer = new Consumer(KafkaProperties.TOPIC);
    topicConsumer.start();

    final ApplicationTest application = new ApplicationTest();
    try {
      application.testApplication();
    } catch (Exception failure) {
      failure.printStackTrace();
    }
  }
}
这是来自 Apex Malhar 示例的 ApplicationTest 类:
package org.apache.apex.examples.kafka.kafka2hdfs;
import java.io.File;

import javax.validation.ConstraintViolationException;

import org.junit.Rule;

import org.apache.apex.malhar.kafka.AbstractKafkaInputOperator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.net.NetUtils;
import org.apache.log4j.Logger;

import com.datatorrent.api.LocalMode;

import info.batey.kafka.unit.KafkaUnitRule;
/**
 * Test the DAG declaration in local mode.
 *
 * <p>Builds a configuration for the Kafka input and file output operators,
 * launches {@code KafkaApp} asynchronously through {@link LocalMode}, waits
 * for output to show up under {@code FILE_DIR} and then shuts the application
 * down.
 */
public class ApplicationTest
{
  private static final Logger LOG = Logger.getLogger(ApplicationTest.class);
  private static final String TOPIC = "kafka2hdfs";
  private static final int zkPort = NetUtils.getFreeSocketPort();
  private static final int brokerPort = NetUtils.getFreeSocketPort();
  private static final String BROKER = "localhost:" + brokerPort;
  private static final String FILE_NAME = "test";

  // NOTE(review): this is a relative path and getConfig() builds the
  // Configuration with loadDefaults == false, so output presumably lands on
  // the local file system rather than HDFS -- confirm against the cluster setup.
  private static final String FILE_DIR = "./target/tmp/FromKafka";

  /** Upper bound, in seconds, on how long to wait for output before shutting down. */
  private static final int MAX_WAIT_SECONDS = 60;

  // broker port must match properties.xml
  // JUnit 4 only honors @Rule fields that are public and non-static.
  // NOTE(review): rules are applied by the JUnit runner; when testApplication()
  // is invoked directly (e.g. from a main method) the embedded broker is
  // presumably never started -- confirm.
  @Rule
  public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule(zkPort, brokerPort);

  /**
   * Runs the application in local mode and keeps it alive until output appears
   * under {@code FILE_DIR} or {@code MAX_WAIT_SECONDS} elapse.
   *
   * <p>The controller is always shut down once it has been started, including
   * when waiting is interrupted. Constraint violations are logged rather than
   * rethrown. If no output appears in time, an error is logged.
   *
   * @throws Exception if the application cannot be started or the wait is interrupted
   */
  public void testApplication() throws Exception
  {
    LocalMode.Controller lc = null;
    try {
      // Round down to a whole second: some file systems store modification
      // times with one-second granularity.
      long now = System.currentTimeMillis();
      long startedAt = now - (now % 1000L);

      // run app asynchronously; terminate after results are checked
      lc = asyncRun();

      // Shutting down right after runAsync() would stop the DAG before it can
      // process anything, so wait for output first.
      if (!awaitOutput(startedAt)) {
        LOG.error("no output file starting with '" + FILE_NAME + "' found in " + FILE_DIR
            + " after " + MAX_WAIT_SECONDS + " seconds");
      }
    } catch (ConstraintViolationException e) {
      LOG.error("constraint violations: " + e.getConstraintViolations());
    } finally {
      if (lc != null) {
        lc.shutdown();
      }
    }
  }

  /**
   * Polls {@code FILE_DIR} once per second until a fresh output file is seen
   * or {@code MAX_WAIT_SECONDS} have passed.
   *
   * @param sinceMillis only files modified at or after this time count as output
   * @return {@code true} if output was found, {@code false} on timeout
   * @throws InterruptedException if the thread is interrupted while sleeping
   */
  private static boolean awaitOutput(long sinceMillis) throws InterruptedException
  {
    File dir = new File(FILE_DIR);
    for (int i = 0; i < MAX_WAIT_SECONDS; ++i) {
      if (hasFreshOutput(dir, sinceMillis)) {
        return true;
      }
      LOG.debug("Waiting for output, i = " + i);
      Thread.sleep(1000);
    }
    return hasFreshOutput(dir, sinceMillis);
  }

  /**
   * Checks whether {@code dir} holds a regular file whose name starts with
   * {@code FILE_NAME} and that was modified at or after {@code sinceMillis}.
   * Files left over from earlier runs are ignored by the timestamp check.
   */
  private static boolean hasFreshOutput(File dir, long sinceMillis)
  {
    File[] entries = dir.listFiles();
    if (entries == null) {
      // Directory does not exist yet (or is not a directory).
      return false;
    }
    for (File entry : entries) {
      if (entry.isFile() && entry.getName().startsWith(FILE_NAME)
          && entry.lastModified() >= sinceMillis) {
        return true;
      }
    }
    return false;
  }

  /**
   * Builds the operator properties used for the local run.
   *
   * <p>NOTE(review): the property prefixes assume {@code KafkaApp} names its
   * operators {@code kafkaIn} and {@code fileOut} -- confirm against the DAG.
   *
   * @return a configuration holding only the properties set here
   */
  private Configuration getConfig()
  {
    Configuration conf = new Configuration(false);

    String pre = "dt.operator.kafkaIn.prop.";
    conf.setEnum(pre + "initialOffset", AbstractKafkaInputOperator.InitialOffset.EARLIEST);
    conf.setInt(pre + "initialPartitionCount", 1);
    conf.set(pre + "topics", TOPIC);
    conf.set(pre + "clusters", BROKER);

    pre = "dt.operator.fileOut.prop.";
    conf.set(pre + "filePath", FILE_DIR);
    conf.set(pre + "baseName", FILE_NAME);
    conf.setInt(pre + "maxLength", 40);
    conf.setInt(pre + "rotationWindows", 3);

    return conf;
  }

  /**
   * Prepares the {@code KafkaApp} DAG with {@link #getConfig()} and starts it
   * asynchronously.
   *
   * @return the controller of the running application; the caller must shut it down
   * @throws Exception if the DAG cannot be prepared
   */
  private LocalMode.Controller asyncRun() throws Exception
  {
    Configuration conf = getConfig();
    LocalMode lma = LocalMode.newInstance();
    lma.prepareDAG(new KafkaApp(), conf);
    LocalMode.Controller lc = lma.getController();
    lc.runAsync();
    return lc;
  }
}