0

我在内存中创建了一个 PCollection,并将其写入 PostgreSQL。向表中插入数据时,少数记录可能会抛出异常而无法插入。运行管道时,如何提取这些插入失败的记录?

下面是我为管道编写的代码:

// Build a pipeline that is executed by the Flink runner.
PipelineOptions pipelineOptions = PipelineOptionsFactory.create();
    pipelineOptions.setRunner(FlinkRunner.class);
    Pipeline pipeline = Pipeline.create(pipelineOptions);

    // In-memory sample rows used as the pipeline input.
    Collection<Stock> sampleStocks = Arrays.asList(
            new Stock("AAP", 2000, "Apple Inc"),
            new Stock("MSF", 3000, "Microsoft Corporation"),
            new Stock("NVDA", 4000, "NVIDIA Corporation"),
            new Stock("INT", 3200, "Intel Corporation"));

    // Turn the sample rows into a PCollection<Stock>.
    PCollection<Stock> stockRows = pipeline.apply(
            Create.of(sampleStocks).withCoder(SerializableCoder.of(Stock.class)));

    // Connection settings for the target PostgreSQL database.
    JdbcIO.DataSourceConfiguration postgresConfig = JdbcIO.DataSourceConfiguration
            .create("org.postgresql.Driver", "jdbc:postgresql://localhost:5432/postgres")
            .withUsername("postgres")
            .withPassword("sachin");

    // Binds the three columns of one Stock to the insert statement's placeholders.
    JdbcIO.PreparedStatementSetter<Stock> stockBinder = new JdbcIO.PreparedStatementSetter<Stock>() {
        private static final long serialVersionUID = 1L;

        @Override
        public void setParameters(Stock element, PreparedStatement query) throws SQLException {
            query.setString(1, element.getSymbol());
            query.setLong(2, element.getPrice());
            query.setString(3, element.getCompany());
        }
    };

    // Insert every row; the write yields only PDone, so failed rows are not observable here.
    @SuppressWarnings("unused")
    PDone writeResult = stockRows.apply(JdbcIO.<Stock>write()
            .withDataSourceConfiguration(postgresConfig)
            .withStatement("insert into stocks values(?, ?, ?)")
            .withPreparedStatementSetter(stockBinder));

    pipeline.run().waitUntilFinish();
    
4

1 回答 1

1

浏览了整份 Apache Beam 编程指南之后,我没有找到任何线索,所以我复制了 JdbcIO 的源码并修改了其中执行批处理的部分,用 TupleTag 把插入成功的记录和插入失败的记录分开输出。现在它可以正常工作了。

下面是修改后的 JdbcIO 的代码:

  private static class WriteFn<T> extends DoFn<T, T> {
  private static final int DEFAULT_BATCH_SIZE = 1;

  private final Write<T> spec;

  private DataSource dataSource;
  private Connection connection;
  private PreparedStatement preparedStatement;
  private TupleTag<T> validTupleTag;
  private TupleTag<T> inValidTupleTag;
  private int batchCount;

  /**
   * Creates the write function for the given write specification.
   *
   * @param spec write configuration; it is read later for the data source configuration,
   *     the SQL statement, the two output tags and the prepared-statement setter
   */
  public WriteFn(Write<T> spec) {
    this.spec = spec;
  }

  /**
   * Opens the JDBC resources this function works with and caches the output tags.
   *
   * <p>Auto-commit is switched off because {@code processElement} commits explicitly after
   * executing a batch.
   *
   * @throws Exception if the data source, connection or prepared statement cannot be created
   */
  @Setup
  public void setup() throws Exception {
    this.dataSource = this.spec.getDataSourceConfiguration().buildDatasource();

    this.connection = this.dataSource.getConnection();
    this.connection.setAutoCommit(false);

    final String sql = this.spec.getStatement();
    this.preparedStatement = this.connection.prepareStatement(sql);

    // Tags used to route each record to the "inserted" or the "failed" output.
    this.validTupleTag = this.spec.getValidTupleTag();
    this.inValidTupleTag = this.spec.getInvalidTupleTag();
  }

  /** Resets the counter of statements queued in the current batch. */
  @StartBundle
  public void startBundle() {
    batchCount = 0;
  }
  
  @ProcessElement
  public void processElement(@Element T record, MultiOutputReceiver out) 
  throws Exception {
    preparedStatement.clearParameters();
    spec.getPreparedStatementSetter().setParameters(record, 
   preparedStatement);
    preparedStatement.addBatch();

    batchCount++;

    if (batchCount >= DEFAULT_BATCH_SIZE) {
        if (batchCount > 0) {
            try {
            preparedStatement.executeBatch();
            connection.commit();
            **out.get(validTupleTag).output(record);**
        } catch (SQLException e1) {
            //TODO add logger
            **out.get(inValidTupleTag).output(record);**
        }
            batchCount = 0;
          }
    }
  }

和客户端代码:

import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;

import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult.State;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;

/**
 * @author sachin
 * @date 18-Nov-2021
*/

public class BeamTest {
static List<Stock> stocks = new ArrayList<>();

public static void main(String[] args) {
    System.setProperty("java.specification.version", "1.8");
    process();
    // read();

}

public static void process() {
    final TupleTag<Stock> VALID = new TupleTag<Stock>() {
    };
    final TupleTag<Stock> INVALID = new TupleTag<Stock>() {
    };

    PipelineOptions options = PipelineOptionsFactory.create();

    options.setRunner(FlinkRunner.class);
    Pipeline p = Pipeline.create(options);

    // Preparing dummy data
    Collection<Stock> stockList = Arrays.asList(
            new Stock("AAP", 2000, "Apple Inc"),
            new Stock("MSF", 3000, "Microsoft Corporation"), 
            new Stock("NVDA", 4000, "NVIDIA Corporation"),
            new Stock("INT", 3200, "Intel Corporation"));

    // Reading dummy data and save it into PCollection<Stock>
    PCollection<Stock> data = 




  p.apply(Create.of(stockList).
  withCoder(SerializableCoder.of(Stock.class)));
    // insert
    PCollectionTuple pCollectionTupleResult = data.apply("write", 
 CustomJdbcIOWrite.<Stock>write()

                     
.withDataSourceConfiguration(CustomJdbcIOWrite.DataSourceConfiguration
                    .create("org.postgresql.Driver", 
 "jdbc:postgresql://localhost:5432/postgres")
                    .withUsername("postgres").withPassword("sachin"))
                    .withStatement("insert into stocks values(?, ?, 
 ?)").withValidTag(VALID).withInValidTag(INVALID)
                    .withPreparedStatementSetter(new 
 CustomJdbcIOWrite.PreparedStatementSetter<Stock>() {
                        private static final long serialVersionUID = 1L;
    
                        public void setParameters(Stock element, 
 PreparedStatement query) throws SQLException {
                            query.setString(1, element.getSymbol());
                            query.setLong(2, element.getPrice());
                            query.setString(3, element.getCompany());
                        }

                    }));
    // get failed PCollection using INVALID tupletag
    PCollection<Stock> failedPcollection = 
    pCollectionTupleResult.get(INVALID)
            .setCoder(SerializableCoder.of(Stock.class));
        
    failedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {

        private static final long serialVersionUID = 1L;

        @ProcessElement
        public void process(ProcessContext pc) {
            System.out.println("Failed pCollection element:" + 
        pc.element().getCompany());
        }

    }));

    //get failed PCollection using INVALID tupletag
    PCollection<Stock> insertedPcollection = 
    pCollectionTupleResult.get(VALID)
            .setCoder(SerializableCoder.of(Stock.class));
    insertedPcollection.apply(ParDo.of(new DoFn<Stock, Stock>() {

        private static final long serialVersionUID = 1L;
        
        @ProcessElement
        public void process(ProcessContext pc) {
            System.out.println("Inserted pCollection element:" + 
        pc.element().getCompany());
        }
        
    }));
    
    // run pipeline
    State state = p.run().waitUntilFinish();
    System.out.println("Data inserted successfully with state : " + 
    state);

   }

 }

下面是运行输出。其中 new Stock("NVDA", 4000, "NVIDIA Corporation") 没有被插入,因为我的数据库列最多只接受 3 个字符(如“NVD”),而“NVDA”有 4 个字符:

Inserted pCollection element:Microsoft Corporation
Failed pCollection element:NVIDIA Corporation
Inserted pCollection element:Intel Corporation
Inserted pCollection element:Apple Inc
Data inserted successfully with state : DONE

完整详细信息和 github 链接

于 2021-11-24T08:09:40.760 回答