2

现在我正在使用 Apache Kafka 并有任务:我们在目录中有一些 csv 文件,它是一个小批量文件,每个文件大约 25-30 mb。我所需要的-解析文件并将其放入kafka。

正如我所看到的,Kafka 有一些有趣的东西,比如 Connector。

我可以创建 Source-Connector 和 SourceTask,但我不明白一件事:当我处理文件时,如何停止或删除我的任务?

例如我有虚拟连接器:

public class DummySourceConnector extends SourceConnector {
private static final Logger logger = LogManager.getLogger();

@Override
public String version() {
    logger.info("version");

    return "1";
}

@Override
public ConfigDef config() {
    logger.info("config");

    return null;
}

@Override
public Class<? extends Task> taskClass() {
    return DummySourceTask.class;
}

@Override
public void start(Map<String, String> props) {
    logger.info("start {}", props);
}

@Override
public void stop() {
    logger.info("stop");
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    logger.info("taskConfigs {}", maxTasks);

    return ImmutableList.of(ImmutableMap.of("key", "value"));
}

和任务:

public class DummySourceTask extends SourceTask {
private static final Logger logger = LogManager.getLogger();

private long offset = 0;

@Override
public String version() {
    logger.info("version");

    return "1";
}

@Override
public void start(Map<String, String> props) {
    logger.info("start {}", props);
}


@Override
public List<SourceRecord> poll() throws InterruptedException {
    Thread.sleep(3000);

    final String value = "Offset " + offset++ + " Timestamp " + Instant.now().toString();

    logger.info("poll value {}", value);

    return ImmutableList.of(new SourceRecord(
            ImmutableMap.of("partition", 0),
            ImmutableMap.of("offset", offset),
            "topic-dummy",
            SchemaBuilder.STRING_SCHEMA,
            value
    ));
}

public void stop() {
    logger.info("stop");
}

但是当一切都完成后我怎么能关闭我的任务呢?或者,也许您可​​以帮助我为这项任务提出另一个想法。

感谢您的帮助!

4

2 回答 2

2

首先,我鼓励您在此处查看现有的连接器。我觉得 spooldir 连接器会对您有所帮助。您甚至可以直接下载并安装它,而无需编写任何代码。

其次,如果我理解正确,您想停止一项任务。我相信这个讨论是你想要的。

于 2016-10-06T00:01:54.823 回答
1

当事件发生时终止任务的一个不太优雅的解决方案是检查任务源中的事件并调用 System.exit(1)。

尽管如此,我发现的最优雅的解决方案是:

当事件发生时,连接器任务向代理应用 REST 调用,以停止运行任务的连接器。

为此,任务本身应该知道运行任务的连接器的名称,您可以按照本讨论的步骤找到该名称。

所以连接器的名称在任务的属性参数中,存在一个带有“名称”键的属性,其值是执行任务的连接器的名称(如果发生事件,我们希望停止它)。

最后,我们进行 REST 调用,如果任务停止,我们会得到一个没有内容的 204 应答。

调用的代码是这样的:

 try {

  URL url = new URL("url/" + connectorName);
  HttpURLConnection conn = (HttpURLConnection) url.openConnection();
  conn.setRequestMethod("DELETE");
  conn.setRequestProperty("Accept", "application/json");

  if (conn.getResponseCode() != 204) {
    throw new RuntimeException("Failed : HTTP error code : "
        + conn.getResponseCode());
  }

  BufferedReader br = new BufferedReader(new InputStreamReader(
    (conn.getInputStream())));

  String output;
  System.out.println("Task Stopped \n");
  while ((output = br.readLine()) != null) {
    System.out.println(output);
  }

  conn.disconnect();

  } catch (MalformedURLException e) {

  e.printStackTrace();

  } catch (IOException e) {

  e.printStackTrace();

  }

现在所有连接器任务都停止了。

当然,正如前面提到的,您必须记住每个 SourceTask 和每个 SinkTask 的逻辑是永无止境的如果发生事件,它们应该永远不会停止,而是不断在您提供的文件中搜索新条目。 . 所以通常你用 REST 调用来停止它们,如果你想让它们在事件发生时停止,你可以把 REST 调用放在他们自己的代码中。

于 2019-03-30T17:49:38.253 回答