现在我正在使用 Apache Kafka 并有任务:我们在目录中有一些 csv 文件,它是一个小批量文件,每个文件大约 25-30 mb。我所需要的-解析文件并将其放入kafka。
正如我所看到的,Kafka 有一些有趣的东西,比如 Connector。
我可以创建 Source-Connector 和 SourceTask,但我不明白一件事:当我处理文件时,如何停止或删除我的任务?
例如我有虚拟连接器:
public class DummySourceConnector extends SourceConnector {
private static final Logger logger = LogManager.getLogger();
@Override
public String version() {
logger.info("version");
return "1";
}
@Override
public ConfigDef config() {
logger.info("config");
return null;
}
@Override
public Class<? extends Task> taskClass() {
return DummySourceTask.class;
}
@Override
public void start(Map<String, String> props) {
logger.info("start {}", props);
}
@Override
public void stop() {
logger.info("stop");
}
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
logger.info("taskConfigs {}", maxTasks);
return ImmutableList.of(ImmutableMap.of("key", "value"));
}
和任务:
public class DummySourceTask extends SourceTask {
private static final Logger logger = LogManager.getLogger();
private long offset = 0;
@Override
public String version() {
logger.info("version");
return "1";
}
@Override
public void start(Map<String, String> props) {
logger.info("start {}", props);
}
@Override
public List<SourceRecord> poll() throws InterruptedException {
Thread.sleep(3000);
final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString();
logger.info("poll value {}", value);
return ImmutableList.of(new SourceRecord(
ImmutableMap.of("partition", 0),
ImmutableMap.of("offset", offset),
"topic-dummy",
SchemaBuilder.STRING_SCHEMA,
value
));
}
public void stop() {
logger.info("stop");
}
但是当一切都完成后我怎么能关闭我的任务呢?或者,也许您可以帮助我为这项任务提出另一个想法。
感谢您的帮助!