我正在使用示例 Python 脚本将发票或一批发票提取到 CSV 文件中
########### Python Form Recognizer Async Invoice #############
import json
import time
import os
import ntpath
import sys
from requests import get, post
import csv
def analyzeInvoice(filename):
invoiceResultsFilename = filename + ".invoice.json"
# do not run analyze if .invoice.json file is present on disk
if os.path.isfile(invoiceResultsFilename):
with open(invoiceResultsFilename) as json_file:
return json.load(json_file)
# Endpoint URL
endpoint = r"XXXXXXXXX"
apim_key = "XXXXXXXXX"
post_url = endpoint + "/formrecognizer/v2.1/prebuilt/invoice/analyzeResults"
headers = {
# Request headers
'Content-Type': 'application/octet-stream',
'Ocp-Apim-Subscription-Key': apim_key,
}
params = {
"includeTextDetails": True
}
with open(filename, "rb") as f:
data_bytes = f.read()
try:
resp = post(url = post_url, data = data_bytes, headers = headers, params = params)
if resp.status_code != 202:
print("POST analyze failed:\n%s" % resp.text)
return None
print("POST analyze succeeded: %s" % resp.headers["operation-location"])
get_url = resp.headers["operation-location"]
except Exception as e:
print("POST analyze failed:\n%s" % str(e))
return None
n_tries = 50
n_try = 0
wait_sec = 6
while n_try < n_tries:
try:
resp = get(url = get_url, headers = {"Ocp-Apim-Subscription-Key": apim_key})
resp_json = json.loads(resp.text)
if resp.status_code != 200:
print("GET Invoice results failed:\n%s" % resp_json)
return None
status = resp_json["status"]
if status == "succeeded":
print("Invoice analysis succeeded.")
with open(invoiceResultsFilename, 'w') as outfile:
json.dump(resp_json, outfile, indent=4)
return resp_json
if status == "failed":
print("Analysis failed:\n%s" % resp_json)
return None
# Analysis still running. Wait and retry.
time.sleep(wait_sec)
n_try += 1
except Exception as e:
msg = "GET analyze results failed:\n%s" % str(e)
print(msg)
return None
return resp_json
def parseInvoiceResults(resp_json):
docResults = resp_json["analyzeResult"]["documentResults"]
invoiceResult = {}
for docResult in docResults:
for fieldName, fieldValue in sorted(docResult["fields"].items()):
valueFields = list(filter(lambda item: ("value" in item[0]) and ("valueString" not in item[0]), fieldValue.items()))
invoiceResult[fieldName] = fieldValue["text"]
if len(valueFields) == 1:
print("{0:26} : {1:50} NORMALIZED VALUE: {2}".format(fieldName , fieldValue["text"], valueFields[0][1]))
invoiceResult[fieldName + "_normalized"] = valueFields[0][1]
else:
print("{0:26} : {1}".format(fieldName , fieldValue["text"]))
print("")
return invoiceResult
def main(argv):
if (len(argv) != 2):
print("ERROR: Please provide invoice filename or root directory with invoice PDFs/images as an argument to the python script")
return
# list of invoice to analyze
invoiceFiles = []
csvPostfix = '-invoiceResults.csv'
if os.path.isfile(argv[1]):
# Single invoice
invoiceFiles.append(argv[1])
csvFileName = argv[1] + csvPostfix
else:
# Folder of invoices
supportedExt = ['.pdf', '.jpg','.jpeg','.tif','.tiff','.png','.bmp']
invoiceDirectory = argv[1]
csvFileName = os.path.join(invoiceDirectory, os.path.basename(os.path.abspath(invoiceDirectory)) + csvPostfix)
for root, directories, filenames in os.walk(invoiceDirectory):
for invoiceFilename in filenames:
ext = os.path.splitext(invoiceFilename)[-1].lower()
if ext in supportedExt:
fullname = os.path.join(root, invoiceFilename)
invoiceFiles.append(fullname)
with open(csvFileName, mode='w', newline='\n', encoding='utf-8') as csv_file:
fieldnames = ['Filename',
'FullFilename','InvoiceTotal','InvoiceTotal_normalized','AmountDue','AmountDue_normalized','SubTotal','SubTotal_normalized','TotalTax','TotalTax_normalized','CustomerName','VendorName',
'InvoiceId','CustomerId','PurchaseOrder','InvoiceDate','InvoiceDate_normalized','DueDate','DueDate_normalized',
'VendorAddress','VendorAddressRecipient','BillingAddress','BillingAddressRecipient','ShippingAddress','ShippingAddressRecipient','CustomerAddress','CustomerAddressRecipient','ServiceAddress','ServiceAddressRecipient','RemittanceAddress','RemittanceAddressRecipient', 'ServiceStartDate','ServiceStartDate_normalized','ServiceEndDate','ServiceEndDate_normalized','PreviousUnpaidBalance','PreviousUnpaidBalance_normalized']
writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
writer.writeheader()
counter = 0
for invoiceFullFilename in invoiceFiles:
counter = counter + 1
invoiceFilename = ntpath.basename(invoiceFullFilename)
print("----- Processing {0}/{1} : {2} -----".format(counter, len(invoiceFiles),invoiceFullFilename))
resp_json = analyzeInvoice(invoiceFullFilename)
if (resp_json is not None):
invoiceResults = parseInvoiceResults(resp_json)
invoiceResults["FullFilename"] = invoiceFullFilename
invoiceResults["Filename"] = invoiceFilename
writer.writerow(invoiceResults)
if __name__ == '__main__':
main(sys.argv)
当我运行代码时PS C:\Users\absf\Desktop\Projects\Azure_FR\fr_azure> python fr_azure.py "C:\Users\absf\Desktop\Projects\Azure_FR\invoices"
在终端上,它给出以下错误
Traceback (most recent call last):
File "fr_azure.py", line 139, in <module>
main(sys.argv)
File "fr_azure.py", line 133, in main
invoiceResults = parseInvoiceResults(resp_json)
File "fr_azure.py", line 84, in parseInvoiceResults
invoiceResult[fieldName] = fieldValue["text"]
KeyError: 'text'
请帮助解决错误