1

最近我在编写一个把数据推送到 Elasticsearch 的 mapper。我的输入是 Avro 对象,我想把它转换成 JSON。整体上一切正常,但生成的 JSON 里出现了命名空间(被当作对象的键),而 Elasticsearch 不允许用这样的命名空间作为键。

"requestobj":{"com.nw.data.Request":{"event_id":null,"event_epoch":-1,"event_dispatch_epoch":-1,"server_epoch":1471852915279,"date":{"string":"2016-08-22"},"time":{"string":"08:01:55"},"req_source":{"string":"app"},"req_channel":{"string":"Mobile"},"req_dimension":{"string":"1312x704"}

有没有办法在输出中去掉命名空间 com.nw.data.Request?

我正在使用以下代码将 avro 转换为 json:

/**
 * Serializes an Avro record to JSON text using the record's own schema.
 *
 * @param record the record to encode; its schema is used for both the encoder and the writer
 * @return the encoder's output decoded as UTF-8
 * @throws IOException if writing or flushing the encoded output fails
 */
public static String getJsonString(GenericRecord record) throws IOException {
        // try-with-resources closes the stream on the failure path as well as on success.
        try (ByteArrayOutputStream os = new ByteArrayOutputStream()) {
            JsonEncoder encoder = EncoderFactory.get().jsonEncoder(record.getSchema(), os);
            // The schema is already supplied through the constructor, so no extra setSchema call is made.
            DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(record.getSchema());
            writer.write(record, encoder);
            // Flush before reading the bytes so buffered output reaches the stream.
            encoder.flush();
            return new String(os.toByteArray(), Charset.forName("UTF-8"));
        }
    }
4

2 回答 2

2

您也可以自己重新实现一个 JsonEncoder 来替代 Avro 自带的版本:

    import java.io.{ByteArrayOutputStream, IOException, OutputStream}
    import java.nio.ByteBuffer
    import java.util

    import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}
    import org.apache.avro.io.{DecoderFactory, ParsingEncoder}
    import org.apache.avro.io.parsing.{JsonGrammarGenerator, Parser, Symbol}
    import org.apache.avro.util.Utf8
    import org.apache.avro.{AvroTypeException, Schema}
    import org.codehaus.jackson.util.{DefaultPrettyPrinter, MinimalPrettyPrinter}
    import org.codehaus.jackson.{JsonEncoding, JsonFactory, JsonGenerator}


    /**
      * Helpers that render Avro data as JSON text through the JsonEncoder class defined in this file.
      */
    object inJson{

      /**
        * Decodes an Avro binary payload into a generic datum.
        *
        * @param binary       the binary-encoded bytes to decode
        * @param schemaWriter schema handed to the datum reader as its first (writer) schema
        * @param schemaReader schema handed to the datum reader as its second (reader) schema;
        *                     defaults to `schemaWriter`
        * @return the datum produced by the reader
        */
      private def toObject(binary: Array[Byte], schemaWriter: Schema)(schemaReader : Schema = schemaWriter) : Object = {
        val decoder = DecoderFactory.get().binaryDecoder(binary, null)
        val reader = new GenericDatumReader[Object](schemaWriter, schemaReader)
        reader.read(null, decoder)
      }

      /**
        * Writes a datum as pretty-printed JSON and returns the text.
        *
        * @param `object`     the datum to write
        * @param schemaWriter schema used for both the encoder and the datum writer
        * @return the JSON text
        */
      private def toJson(`object`: Object,schemaWriter: Schema): String = {
        val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream()
        try {
          val encoder = new JsonEncoder(schemaWriter, outputStream, true)
          val writer = new GenericDatumWriter[Object](schemaWriter)
          writer.write(`object`, encoder)
          encoder.flush()
          // The encoder's generator is created with JsonEncoding.UTF8, so the bytes must be decoded
          // as UTF-8; the no-argument toString would use the platform default charset instead.
          outputStream.toString("UTF-8")
        } finally {
          outputStream.close()
        }
      }

      /**
        * Decodes `binary` and renders the resulting datum as JSON.
        *
        * @param binary       the binary-encoded bytes to decode
        * @param schemaWriter writer schema used for decoding
        * @param schemaReader reader schema used for decoding
        * @param schemaJson   schema used when writing the JSON
        * @return the JSON text, or an empty string when decoding yields `null`
        */
      def ~>(binary: Array[Byte], schemaWriter: Schema, schemaReader : Schema, schemaJson : Schema) : String = {
        Option(toObject(binary, schemaWriter)(schemaReader)).map(toJson(_, schemaJson)).getOrElse("")
      }

      /**
        * Renders an already-decoded datum as JSON.
        *
        * @param `object`     the datum to write
        * @param schemaWriter not used by this overload
        * @param schemaReader not used by this overload
        * @param schemaJson   schema used when writing the JSON
        * @return the JSON text
        */
      def ~>(`object`: Object, schemaWriter: Schema, schemaReader : Schema, schemaJson : Schema) : String = {
        toJson(`object`, schemaJson)
      }
    }

    /**
      * Factory helpers for the JsonEncoder class: builds the Jackson generator it writes to.
      */
    object JsonEncoder{
      private val LINE_SEPARATOR: String = System.getProperty("line.separator")

      /**
        * Pretty printer whose root-value separator hook is a no-op
        * (the line-separator write is disabled).
        */
      private def prettyPrinterWithoutRootSeparator: DefaultPrettyPrinter =
        new DefaultPrettyPrinter() {
          @throws[IOException]
          override def writeRootValueSeparator(jg: JsonGenerator): Unit = {
            //   jg.writeRaw(LINE_SEPARATOR)
          }
        }

      /** Minimal printer that separates root values with the platform line separator. */
      private def lineSeparatedPrinter: MinimalPrettyPrinter = {
        val printer = new MinimalPrettyPrinter
        printer.setRootValueSeparator(LINE_SEPARATOR)
        printer
      }

      /**
        * Creates a UTF-8 JSON generator over `out`.
        *
        * Without `pretty`, a minimal printer is installed that uses the line separator between
        * root values; with `pretty`, the default pretty printer is installed with its root-value
        * separator hook overridden to write nothing.
        *
        * @param out    destination stream; must not be null
        * @param pretty selects the pretty printer instead of the minimal one
        * @return the configured generator
        * @throws NullPointerException if `out` is null
        */
      @throws[IOException]
      private def getJsonGenerator(out: OutputStream, pretty: Boolean): JsonGenerator = {
        if (null == out) throw new NullPointerException("OutputStream cannot be null")
        val generator: JsonGenerator = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8)
        generator.setPrettyPrinter(
          if (pretty) prettyPrinterWithoutRootSeparator else lineSeparatedPrinter
        )
        generator
      }
    }

    /**
      * Avro encoder that writes JSON through a Jackson generator, driven by the JSON grammar
      * generated from `schema`.
      *
      * `writeIndex` writes nothing to the output: it only swaps the union symbol for the selected
      * branch, so a union value is emitted without any wrapper around it.
      *
      * @param schema schema from which the JSON grammar is generated
      * @param out    generator that receives the JSON output
      */
    class JsonEncoder(schema : Schema, out: JsonGenerator) extends ParsingEncoder with Parser.ActionHandler{

      /** Grammar parser for `schema`; each write call advances it and it calls back into `doAction`. */
      val parser = new Parser(new JsonGrammarGenerator().generate(schema), this)
      /**
        * Has anything been written into the collections?
        */
      protected var isEmpty: util.BitSet = new util.BitSet

      /** Builds an encoder writing non-pretty JSON to `out`. */
      def this(sc: Schema, out: OutputStream) =
        this(sc, JsonEncoder.getJsonGenerator(out, false))

      /** Builds an encoder writing to `out`, pretty-printed when `pretty` is true. */
      def this(sc: Schema, out: OutputStream, pretty: Boolean) =
        this(sc, JsonEncoder.getJsonGenerator(out, pretty))


      /**
        * Selects the union branch at `unionIndex` by replacing the union alternative on the parser
        * stack with that branch's symbol. Nothing is written to the output here.
        */
      @throws[IOException]
      override def writeIndex(unionIndex: Int): Unit = {
        parser.advance(Symbol.UNION)
        val top: Symbol.Alternative = parser.popSymbol.asInstanceOf[Symbol.Alternative]
        val symbol: Symbol = top.getSymbol(unionIndex)
        parser.pushSymbol(symbol)
      }

      /** Runs the parser's pending implicit actions, then flushes the generator if there is one. */
      @throws[IOException]
      override def flush(): Unit = {
        parser.processImplicitActions()
        if (out != null) out.flush()
      }

      @throws[IOException]
      override def writeNull(): Unit = {
        parser.advance(Symbol.NULL)
        out.writeNull()
      }

      @throws[IOException]
      override def writeBoolean(b: Boolean): Unit = {
        parser.advance(Symbol.BOOLEAN)
        out.writeBoolean(b)
      }

      @throws[IOException]
      override def writeInt(n: Int): Unit = {
        parser.advance(Symbol.INT)
        out.writeNumber(n)
      }

      @throws[IOException]
      override def writeLong(n: Long): Unit = {
        parser.advance(Symbol.LONG)
        out.writeNumber(n)
      }

      @throws[IOException]
      override def writeFloat(f: Float): Unit = {
        parser.advance(Symbol.FLOAT)
        out.writeNumber(f)
      }

      @throws[IOException]
      override def writeDouble(d: Double): Unit = {
        parser.advance(Symbol.DOUBLE)
        out.writeNumber(d)
      }

      @throws[IOException]
      override def writeString(utf8: Utf8): Unit = {
        writeString(utf8.toString)
      }

      /**
        * Writes `str` as a field name when the parser expects a map key, otherwise as a string value.
        */
      @throws[IOException]
      override def writeString(str: String): Unit = {
        parser.advance(Symbol.STRING)
        if (parser.topSymbol eq Symbol.MAP_KEY_MARKER) {
          parser.advance(Symbol.MAP_KEY_MARKER)
          out.writeFieldName(str)
        }
        else out.writeString(str)
      }

      /**
        * Writes the remaining content of `bytes` without changing the buffer's position.
        */
      @throws[IOException]
      override def writeBytes(bytes: ByteBuffer): Unit = {
        if (bytes.hasArray) {
          // Buffer position p corresponds to backing-array index p + arrayOffset; the offset is
          // non-zero for sliced buffers, so it has to be part of the start index.
          writeBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
        }
        else {
          val b: Array[Byte] = new Array[Byte](bytes.remaining)
          // Read through a duplicate so the caller's buffer position is left untouched.
          bytes.duplicate.get(b)
          writeBytes(b)
        }
      }

      @throws[IOException]
      override def writeBytes(bytes: Array[Byte], start: Int, len: Int): Unit = {
        parser.advance(Symbol.BYTES)
        writeByteArray(bytes, start, len)
      }

      /** Writes the byte range as a JSON string, decoding the bytes as ISO-8859-1. */
      @throws[IOException]
      private def writeByteArray(bytes: Array[Byte], start: Int, len: Int): Unit = {
        out.writeString(new String(bytes, start, len, "ISO-8859-1"))
      }

      /**
        * Writes a fixed value.
        *
        * @throws AvroTypeException if `len` differs from the size recorded in the grammar
        */
      @throws[IOException]
      override def writeFixed(bytes: Array[Byte], start: Int, len: Int): Unit = {
        parser.advance(Symbol.FIXED)
        val top: Symbol.IntCheckAction = parser.popSymbol.asInstanceOf[Symbol.IntCheckAction]
        if (len != top.size) throw new AvroTypeException("Incorrect length for fixed binary: expected " + top.size + " but received " + len + " bytes.")
        writeByteArray(bytes, start, len)
      }

      /**
        * Writes the label of enum ordinal `e` as a JSON string.
        *
        * @throws AvroTypeException if `e` is negative or not below the number of labels
        */
      @throws[IOException]
      override def writeEnum(e: Int): Unit = {
        parser.advance(Symbol.ENUM)
        val top: Symbol.EnumLabelsAction = parser.popSymbol.asInstanceOf[Symbol.EnumLabelsAction]
        if (e < 0 || e >= top.size) throw new AvroTypeException("Enumeration out of range: max is " + top.size + " but received " + e)
        out.writeString(top.getLabel(e))
      }

      @throws[IOException]
      override def writeArrayStart(): Unit = {
        parser.advance(Symbol.ARRAY_START)
        out.writeStartArray()
        push()
        // Mark the newly opened collection as empty until startItem clears the bit.
        isEmpty.set(depth)
      }

      @throws[IOException]
      override def writeArrayEnd(): Unit = {
        // An ITEM_END is only due when at least one item was started at this level.
        if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
        pop()
        parser.advance(Symbol.ARRAY_END)
        out.writeEndArray()
      }

      @throws[IOException]
      override def writeMapStart(): Unit = {
        push()
        // Mark the newly opened collection as empty until startItem clears the bit.
        isEmpty.set(depth)
        parser.advance(Symbol.MAP_START)
        out.writeStartObject()
      }

      @throws[IOException]
      override def writeMapEnd(): Unit = {
        // An ITEM_END is only due when at least one item was started at this level.
        if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
        pop()
        parser.advance(Symbol.MAP_END)
        out.writeEndObject()
      }

      @throws[IOException]
      override def startItem(): Unit = {
        // Close the previous item, if any, before starting the next one.
        if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
        super.startItem()
        isEmpty.clear(depth)
      }



      /**
        * Parser callback: emits the JSON structure that corresponds to an action symbol.
        *
        * Field-adjust actions write the field name, record start opens an object, record end and
        * union end close an object, and field end writes nothing.
        *
        * @return always `null`
        * @throws AvroTypeException for any other action symbol
        */
      @throws[IOException]
      override def doAction(input: Symbol, top: Symbol): Symbol = {
        top match {
          case fa: Symbol.FieldAdjustAction => out.writeFieldName(fa.fname)
          case _ if top eq Symbol.RECORD_START => out.writeStartObject()
          case _ if (top eq Symbol.RECORD_END) || (top eq Symbol.UNION_END) => out.writeEndObject()
          case _ if top ne Symbol.FIELD_END => throw new AvroTypeException("Unknown action symbol " + top)
          case _ => ()
        }
        null
      }
    }
于 2017-06-29T07:40:37.563 回答
1

正如 Stéphane 所提到的,到目前为止唯一的办法就是实现自己的 JsonEncoder。有趣的是,真正需要修改的方法只有 writeIndex:当联合(union)的取值不是 NULL 时,正是这个方法在输出中附加了联合类型的包装。这是我的代码:

/**
 * Avro encoder that writes JSON through a Jackson generator, driven by the JSON grammar generated
 * from a schema.
 *
 * <p>{@link #writeIndex(int)} writes nothing to the output: it only swaps the union symbol for the
 * selected branch, so a union value is emitted without any wrapper around it.
 */
public class CustomJsonEncoder extends ParsingEncoder implements Parser.ActionHandler {
    private static final String LINE_SEPARATOR = System.getProperty("line.separator");
    /** Grammar parser; each write call advances it and it calls back into {@link #doAction}. */
    final Parser parser;
    /** Generator that receives the JSON output; replaced by {@code configure}. */
    private JsonGenerator out;
    /** One bit per nesting depth: set while the collection at that depth has no items yet. */
    protected BitSet isEmpty = new BitSet();

    /** Builds an encoder writing non-pretty JSON to {@code out}. */
    public CustomJsonEncoder(Schema sc, OutputStream out) throws IOException {
        this(sc, getJsonGenerator(out, false));
    }

    /** Builds an encoder writing to {@code out}, pretty-printed when {@code pretty} is true. */
    public CustomJsonEncoder(Schema sc, OutputStream out, boolean pretty) throws IOException {
        this(sc, getJsonGenerator(out, pretty));
    }

    /**
     * Builds an encoder writing to an existing generator.
     *
     * @param sc  schema from which the JSON grammar is generated
     * @param out generator to write to; must not be null
     */
    public CustomJsonEncoder(Schema sc, JsonGenerator out) throws IOException {
        // configure runs while parser is still unassigned, so its parser null check skips the flush.
        configure(out);
        this.parser = new Parser(new JsonGrammarGenerator().generate(sc), this);
    }

    /** Runs the parser's pending implicit actions, then flushes the generator if there is one. */
    @Override
    public void flush() throws IOException {
        parser.processImplicitActions();
        if (out != null) {
            out.flush();
        }
    }

    /**
     * Creates a UTF-8 JSON generator over {@code out}. Both printer variants use the platform line
     * separator between root values.
     *
     * @param out    destination stream; must not be null
     * @param pretty selects the default pretty printer instead of the minimal one
     * @throws NullPointerException if {@code out} is null
     */
    private static JsonGenerator getJsonGenerator(OutputStream out, boolean pretty) throws IOException {
        if (null == out)
            throw new NullPointerException("OutputStream cannot be null");
        JsonGenerator g = new JsonFactory().createGenerator(out, JsonEncoding.UTF8);
        if (pretty) {
            DefaultPrettyPrinter pp = new DefaultPrettyPrinter() {
                @Override
                public void writeRootValueSeparator(JsonGenerator jg) throws IOException {
                    jg.writeRaw(LINE_SEPARATOR);
                }
            };
            g.setPrettyPrinter(pp);
        } else {
            MinimalPrettyPrinter pp = new MinimalPrettyPrinter();
            pp.setRootValueSeparator(LINE_SEPARATOR);
            g.setPrettyPrinter(pp);
        }
        return g;
    }

    /**
     * Redirects this encoder to a new output stream using a non-pretty generator.
     *
     * @param out destination stream; must not be null
     * @return this encoder
     */
    public CustomJsonEncoder configure(OutputStream out) throws IOException {
        this.configure(getJsonGenerator(out, false));
        return this;
    }

    /**
     * Installs {@code generator} as the output, flushing first when the parser already exists.
     *
     * @throws NullPointerException if {@code generator} is null
     */
    private CustomJsonEncoder configure(JsonGenerator generator) throws IOException {
        if (null == generator)
            throw new NullPointerException("JsonGenerator cannot be null");
        // parser is null only during construction; afterwards pending output is flushed first.
        if (null != parser) {
            flush();
        }
        this.out = generator;
        return this;
    }

    @Override
    public void writeNull() throws IOException {
        parser.advance(Symbol.NULL);
        out.writeNull();
    }

    @Override
    public void writeBoolean(boolean b) throws IOException {
        parser.advance(Symbol.BOOLEAN);
        out.writeBoolean(b);
    }

    @Override
    public void writeInt(int n) throws IOException {
        parser.advance(Symbol.INT);
        out.writeNumber(n);
    }

    @Override
    public void writeLong(long n) throws IOException {
        parser.advance(Symbol.LONG);
        out.writeNumber(n);
    }

    @Override
    public void writeFloat(float f) throws IOException {
        parser.advance(Symbol.FLOAT);
        out.writeNumber(f);
    }

    @Override
    public void writeDouble(double d) throws IOException {
        parser.advance(Symbol.DOUBLE);
        out.writeNumber(d);
    }

    @Override
    public void writeString(Utf8 utf8) throws IOException {
        writeString(utf8.toString());
    }

    /** Writes {@code str} as a field name when the parser expects a map key, else as a string value. */
    @Override
    public void writeString(String str) throws IOException {
        parser.advance(Symbol.STRING);
        if (parser.topSymbol() == Symbol.MAP_KEY_MARKER) {
            parser.advance(Symbol.MAP_KEY_MARKER);
            out.writeFieldName(str);
        } else {
            out.writeString(str);
        }
    }

    /** Writes the remaining content of {@code bytes} without changing the buffer's position. */
    @Override
    public void writeBytes(ByteBuffer bytes) throws IOException {
        if (bytes.hasArray()) {
            // Buffer position p corresponds to backing-array index p + arrayOffset; the offset is
            // non-zero for sliced buffers, so it has to be part of the start index.
            writeBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining());
        } else {
            byte[] b = new byte[bytes.remaining()];
            // Read through a duplicate so the caller's buffer position is left untouched.
            bytes.duplicate().get(b);
            writeBytes(b);
        }
    }

    @Override
    public void writeBytes(byte[] bytes, int start, int len) throws IOException {
        parser.advance(Symbol.BYTES);
        writeByteArray(bytes, start, len);
    }

    /** Writes the byte range as a JSON string, decoding the bytes as ISO-8859-1. */
    private void writeByteArray(byte[] bytes, int start, int len) throws IOException {
        out.writeString(new String(bytes, start, len, StandardCharsets.ISO_8859_1));
    }

    /**
     * Writes a fixed value.
     *
     * @throws AvroTypeException if {@code len} differs from the size recorded in the grammar
     */
    @Override
    public void writeFixed(byte[] bytes, int start, int len) throws IOException {
        parser.advance(Symbol.FIXED);
        Symbol.IntCheckAction top = (Symbol.IntCheckAction) parser.popSymbol();
        if (len != top.size) {
            throw new AvroTypeException(
                    "Incorrect length for fixed binary: expected " + top.size + " but received " + len + " bytes.");
        }
        writeByteArray(bytes, start, len);
    }

    /**
     * Writes the label of enum ordinal {@code e} as a JSON string.
     *
     * @throws AvroTypeException if {@code e} is negative or not below the number of labels
     */
    @Override
    public void writeEnum(int e) throws IOException {
        parser.advance(Symbol.ENUM);
        Symbol.EnumLabelsAction top = (Symbol.EnumLabelsAction) parser.popSymbol();
        if (e < 0 || e >= top.size) {
            throw new AvroTypeException("Enumeration out of range: max is " + top.size + " but received " + e);
        }
        out.writeString(top.getLabel(e));
    }

    @Override
    public void writeArrayStart() throws IOException {
        parser.advance(Symbol.ARRAY_START);
        out.writeStartArray();
        push();
        // Mark the newly opened collection as empty until startItem clears the bit.
        isEmpty.set(depth());
    }

    @Override
    public void writeArrayEnd() throws IOException {
        // An ITEM_END is only due when at least one item was started at this level.
        if (!isEmpty.get(pos)) {
            parser.advance(Symbol.ITEM_END);
        }
        pop();
        parser.advance(Symbol.ARRAY_END);
        out.writeEndArray();
    }

    @Override
    public void writeMapStart() throws IOException {
        push();
        // Mark the newly opened collection as empty until startItem clears the bit.
        isEmpty.set(depth());

        parser.advance(Symbol.MAP_START);
        out.writeStartObject();
    }

    @Override
    public void writeMapEnd() throws IOException {
        // An ITEM_END is only due when at least one item was started at this level.
        if (!isEmpty.get(pos)) {
            parser.advance(Symbol.ITEM_END);
        }
        pop();

        parser.advance(Symbol.MAP_END);
        out.writeEndObject();
    }

    @Override
    public void startItem() throws IOException {
        // Close the previous item, if any, before starting the next one.
        if (!isEmpty.get(pos)) {
            parser.advance(Symbol.ITEM_END);
        }
        super.startItem();
        isEmpty.clear(depth());
    }

    /**
     * Selects the union branch at {@code unionIndex} by replacing the union alternative on the
     * parser stack with that branch's symbol. Nothing is written to the output here.
     */
    @Override
    public void writeIndex(int unionIndex) throws IOException {
        parser.advance(Symbol.UNION);
        Symbol.Alternative top = (Symbol.Alternative) parser.popSymbol();
        Symbol symbol = top.getSymbol(unionIndex);
        parser.pushSymbol(symbol);
    }

    /**
     * Parser callback: emits the JSON structure that corresponds to an action symbol.
     *
     * <p>Field-adjust actions write the field name, record start opens an object, record end and
     * union end close an object, and field end writes nothing.
     *
     * @return always {@code null}
     * @throws AvroTypeException for any other action symbol
     */
    @Override
    public Symbol doAction(Symbol input, Symbol top) throws IOException {
        if (top instanceof Symbol.FieldAdjustAction) {
            Symbol.FieldAdjustAction fa = (Symbol.FieldAdjustAction) top;
            out.writeFieldName(fa.fname);
        } else if (top == Symbol.RECORD_START) {
            out.writeStartObject();
        } else if (top == Symbol.RECORD_END || top == Symbol.UNION_END) {
            out.writeEndObject();
        } else if (top != Symbol.FIELD_END) {
            throw new AvroTypeException("Unknown action symbol " + top);
        }
        return null;
    }
}

更新

我创建了一个 PR,在 JsonEncoder 类中添加了这个特性。但让我们看看他们是否接受。

https://github.com/apache/avro/pull/508

于 2019-04-24T08:48:18.717 回答