您也可以覆盖 JsonEncoder:
import java.io.{ByteArrayOutputStream, IOException, OutputStream}
import java.nio.ByteBuffer
import java.util
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter}
import org.apache.avro.io.{DecoderFactory, ParsingEncoder}
import org.apache.avro.io.parsing.{JsonGrammarGenerator, Parser, Symbol}
import org.apache.avro.util.Utf8
import org.apache.avro.{AvroTypeException, Schema}
import org.codehaus.jackson.util.{DefaultPrettyPrinter, MinimalPrettyPrinter}
import org.codehaus.jackson.{JsonEncoding, JsonFactory, JsonGenerator}
/**
 * Helpers that turn Avro data (binary-encoded or already materialised) into JSON text
 * by running it through the `JsonEncoder` class defined in this file.
 */
object inJson{

  /**
   * Decodes an Avro binary payload into a generic datum.
   *
   * @param binary       the Avro binary-encoded bytes
   * @param schemaWriter schema the bytes were written with
   * @param schemaReader schema to read with; defaults to `schemaWriter`
   * @return whatever `GenericDatumReader.read` produces for the payload
   */
  private def toObject(binary: Array[Byte], schemaWriter: Schema)(schemaReader : Schema = schemaWriter) : Object = {
    // No existing decoder or datum instance is supplied for reuse (both `null`).
    val decoder = DecoderFactory.get().binaryDecoder(binary, null)
    val reader = new GenericDatumReader[Object](schemaWriter, schemaReader)
    reader.read(null, decoder)
  }

  /**
   * Serialises a datum to pretty-printed JSON text.
   *
   * @param `object`     the datum to write
   * @param schemaWriter schema used both for the encoder grammar and the datum writer
   * @return the JSON text produced by the encoder
   */
  private def toJson(`object`: Object,schemaWriter: Schema): String = {
    val outputStream: ByteArrayOutputStream = new ByteArrayOutputStream()
    try {
      // Third argument requests the pretty-printing generator.
      val encoder = new JsonEncoder(schemaWriter, outputStream, true)
      val writer = new GenericDatumWriter[Object](schemaWriter)
      writer.write(`object`, encoder)
      encoder.flush()
      // The encoder's generator is created with JsonEncoding.UTF8, so the buffer must be
      // decoded as UTF-8 as well. The no-arg toString would use the platform default
      // charset and corrupt non-ASCII text on JVMs where that default is not UTF-8.
      outputStream.toString("UTF-8")
    } finally {
      outputStream.close()
    }
  }

  /**
   * Decodes an Avro binary payload and renders it as JSON.
   *
   * @param binary       the Avro binary-encoded bytes
   * @param schemaWriter schema the bytes were written with
   * @param schemaReader schema to resolve the payload against while decoding
   * @param schemaJson   schema used when encoding the decoded datum to JSON
   * @return the JSON text, or the empty string when decoding yields `null`
   */
  def ~>(binary: Array[Byte], schemaWriter: Schema, schemaReader : Schema, schemaJson : Schema) : String = {
    Option(toObject(binary, schemaWriter)(schemaReader)).fold("")(toJson(_, schemaJson))
  }

  /**
   * Renders an already-decoded datum as JSON.
   *
   * @param `object`     the datum to write
   * @param schemaWriter not used by this overload; kept for signature parity with the binary overload
   * @param schemaReader not used by this overload; kept for signature parity with the binary overload
   * @param schemaJson   schema used when encoding the datum to JSON
   * @return the JSON text
   */
  def ~>(`object`: Object, schemaWriter: Schema, schemaReader : Schema, schemaJson : Schema) : String = {
    toJson(`object`, schemaJson)
  }
}
/**
 * Companion of the `JsonEncoder` class: builds the Jackson generators the encoder writes to.
 */
object JsonEncoder{

  /** Value of the `line.separator` system property; placed between root-level values in compact mode. */
  private val LINE_SEPARATOR: String = System.getProperty("line.separator")

  /**
   * Creates a UTF-8 JSON generator over `out` and installs a pretty printer on it.
   *
   * In compact mode (`pretty == false`) a minimal printer is used with the line separator
   * between root-level values. In pretty mode the default pretty printer is used, but with
   * its root-value separator suppressed.
   *
   * @param out    destination stream; must not be `null`
   * @param pretty whether to use the indenting printer
   * @return the configured generator
   * @throws NullPointerException if `out` is `null`
   */
  @throws[IOException]
  private def getJsonGenerator(out: OutputStream, pretty: Boolean): JsonGenerator = {
    if (out == null) {
      throw new NullPointerException("OutputStream cannot be null")
    }
    val generator: JsonGenerator = new JsonFactory().createJsonGenerator(out, JsonEncoding.UTF8)
    if (pretty) generator.setPrettyPrinter(indentingPrinter())
    else generator.setPrettyPrinter(compactPrinter())
    generator
  }

  /** Default pretty printer whose root-value separator writes nothing. */
  private def indentingPrinter(): DefaultPrettyPrinter =
    new DefaultPrettyPrinter() {
      @throws[IOException]
      override def writeRootValueSeparator(jg: JsonGenerator): Unit = {
        // Intentionally empty. The disabled alternative was: jg.writeRaw(LINE_SEPARATOR)
      }
    }

  /** Minimal printer that separates root-level values with the line separator. */
  private def compactPrinter(): MinimalPrettyPrinter = {
    val printer = new MinimalPrettyPrinter
    printer.setRootValueSeparator(LINE_SEPARATOR)
    printer
  }
}
/**
 * Avro `ParsingEncoder` that emits values as JSON through a Jackson `JsonGenerator`.
 *
 * A grammar generated from `schema` drives a `Parser`; each `write*` call first advances
 * the parser over the expected symbol and then writes the matching JSON token. Record
 * structure and field names are emitted from `doAction`, which the parser calls back.
 *
 * Note that `writeIndex` writes nothing to the generator, so a union value is emitted as
 * the bare value of the selected branch.
 *
 * @param schema schema the JSON grammar is generated from
 * @param out    generator that receives the JSON tokens
 */
class JsonEncoder(schema: Schema, out: JsonGenerator) extends ParsingEncoder with Parser.ActionHandler {

  /** Grammar-driven parser; this encoder is registered as its action handler. */
  val parser = new Parser(new JsonGrammarGenerator().generate(schema), this)

  /**
   * Has anything been written into the collections?
   * A bit is set when an array/map is opened at that depth and cleared once an item is started.
   */
  protected var isEmpty: util.BitSet = new util.BitSet

  /**
   * Creates an encoder writing compact JSON to `out`.
   *
   * @param sc  schema the JSON grammar is generated from
   * @param out destination stream
   */
  def this(sc: Schema, out: OutputStream) =
    this(sc, JsonEncoder.getJsonGenerator(out, false))

  /**
   * Creates an encoder writing to `out`, optionally pretty-printed.
   *
   * @param sc     schema the JSON grammar is generated from
   * @param out    destination stream
   * @param pretty whether to use the indenting printer
   */
  def this(sc: Schema, out: OutputStream, pretty: Boolean) =
    this(sc, JsonEncoder.getJsonGenerator(out, pretty))

  /**
   * Selects a union branch: replaces the union alternative on the parser stack with the
   * symbol of branch `unionIndex`. Nothing is written to the generator here.
   */
  @throws[IOException]
  override def writeIndex(unionIndex: Int): Unit = {
    parser.advance(Symbol.UNION)
    val alternative = parser.popSymbol().asInstanceOf[Symbol.Alternative]
    parser.pushSymbol(alternative.getSymbol(unionIndex))
  }

  /** Runs the parser's pending implicit actions, then flushes the generator if there is one. */
  @throws[IOException]
  override def flush(): Unit = {
    parser.processImplicitActions()
    if (out != null) out.flush()
  }

  @throws[IOException]
  override def writeNull(): Unit = {
    parser.advance(Symbol.NULL)
    out.writeNull()
  }

  @throws[IOException]
  override def writeBoolean(b: Boolean): Unit = {
    parser.advance(Symbol.BOOLEAN)
    out.writeBoolean(b)
  }

  @throws[IOException]
  override def writeInt(n: Int): Unit = {
    parser.advance(Symbol.INT)
    out.writeNumber(n)
  }

  @throws[IOException]
  override def writeLong(n: Long): Unit = {
    parser.advance(Symbol.LONG)
    out.writeNumber(n)
  }

  @throws[IOException]
  override def writeFloat(f: Float): Unit = {
    parser.advance(Symbol.FLOAT)
    out.writeNumber(f)
  }

  @throws[IOException]
  override def writeDouble(d: Double): Unit = {
    parser.advance(Symbol.DOUBLE)
    out.writeNumber(d)
  }

  /** Delegates to the `String` overload. */
  @throws[IOException]
  override def writeString(utf8: Utf8): Unit = {
    writeString(utf8.toString)
  }

  /**
   * Writes a string. When the parser is positioned at a map key, the string is emitted
   * as a JSON field name instead of a string value.
   */
  @throws[IOException]
  override def writeString(str: String): Unit = {
    parser.advance(Symbol.STRING)
    if (parser.topSymbol() eq Symbol.MAP_KEY_MARKER) {
      parser.advance(Symbol.MAP_KEY_MARKER)
      out.writeFieldName(str)
    } else {
      out.writeString(str)
    }
  }

  /**
   * Writes the remaining bytes of `bytes` (position to limit) without changing its position.
   */
  @throws[IOException]
  override def writeBytes(bytes: ByteBuffer): Unit = {
    if (bytes.hasArray) {
      // The buffer's position is relative to arrayOffset(), not to index 0 of the backing
      // array; sliced or offset-wrapped buffers would otherwise emit the wrong bytes.
      writeBytes(bytes.array(), bytes.arrayOffset() + bytes.position(), bytes.remaining())
    } else {
      // No accessible backing array: copy through a duplicate so the caller's position is untouched.
      val copy: Array[Byte] = new Array[Byte](bytes.remaining())
      bytes.duplicate().get(copy)
      writeBytes(copy)
    }
  }

  @throws[IOException]
  override def writeBytes(bytes: Array[Byte], start: Int, len: Int): Unit = {
    parser.advance(Symbol.BYTES)
    writeByteArray(bytes, start, len)
  }

  /** Emits `len` bytes starting at `start` as a JSON string, decoding them as ISO-8859-1. */
  @throws[IOException]
  private def writeByteArray(bytes: Array[Byte], start: Int, len: Int): Unit = {
    out.writeString(new String(bytes, start, len, "ISO-8859-1"))
  }

  /**
   * Writes a fixed-size binary value.
   *
   * @throws AvroTypeException if `len` differs from the size the grammar expects
   */
  @throws[IOException]
  override def writeFixed(bytes: Array[Byte], start: Int, len: Int): Unit = {
    parser.advance(Symbol.FIXED)
    val expected = parser.popSymbol().asInstanceOf[Symbol.IntCheckAction]
    if (len != expected.size) {
      throw new AvroTypeException(
        s"Incorrect length for fixed binary: expected ${expected.size} but received $len bytes.")
    }
    writeByteArray(bytes, start, len)
  }

  /**
   * Writes an enum value as the label at ordinal `e`.
   *
   * @throws AvroTypeException if `e` is negative or not below the number of labels
   */
  @throws[IOException]
  override def writeEnum(e: Int): Unit = {
    parser.advance(Symbol.ENUM)
    val labels = parser.popSymbol().asInstanceOf[Symbol.EnumLabelsAction]
    if (e < 0 || e >= labels.size) {
      throw new AvroTypeException(s"Enumeration out of range: max is ${labels.size} but received $e")
    }
    out.writeString(labels.getLabel(e))
  }

  /** Opens a JSON array and marks the new nesting level as empty. */
  @throws[IOException]
  override def writeArrayStart(): Unit = {
    parser.advance(Symbol.ARRAY_START)
    out.writeStartArray()
    push()
    isEmpty.set(depth())
  }

  /** Closes the current JSON array, first closing the last item if any item was started. */
  @throws[IOException]
  override def writeArrayEnd(): Unit = {
    if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
    pop()
    parser.advance(Symbol.ARRAY_END)
    out.writeEndArray()
  }

  /** Opens a JSON object for a map and marks the new nesting level as empty. */
  @throws[IOException]
  override def writeMapStart(): Unit = {
    push()
    isEmpty.set(depth())
    parser.advance(Symbol.MAP_START)
    out.writeStartObject()
  }

  /** Closes the current map's JSON object, first closing the last item if any item was started. */
  @throws[IOException]
  override def writeMapEnd(): Unit = {
    if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
    pop()
    parser.advance(Symbol.MAP_END)
    out.writeEndObject()
  }

  /** Starts a collection item, closing the previous one if there was one. */
  @throws[IOException]
  override def startItem(): Unit = {
    if (!isEmpty.get(pos)) parser.advance(Symbol.ITEM_END)
    super.startItem()
    isEmpty.clear(depth())
  }

  /**
   * Parser callback that emits structural JSON for implicit grammar actions: field names,
   * record start and record/union end. `FIELD_END` is accepted and produces no output.
   *
   * @param input symbol the parser was advancing over (unused here)
   * @param top   action symbol taken from the parser stack
   * @return always `null`
   * @throws AvroTypeException for any other action symbol
   */
  @throws[IOException]
  override def doAction(input: Symbol, top: Symbol): Symbol = {
    // Guards use `eq` so the well-known symbols are compared by reference, as before.
    top match {
      case fieldAction: Symbol.FieldAdjustAction =>
        out.writeFieldName(fieldAction.fname)
      case s if s eq Symbol.RECORD_START =>
        out.writeStartObject()
      case s if (s eq Symbol.RECORD_END) || (s eq Symbol.UNION_END) =>
        // NOTE(review): writeIndex in this class never pushes UNION_END, so the
        // UNION_END case looks unreachable from here — confirm before removing.
        out.writeEndObject()
      case s if s eq Symbol.FIELD_END =>
        () // nothing to emit
      case other =>
        throw new AvroTypeException(s"Unknown action symbol $other")
    }
    null
  }
}