I've created a Beam script that fetches data from Kafka and pushes it to BigQuery using Apache Beam. For now I'm using the java-direct-runner and just need to push the data into my BigQuery table.

Here is my code:

package com.knoldus.section8;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;


import com.google.auth.oauth2.GoogleCredentials;
import com.google.common.collect.Lists;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.LongDeserializer;

import java.io.FileInputStream;
import java.io.IOException;
import java.util.ArrayList;

public class KafkaStreaming {

    static GoogleCredentials authExplicit(String jsonPath) throws IOException {
        // You can specify a credential file by providing a path to GoogleCredentials.
        // Otherwise credentials are read from the GOOGLE_APPLICATION_CREDENTIALS environment variable.
        GoogleCredentials credentials = GoogleCredentials.fromStream(new FileInputStream(jsonPath))
                .createScoped(Lists.newArrayList("https://www.googleapis.com/auth/cloud-platform"));

        return credentials;
    }
    public static void main(String[] args) throws IOException {

        PipelineOptions options = PipelineOptionsFactory.create();
        GcpOptions gcpOptions = options.as(GcpOptions.class);
        gcpOptions.setProject("excellent-guard-314111");
        gcpOptions.setGcpTempLocation("./");
        System.out.println(gcpOptions.getGcpCredential());
        gcpOptions.setGcpCredential(
                authExplicit("excellent-guard-314111-01f257a67f01.json"));
        Pipeline p = Pipeline.create(options);

        ArrayList<TableFieldSchema> columns = new ArrayList<>();
        columns.add(new TableFieldSchema().setName("deviceID").setType("STRING"));
        columns.add(new TableFieldSchema().setName("name").setType("STRING"));
        columns.add(new TableFieldSchema().setName("description").setType("STRING"));
        columns.add(new TableFieldSchema().setName("eventtime").setType("LONG"));
        columns.add(new TableFieldSchema().setName("temperature").setType("DOUBLE"));
        columns.add(new TableFieldSchema().setName("unit").setType("STRING"));
        TableSchema tblSchema = new TableSchema().setFields(columns);
        PCollection<IotEvent> iotEventPCollection = p
                .apply(KafkaIO.<Long, IotEvent>read()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("test-new")
                        .withKeyDeserializer(LongDeserializer.class)
                        .withValueDeserializer(IotDes.class)
                        .withoutMetadata())
                .apply(Values.<IotEvent>create())
                .apply(ParDo.of(new DoFn<IotEvent, IotEvent>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) {
                        System.out.println(c.element().getDeviceID());
                        c.output(c.element());
                    }
                }));

        PCollection<TableRow> rowData = iotEventPCollection.apply(
                ParDo.of(new DoFn<IotEvent, TableRow>() {
                    @ProcessElement
                    public void processElement(ProcessContext c) throws IOException {
                        IotEvent event = c.element();
                        TableRow row = new TableRow();
                        assert event != null;
                        row.set("deviceID", event.getDeviceID());
                        row.set("name", event.getName());
                        row.set("description", event.getDescription());
                        row.set("eventtime", event.getEventtime());
                        row.set("temperature", event.getTemperature());
                        row.set("unit", event.getUnit());
                        System.out.println(row.toPrettyString());
                        c.output(row);
                    }
                }));


        WriteResult writeResult = rowData.apply(BigQueryIO.writeTableRows()
                .to("beam_poc.table_poc")
                .withSchema(tblSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));

        p.run().waitUntilFinish();
    }
}
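
(IotEvent and IotDes are not shown above. For reference, here is a minimal sketch of what they might look like — the field names are assumptions based on the getters used in the pipeline, the JSON wire format is a guess, and it assumes Jackson (com.fasterxml.jackson.core:jackson-databind) is on the classpath:)

// Hypothetical sketch only -- the question does not show these classes.
// Shown together for brevity; each would normally live in its own file.
package com.knoldus.section8;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Deserializer;

import java.io.IOException;
import java.io.Serializable;

// Plain serializable POJO mirroring the getters used in the pipeline above.
public class IotEvent implements Serializable {
    private String deviceID;
    private String name;
    private String description;
    private Long eventtime;
    private Double temperature;
    private String unit;

    public String getDeviceID() { return deviceID; }
    public String getName() { return name; }
    public String getDescription() { return description; }
    public Long getEventtime() { return eventtime; }
    public Double getTemperature() { return temperature; }
    public String getUnit() { return unit; }
}

// Kafka value deserializer, assuming events arrive as JSON.
class IotDes implements Deserializer<IotEvent> {
    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public IotEvent deserialize(String topic, byte[] data) {
        try {
            return MAPPER.readValue(data, IotEvent.class);
        } catch (IOException e) {
            throw new RuntimeException("Failed to deserialize IotEvent", e);
        }
    }
}

(If Beam cannot infer a coder for IotEvent, KafkaIO also accepts withValueDeserializerAndCoder(IotDes.class, SerializableCoder.of(IotEvent.class)).)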

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>Beam</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
    </properties>

   <dependencies>
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-sdks-java-core</artifactId>
           <version>2.29.0</version>
       </dependency>
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-direct-java</artifactId>
           <version>2.29.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-runners-google-cloud-dataflow-java -->
<!--       <dependency>-->
<!--           <groupId>org.apache.beam</groupId>-->
<!--           <artifactId>beam-runners-google-cloud-dataflow-java</artifactId>-->
<!--           <version>2.29.0</version>-->
<!--       </dependency>-->

       <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-kafka -->
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-sdks-java-io-kafka</artifactId>
           <version>2.29.0</version>
       </dependency>

       <dependency>
           <groupId>org.apache.kafka</groupId>
           <artifactId>kafka-clients</artifactId>
           <version>2.8.0</version>
       </dependency>
       <!-- https://mvnrepository.com/artifact/org.apache.beam/beam-sdks-java-io-google-cloud-platform -->
       <dependency>
           <groupId>org.apache.beam</groupId>
           <artifactId>beam-sdks-java-io-google-cloud-platform</artifactId>
           <version>2.29.0</version>
       </dependency>

       <dependency>
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-api</artifactId>
           <version>2.14.1</version>
       </dependency>
       <dependency>
           <groupId>org.apache.logging.log4j</groupId>
           <artifactId>log4j-core</artifactId>
           <version>2.14.1</version>
       </dependency>

   </dependencies>


</project>

The error I'm getting:

May 19, 2021 9:21:45 PM io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference cleanQueue SEVERE: ~ ~ ~ Channel ManagedChannelImpl{logId=1, target=bigquerystorage.googleapis.com:443} was not shutdown properly!!! ~ ~ ~ Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true. java.lang.RuntimeException: ManagedChannel allocation site

2 Answers

A fix has been released (https://issues.apache.org/jira/browse/BEAM-12356); this error no longer occurs as long as you use Apache Beam version 2.31.0 or later.
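
In the pom.xml above, that means bumping every Beam artifact from 2.29.0 to 2.31.0 or later. A sketch of the change, using a shared version property (the beam.version property is an addition, not in the original pom):

<properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <beam.version>2.31.0</beam.version>
</properties>

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>${beam.version}</version>
</dependency>
<!-- ...and ${beam.version} likewise for beam-runners-direct-java,
     beam-sdks-java-io-kafka, and beam-sdks-java-io-google-cloud-platform -->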

answered 2021-08-19T06:20:55.240

I ran into the same problem and was able to resolve it by downgrading to Apache Beam SDK 2.28.0 (see this post).

answered 2021-06-11T05:02:46.293