1

执行 DataFlow 管道时,我们偶尔会看到这些异常。我们能对他们做些什么吗?我们有一个非常简单的流程,它从 GCS 中的文件读取并在输入文件中的每行创建一条记录——输入文件中大约有 100 万行。

管道内的数据也会发生什么?是否经过再加工?还是在传输到 BigTable 的过程中丢失了?

(609803d25ddab111): io.grpc.StatusRuntimeException: UNKNOWN
at io.grpc.Status.asRuntimeException(Status.java:428)
at io.grpc.stub.Calls$StreamObserverToCallListenerAdapter.onClose(Calls.java:284) 
at io.grpc.ClientInterceptors$CheckedForwardingCall.start(ClientInterceptors.java:202) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.retryCall(RetryingCall.java:123) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.runCall(RetryingCall.java:110) 
at com.google.cloud.bigtable.grpc.io.RetryingCall.halfClose(RetryingCall.java:100) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:178) 
at io.grpc.stub.Calls.asyncServerStreamingCall(Calls.java:166) 
at io.grpc.stub.Calls.asyncUnaryCall(Calls.java:143) 
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.listenableAsyncCall(BigtableDataGrpcClient.java:244)
at com.google.cloud.bigtable.grpc.BigtableDataGrpcClient.mutateRowAsync(BigtableDataGrpcClient.java:256) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issuePutRequest(BatchExecutor.java:262) 
at com.google.cloud.bigtable.hbase.BatchExecutor.issueRequest(BatchExecutor.java:300) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.issueRequest(BigtableBufferedMutator.java:365) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doMutation(BigtableBufferedMutator.java:360) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.mutate(BigtableBufferedMutator.java:335) 
at com.company.HBaseBigtableWriter.processElement(HBaseBigtableWriter.java:70) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.invokeProcessElement(DoFnRunner.java:189) 
at com.google.cloud.dataflow.sdk.util.DoFnRunner.processElement(DoFnRunner.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.ParDoFnBase.processElement(ParDoFnBase.java:193) 
at com.google.cloud.dataflow.sdk.util.common.worker.ParDoOperation.process(ParDoOperation.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.OutputReceiver.process(OutputReceiver.java:52) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.runReadLoop(ReadOperation.java:171) 
at com.google.cloud.dataflow.sdk.util.common.worker.ReadOperation.start(ReadOperation.java:117) 
at com.google.cloud.dataflow.sdk.util.common.worker.MapTaskExecutor.execute(MapTaskExecutor.java:66) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:234) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.doWork(DataflowWorker.java:171) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:137) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:147) 
at com.google.cloud.dataflow.sdk.runners.worker.DataflowWorkerHarness$WorkerThread.call(DataflowWorkerHarness.java:132) 
at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) at java.lang.Thread.run(Thread.java:745) 
Caused by: java.net.SocketTimeoutException: connect timed out at java.net.PlainSocketImpl.socketConnect(Native Method) 
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:345) 
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206) 
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188) 
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
at java.net.Socket.connect(Socket.java:589) 
at sun.net.NetworkClient.doConnect(NetworkClient.java:175) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:432) 
at sun.net.www.http.HttpClient.openServer(HttpClient.java:527) 
at sun.net.www.http.HttpClient.<init>(HttpClient.java:211) 
at sun.net.www.http.HttpClient.New(HttpClient.java:308) 
at sun.net.www.http.HttpClient.New(HttpClient.java:326) 
at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104) 
at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998) 
at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932) 
at com.google.bigtable.repackaged.com.google.api.client.http.javanet.NetHttpRequest.execute(NetHttpRequest.java:93) 
at com.google.bigtable.repackaged.com.google.api.client.http.HttpRequest.execute(HttpRequest.java:965) 
at com.google.auth.oauth2.ComputeEngineCredentials.refreshAccessToken(ComputeEngineCredentials.java:61) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.doRefresh(RefreshingOAuth2CredentialsInterceptor.java:232) 
at com.google.cloud.bigtable.grpc.io.RefreshingOAuth2CredentialsInterceptor.syncRefresh(RefreshingOAuth2CredentialsInterceptor.java:166) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:302) 
at com.google.cloud.bigtable.grpc.BigtableSession$7.call(BigtableSession.java:299) ... 4 more

我们能做些什么来强化我们的代码吗?

数据流本身非常简单

Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
options.setMaxNumWorkers(20);

Pipeline p = Pipeline.create(options);

CloudBigtableIO.initializeForWrite(p)
            .apply(TextIO.Read.from(options.getInputFile()))
            .apply(ParDo.of(new HBaseBigtableWriter(options)));
p.run();

ParDo看起来像:

public class HBaseBigtableWriter extends DoFn<String, Void> {
private Connection conn;
private BufferedMutator mutator;
private final CloudBigtableTableConfiguration btConfig;

public HBaseBigtableWriter(CloudBigtableOptions options) {
    this.btConfig = CloudBigtableTableConfiguration.fromCBTOptions(options);

@Override
public void startBundle(DoFn<String, Void>.Context c) throws Exception {
    super.startBundle(c);
    conn = new BigtableConnection(btConfig.toHBaseConfig());
    mutator = conn.getBufferedMutator(TableName.valueOf(btConfig.getTableId()));
}

@Override
public void processElement(DoFn<String, Void>.ProcessContext c)  {
    Put put = Put(....);
    //some of based on the input line.. no sideInputs or anything
    p.addImmutable(...)
    mutator.mutate(put); //mentioned line in stacktrace
} 

@Override
public void finishBundle(DoFn<String, Void>.Context c) throws Exception  {
    try {
        mutator.close();
    } catch (RetriesExhaustedWithDetailsException e) {
        retriesExceptionAggregator.addValue(1);
        List<Throwable> causes = e.getCauses();
        if (causes.size() == 1) {
            throw (Exception) causes.get(0);
        } else {
            throw e;

        }
    }
    finally {
        conn.close();
        super.finishBundle(c);
    }
}
}

这个也是时不时冒出来的。

java.util.concurrent.RejectedExecutionException: Task io.grpc.SerializingExecutor$TaskRunner@5a497f63 rejected from java.util.concurrent.ThreadPoolExecutor@49e90a5c[Shutting down, pool size = 2, active threads = 2, queued tasks = 0, completed tasks = 155291]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
at io.grpc.SerializingExecutor.execute(SerializingExecutor.java:112)
at io.grpc.ChannelImpl$CallImpl$ClientStreamListenerImpl.closed(ChannelImpl.java:398)
at io.grpc.transport.AbstractClientStream.closeListener(AbstractClientStream.java:256)
at io.grpc.transport.AbstractClientStream.transportReportStatus(AbstractClientStream.java:230)
at io.grpc.transport.AbstractClientStream.remoteEndClosed(AbstractClientStream.java:180)
at io.grpc.transport.AbstractStream$1.endOfStream(AbstractStream.java:121)
at io.grpc.transport.MessageDeframer.deliver(MessageDeframer.java:253)
at io.grpc.transport.MessageDeframer.deframe(MessageDeframer.java:168)
at io.grpc.transport.AbstractStream.deframe(AbstractStream.java:285)
at io.grpc.transport.AbstractClientStream.inboundTrailersReceived(AbstractClientStream.java:175)
at io.grpc.transport.Http2ClientStream.transportTrailersReceived(Http2ClientStream.java:162)
at io.grpc.transport.netty.NettyClientStream.transportHeadersReceived(NettyClientStream.java:110)
at io.grpc.transport.netty.NettyClientHandler.onHeadersRead(NettyClientHandler.java:179)
at io.grpc.transport.netty.NettyClientHandler.access$800(NettyClientHandler.java:69)
at io.grpc.transport.netty.NettyClientHandler$LazyFrameListener.onHeadersRead(NettyClientHandler.java:424)
at com.google.bigtable.repackaged.io.netty.handler.codec.http2.DefaultHttp2ConnectionDecoder$FrameReadListener.onHeadersRead(DefaultHttp2ConnectionDecoder.java:316)

此外,对于 Google SDK 类,它看起来也正在发生同样的事情 - 特别是在负载下 - 即数据流作业 2015-09-10_10_26_26-7782438171725519247

(dedc6cc776609500): org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException: Failed 2 actions: StatusRuntimeException: 2 times,
at    com.google.cloud.bigtable.hbase.BigtableBufferedMutator.handleExceptions(BigtableBufferedMutator.java:408) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.doFlush(BigtableBufferedMutator.java:285) 
at com.google.cloud.bigtable.hbase.BigtableBufferedMutator.close(BigtableBufferedMutator.java:258) 
at org.apache.hadoop.hbase.client.AbstractBigtableConnection$2.close(AbstractBigtableConnection.java:181) 
at com.google.cloud.bigtable.dataflow.CloudBigtableIO$CloudBigtableSingleTableWriteFn.finishBundle(CloudBigtableIO.java:613)

对这些例外有什么建议吗?谢谢!

4

1 回答 1

2

关闭连接然后进行突变可能会导致您看到堆栈跟踪(我猜当您在缓冲突变仍在进行时停止工作人员时会发生这种情况)。

你能在我们的 github 问题跟踪器上打开一个错误吗?我认为这可能是诊断此问题的最有效方法。 https://github.com/GoogleCloudPlatform/cloud-bigtable-client/issues

如果我正确读取了堆栈跟踪,看起来您没有利用 CloudBigtableIO.writeToTable() 方法,并且您正在使用自定义 ParDo 来写入数据。如果是这样,那么您问题的答案实际上取决于您在自定义 ParDo 中所做的事情以及“停止工作人员”的动态。

于 2015-09-09T19:36:43.593 回答