1

我无法在 Spark 1.6 的 Spark DataFrame 上使用 concat_list

这是原始帖子的链接。我需要在 Java SE 7 和 Apache Spark 1.6 环境下实现相同功能的代码。

Scala 代码

/**
 * Aggregate function that gathers the non-null string values of a group and
 * returns them joined by commas.
 */
object GroupConcat extends UserDefinedAggregateFunction {
  // One string input column, named "x".
  override def inputSchema: StructType = new StructType().add("x", StringType)

  // One buffer column, named "buff", holding the strings collected so far.
  override def bufferSchema: StructType = new StructType().add("buff", ArrayType(StringType))

  // The aggregate produces a string.
  override def dataType = StringType

  override def deterministic: Boolean = true

  /** Starts every aggregation with an empty collection in slot 0. */
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, ArrayBuffer.empty[String])
  }

  /** Appends the input value to the buffer; null inputs are ignored. */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (!input.isNullAt(0)) {
      val collected = buffer.getSeq[String](0)
      buffer.update(0, collected :+ input.getString(0))
    }
  }

  /** Stores in `buffer1` its own values followed by those of `buffer2`. */
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val left = buffer1.getSeq[String](0)
    val right = buffer2.getSeq[String](0)
    buffer1.update(0, left ++ right)
  }

  /** Joins the collected values with "," and wraps the result as a UTF8String. */
  override def evaluate(buffer: Row) = {
    val joined = buffer.getSeq[String](0).mkString(",")
    UTF8String.fromString(joined)
  }
}

Java 代码

如果我写错了,请指正:

public static class GroupConcat extends UserDefinedAggregateFunction {

     private StructType inputSchema;
        private StructType bufferSchema;

        public GroupConcat() {
          List<StructField> inputFields = new ArrayList<>();
          inputFields.add(DataTypes.createStructField("inputColumn", DataTypes.StringType, true));
          inputSchema = DataTypes.createStructType(inputFields);

          List<StructField> bufferFields = new ArrayList<>();
          bufferFields.add(DataTypes.createStructField("list", DataTypes.createArrayType(DataTypes.StringType), true)); 
          bufferSchema = DataTypes.createStructType(bufferFields);
        }
     // The data type of the returned value
        public DataType dataType() {
          return DataTypes.DoubleType;
        }
    // Whether this function always returns the same output on the identical input
        public boolean deterministic() {
          return true;
        }
        // Data types of input arguments of this aggregate function
        public StructType inputSchema() {
          return inputSchema;
        }
        // Data types of values in the aggregation buffer
        public StructType bufferSchema() {
          return bufferSchema;
        }

        // Initializes the given aggregation buffer. The buffer itself is a `Row` that in addition to
         // standard methods like retrieving a value at an index (e.g., get(), getBoolean()), provides
         // the opportunity to update its values. Note that arrays and maps inside the buffer are still
         // immutable.
         public void initialize(MutableAggregationBuffer buffer) {
           buffer.update(0,null);

         }

         // Updates the given aggregation buffer `buffer` with new input data from `input`
         public void update(MutableAggregationBuffer buffer, Row input) {
           if (!input.isNullAt(0)) {
             //  buffer.update(0, buffer.getSeq[String](0) :+ input.getString(0))
               buffer.update(0, buffer.getSeq(0)+input.getString(0));
           }
         }

         // Merges two aggregation buffers and stores the updated buffer values back to `buffer1`
         public void merge(MutableAggregationBuffer buffer1, Row buffer2) {


           buffer1.update(0,  buffer1.getSeq(0).toString()+buffer2.getSeq(0).toString());
         }

      // Calculates the final result
         public String evaluate(Row buffer) {
         //  return ((double) buffer.getLong(0)) / buffer.getLong(1);
             return  buffer.getSeq(0).mkString(",");
         }


}

谢谢。

4

0 回答 0