I want to send the results computed from my DataStream to another service over HTTP. I see two possible ways to implement this:

  1. Use the synchronous Apache HttpClient in the sink
public class SyncHttpSink extends RichSinkFunction<SessionItem> {
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpClients.custom()
            .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
            .build();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        try(CloseableHttpResponse response = httpClient.execute(httpPost)) {
            int httpStatusCode = response.getStatusLine().getStatusCode();

            httpStatusesAccumulator.add(httpStatusCode);
        }
    }
}
  2. Use the asynchronous Apache HttpAsyncClient in the sink
public class AsyncHttpSink extends RichSinkFunction<SessionItem> {
    private static final String URL = "http://httpbin.org/post";

    private CloseableHttpAsyncClient httpClient;

    private Histogram httpStatusesAccumulator;

    @Override
    public void open(Configuration parameters) throws Exception {
        httpClient = HttpAsyncClients.custom()
                .setKeepAliveStrategy(DefaultConnectionKeepAliveStrategy.INSTANCE)
                .build();
        httpClient.start();

        httpStatusesAccumulator = getRuntimeContext().getHistogram("http_statuses");
    }

    @Override
    public void close() throws Exception {
        httpClient.close();

        httpStatusesAccumulator.resetLocal();
    }

    @Override
    public void invoke(SessionItem value) throws Exception {
        List<NameValuePair> params = new ArrayList<>();
        params.add(new BasicNameValuePair("session_uid", value.getSessionUid()));
        params.add(new BasicNameValuePair("traffic", String.valueOf(value.getTraffic())));
        params.add(new BasicNameValuePair("duration", String.valueOf(value.getDuration())));

        UrlEncodedFormEntity entity = new UrlEncodedFormEntity(params, Consts.UTF_8);

        HttpPost httpPost = new HttpPost(URL);
        httpPost.setEntity(entity);

        httpClient.execute(httpPost, new FutureCallback<HttpResponse>() {
            @Override
            public void completed(HttpResponse response) {
                int httpStatusCode = response.getStatusLine().getStatusCode();

                httpStatusesAccumulator.add(httpStatusCode);
            }

            @Override
            public void failed(Exception ex) {
                httpStatusesAccumulator.add(-1); // -1 - failed
            }

            @Override
            public void cancelled() {
                httpStatusesAccumulator.add(-2); // -2 - cancelled
            }
        });
    }
}

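Questions:

  1. Should I use a synchronous or an asynchronous HTTP client in the sink?

  2. If I use the synchronous client, it will block the sink, and through backpressure Flink will block the source. Is that right?

  3. If I use the asynchronous client, it won't block the sink. Is that right?

  4. Are accumulators not thread-safe? That is, can I use them in an asynchronous callback?

  5. Is RuntimeContext not thread-safe? That is, can I use it in an asynchronous callback?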

1 Answer

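1. Should I use a synchronous or an asynchronous HTTP client in the sink?

To avoid backpressure caused by blocking HTTP calls, I would recommend using the asynchronous HTTP client.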

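2. If I use the synchronous client, it will block the sink, and through backpressure Flink will block the source. Is that right?

Yes, that is correct. The backpressure will propagate through your topology all the way back to the source.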
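To make the effect concrete, here is a minimal plain-Java sketch of the idea. It is an analogy only and not how Flink implements backpressure internally: a bounded queue stands in for the buffers between operators, `Thread.sleep` stands in for a blocking HTTP call, and the class name `BackpressureSketch` and method `producerMillis` are made up for this illustration.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

final class BackpressureSketch {
    /** Returns how long (in ms) the producer needed to hand over all items. */
    static long producerMillis(int items, int capacity, long sinkDelayMillis) {
        // Bounded buffer: a stand-in for the limited buffers between operators.
        BlockingQueue<Integer> buffer = new ArrayBlockingQueue<>(capacity);

        Thread sink = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    buffer.take();
                    Thread.sleep(sinkDelayMillis); // stands in for a blocking HTTP call
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        sink.start();

        long start = System.nanoTime();
        try {
            for (int i = 0; i < items; i++) {
                buffer.put(i); // blocks while the buffer is full
            }
            long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
            sink.join();
            return elapsedMillis;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("fast sink: " + producerMillis(20, 2, 0) + " ms");
        System.out.println("slow sink: " + producerMillis(20, 2, 10) + " ms");
    }
}
```

With the slow sink the producer cannot run ahead by more than the buffer capacity, so it ends up throttled to the sink's pace. That is roughly what happens to a source when a downstream sink blocks on each request.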

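3. If I use the asynchronous client, it won't block the sink. Is that right?

That is correct. The call to execute() returns immediately, and the response is handled later in the callback.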

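4. Are accumulators not thread-safe? That is, can I use them in an asynchronous callback?

Accumulators are not thread-safe, so access to them has to be synchronized.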
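As a rough sketch of what "synchronized access" could look like, the example below guards every update with a lock. It does not use Flink's API: the `TreeMap` is a hypothetical stand-in for a non-thread-safe accumulator such as the histogram in the question, the thread pool stands in for the HTTP client's callback threads, and the class name `SynchronizedStatusCounts` is invented for illustration.

```java
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class SynchronizedStatusCounts {
    // Hypothetical stand-in for a non-thread-safe accumulator.
    private final Map<Integer, Integer> counts = new TreeMap<>();
    private final Object lock = new Object();

    // Called from callback threads; the lock serializes every update.
    void add(int statusCode) {
        synchronized (lock) {
            counts.merge(statusCode, 1, Integer::sum);
        }
    }

    // Reads take the same lock so they see a consistent state.
    int get(int statusCode) {
        synchronized (lock) {
            return counts.getOrDefault(statusCode, 0);
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SynchronizedStatusCounts statuses = new SynchronizedStatusCounts();
        // Stand-in for the async client's callback threads.
        ExecutorService callbacks = Executors.newFixedThreadPool(4);
        for (int i = 0; i < 1000; i++) {
            final int code = (i % 2 == 0) ? 200 : -1; // -1 marks a failed request
            callbacks.submit(() -> statuses.add(code));
        }
        callbacks.shutdown();
        callbacks.awaitTermination(10, TimeUnit.SECONDS);
        System.out.println("200: " + statuses.get(200) + ", failed: " + statuses.get(-1));
    }
}
```

In the sink from the question, the same pattern would mean wrapping each `httpStatusesAccumulator.add(...)` call inside the callbacks in a `synchronized` block on a shared lock object. Any other code that touches the accumulator would have to take the same lock.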

5. Is RuntimeContext not thread-safe? That is, can I use it in an asynchronous callback?

The RuntimeContext is not thread-safe, so access to it has to be synchronized as well.

Answered 2016-03-29T07:46:19.953