我正在尝试从 java 生成一个 Avro 模式来描述我可以通过 JDBC 访问的表。
我使用 JDBC getMetaData() 方法来检索相关的列元数据并将其存储在“columnDetail”对象的数组列表中。
列详细信息定义为
private static class columnDetail {
public String tableName;
public String columnName;
public String dataTypeName;
public int dataTypeId;
public String size;
public String scale;
}
然后我遍历这个数组列表并使用 org.apache.avro.SchemaBuilder 类构建 Avro 模式。
我的问题是十进制逻辑类型。
我遍历数组列表两次。第一次将所有字段添加到 FieldAssembler,第二次修改某些字节字段以添加十进制逻辑数据类型。
我遇到的问题是,如果十进制比例值在迭代之间发生变化,我会收到错误消息。
当它遍历 columnDetail 数组时,只要值“scale”不变,它就会工作。如果它确实发生了变化,则会发生以下情况:
Exception in thread "main" org.apache.avro.AvroRuntimeException: Can't overwrite property: scale
at org.apache.avro.JsonProperties.addProp(JsonProperties.java:187)
at org.apache.avro.Schema.addProp(Schema.java:134)
at org.apache.avro.JsonProperties.addProp(JsonProperties.java:191)
at org.apache.avro.Schema.addProp(Schema.java:139)
at org.apache.avro.LogicalTypes$Decimal.addToSchema(LogicalTypes.java:193)
at GenAvroSchema.main(GenAvroSchema.java:85)
我可以通过硬编码十进制大小来防止这种情况。即我可以替换
org.apache.avro.LogicalTypes.decimal(Integer.parseInt(cd.size),Integer.parseInt(cd.scale)).addToSchema(schema.getField(cd.columnName).schema());
和
org.apache.avro.LogicalTypes.decimal(18,2).addToSchema(schema.getField(cd.columnName).schema());
然而,对于所有十进制字段,这最终会得到相同大小的数据类型,这是不可取的。
有人可以帮忙吗?
Java:1.8.0_202 Avro:avro-1.8.2.jar
我的java代码:
public static void main(String[] args) throws Exception{
String jdbcURL = "jdbc:sforce://login.salesforce.com";
String jdbcUser = "userid";
String jdbcPassword = "password";
String avroDataType = "";
HashMap<String, String> dtmap = new HashMap<String, String>();
dtmap.put("VARCHAR", "string");
dtmap.put("BOOLEAN", "boolean");
dtmap.put("NUMERIC", "bytes");
dtmap.put("INTEGER", "int");
dtmap.put("TIMESTAMP", "string");
dtmap.put("DATE", "string");
ArrayList<columnDetail> columnDetails = new ArrayList<columnDetail>();
columnDetails = populateMetadata(jdbcURL, jdbcUser, jdbcPassword); // This works so have not included code here
SchemaBuilder.FieldAssembler<Schema> fields = SchemaBuilder.builder().record("account").doc("Account Detials").fields() ;
for(columnDetail cd:columnDetails) {
avroDataType = dtmap.get(JDBCType.valueOf(cd.dataTypeId).getName());
switch(avroDataType)
{
case "string":
fields.name(cd.columnName).type().unionOf().nullType().and().stringType().endUnion().nullDefault();
break;
case "int":
fields.name(cd.columnName).type().unionOf().nullType().and().intType().endUnion().nullDefault();
break;
case "boolean":
fields.name(cd.columnName).type().unionOf().booleanType().and().nullType().endUnion().booleanDefault(false);
break;
case "bytes":
if(Integer.parseInt(cd.scale) == 0) {
fields.name(cd.columnName).type().unionOf().nullType().and().longType().endUnion().nullDefault();
} else {
fields.name(cd.columnName).type().bytesType().noDefault();
}
break;
default:
fields.name(cd.columnName).type().unionOf().nullType().and().stringType().endUnion().nullDefault();
break;
}
}
Schema schema = fields.endRecord();
for(columnDetail cd:columnDetails) {
avroDataType = dtmap.get(JDBCType.valueOf(cd.dataTypeId).getName());
if(avroDataType == "bytes" && Integer.parseInt(cd.scale) != 0) {
//org.apache.avro.LogicalTypes.decimal(Integer.parseInt(cd.size),Integer.parseInt(cd.scale)).addToSchema(schema.getField(cd.columnName).schema());
org.apache.avro.LogicalTypes.decimal(18,2).addToSchema(schema.getField(cd.columnName).schema());
}
}
BufferedWriter writer = new BufferedWriter(new FileWriter("./account.avsc"));
writer.write(schema.toString());
writer.close();
}
谢谢,
爱因。