2

我正在使用以下代码将数据填充到 Bigtable 中:

CloudBigtableScanConfiguration config = new CloudBigtableScanConfiguration.Builder()
                .withConfiguration("clusterId", options.getBigTableClusterId())
                .withProjectId(options.getProject())
                .withInstanceId(options.getBigTableInstanceId())
                .withTableId(options.getOutputBTTable())
                .build();
     Pipeline p = Pipeline.create(options);
     /**
      * Read Data from Big Query
      */
     CloudBigtableIO.initializeForWrite(p);
     p.apply(BigQueryIO.Read.fromQuery(getQuery(options.getDate())))
        .apply(ParDo.of(new DoFn<TableRow, Mutation>() {
           public void processElement(ProcessContext c) {
             Mutation output = convertDataToRow(c.element());
             if (output != null) { 
                 c.output(output); 
                 };
           }

           }))
         .apply(CloudBigtableIO.writeToTable(config));
     p.run();

private static Mutation convertDataToRow(TableRow element) {
     LOG.info("element: "+ element);
     if(element.get("BASM_AID") != null){
         Put obj = new Put(getRowKey(element).getBytes()).addColumn(SEGMENT_FAMILY, SEGMENT_COLUMN_NAME, ((String)element.get("BAS_category")).getBytes() );
                obj.addColumn(USER_FAMILY, AID, ((String)element.get("BASM_AID")).getBytes());
         if(element.get("BASM_segment_id") != null){
                obj.addColumn(SEGMENT_FAMILY, SEGMENT_ID, ((String)element.get("BASM_segment_id")).getBytes());
         }
         if(element.get("BAS_sub_category") != null){
                obj.addColumn(SEGMENT_FAMILY, SUB_CATEGORY, ((String)element.get("BAS_sub_category")).getBytes());
         }
         if(element.get("BAS_name") != null){
                obj.addColumn(SEGMENT_FAMILY, NAME, ((String)element.get("BAS_name")).getBytes());
         }
         if(element.get("BAS_description") != null){
                obj.addColumn(SEGMENT_FAMILY, DESCRIPTION, ((String)element.get("BAS_description")).getBytes());
         }
         if(element.get("BASM_krux_user_id") != null){
             obj.addColumn(USER_FAMILY, KRUX_USER_ID, ((String)element.get("BASM_krux_user_id")).getBytes());
         }
         if(element.get("BAS_last_compute_day") != null){
                obj.addColumn(SEGMENT_FAMILY, LAST_COMPUTE_DAY, ((String)element.get("BAS_last_compute_day")).getBytes());
         }
         if(element.get("BAS_type") != null){
                obj.addColumn(SEGMENT_FAMILY, TYPE, ((String)element.get("BAS_type")).getBytes());
         }      
         if(element.get("BASM_REGID") != null){
                obj.addColumn(USER_FAMILY, REGID, ((String)element.get("BASM_REGID")).getBytes() );
         }
        return obj;
     }else{
         return null;
     }
    }

我们有 30 个 Bigtable 节点,我的数据流工作需要 100 个工作人员,整个过程要处理大约 100 亿行数据,在上述配置下,我的工作需要一天多的时间才能完成,这并不理想。

代码级别的任何建议可以让我们更快地运行我们的工作,我知道增加 Bigtable 节点的数量是一种选择,但目前,我正在寻找其他不需要增加节点的选项。

4

2 回答 2

1

你不妨看看这个问题。基于此,您需要查看写入带宽,如果超过 80%,您可能希望减少工作节点的数量,或者要求增加配额并增加集群的大小。

于 2016-08-26T20:05:30.220 回答
0

将大量数据批量加载到空表中(无论是在 Bigtable 还是 HBase 中)都会出现性能问题,除非您提前预先拆分表,因为最初有零个 tablet,所以所有写入都将转到一个单个服务器节点,分布不均。

因此,无论您的集群大小如何,初始批量加载都不会实现高性能,因为它不会被并行化。

要解决此问题,您需要创建一个带有预拆分的表。您可以查看示例,了解如何通过具有 pre-splits 的 HBase shell 在 Cloud Bigtable 中创建表。当我们运行将数据加载到 Bigtable 和 HBase 的基准测试时,我们还预先拆分了表

写入具有大量数据的稳态现有表将有许多平板电脑,在集群上分布良好,因此它会表现良好。但是,如果您要对空表进行批量加载,则必须预先拆分。

于 2016-09-02T22:36:45.970 回答