我正在尝试从使用 opencsv 的 CSVWriter 编写的 CSV 文件进行批量处理: CSVWriter writer = new CSVWriter(new FileWriter(filePath+createFileName), ',', CSVWriter.DEFAULT_QUOTE_CHARACTER);
和 BufferedReader 读取写入的文件。CSV文件被写入,我认为读取操作也很顺利。所以,到目前为止它工作得很好。但是,当我选择使用相同操作将特定数据写入 Csv 时,批次的创建就会出错。出现异常,说明“无法解析 CSV。找到未转义的引号。带引号的值应在引号内”,这使应用程序无法以预期的方式运行。
经历此错误后,数据中似乎存在一些“”(双引号)或“(双引号)符号。(我的数据格式为“asdf”、“1.0”、“”、“def ")。据我了解,我尝试应用正则表达式来查找双引号但找不到任何内容,因为在检查文件后它不包含重复的双引号。我遵循的链接是:正则表达式查找和替换 CSV 文件中未转义的非连续双引号
此后在代码中,我使用: File tmpFile = File.createTempFile("bulkAPIInsert", ".csv"); 将数据保存在临时文件中,然后将其删除。
在用以下代码替换上面的代码后,我以某种方式处理了即将到来的异常,但它进一步导致另一个声明“无法解析 CSV。在关闭打开的报价之前达到 EOF”。文件 tmpFile = new File("bulkAPIInsert.csv");
我认为不应遵循上述解决方法,因为这将是应用程序的性能问题。
通过查看 CSVReader 类,我发现定义的自定义异常声明与我得到的完全相同的异常。但我认为它是在某个双引号(CSV 文件的单元格值)中找到双引号时出现的。我将链接称为:https ://github.com/mulesoft/salesforce-connector/blob/master/src/main/java/com/sforce/async/CSVReader.java
任何人都可以建议我在哪里做错或任何解决这个问题的方法吗?
我将代码片段分享为:Method1 然后 Method2 被调用。
Method1: private List<BatchInfo> createBatchesFromCSVFile(RestConnection connection,
JobInfo jobInfo, String csvFileName) throws Exception {
List<BatchInfo> batchInfos = new ArrayList<BatchInfo>();
BufferedReader rdr = new BufferedReader(new InputStreamReader(
new FileInputStream(csvFileName)));
// read the CSV header row
String hdr = rdr.readLine();
byte[] headerBytes = (hdr + "\n").getBytes("UTF-8");
int headerBytesLength = headerBytes.length;
// I was making use of the following code which I replaced with the next line of code.
// File tmpFile = File.createTempFile("bulkAPIInsert", ".csv");
File tmpFile = new File("bulkAPIInsert.csv");
// Split the CSV file into multiple batches
try {
FileOutputStream tmpOut = new FileOutputStream(tmpFile);
int maxBytesPerBatch = 10000000; // 10 million bytes per batch
int maxRowsPerBatch = 10000; // 10 thousand rows per batch
int currentBytes = 0;
int currentLines = 0;
String nextLine;
while ((nextLine = rdr.readLine()) != null) {
byte[] bytes = (nextLine + "\n").getBytes("UTF-8"); //TODO
if (currentBytes + bytes.length > maxBytesPerBatch
|| currentLines > maxRowsPerBatch) {
createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
currentBytes = 0;
currentLines = 0;
}
if (currentBytes == 0) {
tmpOut = new FileOutputStream(tmpFile);
tmpOut.write(headerBytes);
currentBytes = headerBytesLength;
currentLines = 1;
}
tmpOut.write(bytes);
currentBytes += bytes.length;
currentLines++;
}
if (currentLines > 1) {
createBatch(tmpOut, tmpFile, batchInfos, connection, jobInfo);
}
} finally {
if(!tmpFile.delete())
tmpFile.deleteOnExit();
rdr.close();
}
return batchInfos;
}
/**
* Wait for a job to complete by polling the Bulk API.
*/
Method2: private void awaitCompletion(RestConnection connection, JobInfo job,
List<BatchInfo> batchInfoList) throws AsyncApiException {
try{
/****
Some code
**/
BatchInfo[] statusList = connection.getBatchInfoList(job.getId())
.getBatchInfo();
for (BatchInfo b : statusList) {
if (b.getState() == BatchStateEnum.Completed) {
if (incomplete.remove(b.getId()))
//Do Something
}
else if(b.getState() == BatchStateEnum.Failed){
System.out.println("Reason: "+b.getStateMessage()+".\n " +
"Number of Records Processed: "+b.getNumberRecordsProcessed());
throw (new Exception(""));
}
}
}
}catch(Exception ex){log.debug(" Exception occurred.");}
}
BatchInfo 的 getStateMessage() 方法给出了所讨论的错误消息。