我最近使用 Postman 将联系人记录上传到 HubSpot。这是我用来成功上传联系人的原始 JSON 数据示例和 POST 方法:
https://api.hubapi.com/crm/v3/objects/contacts?hapikey={{hapikey}}
{properties": {
"smbapi": "yes",
"email": "fcgrinding@junkstermail.com",
"business_name":"Forest City Grinding Inc",
"srvc_address_1":"3544 Stenstrom Rd",
"srvc_city_1":"",
"srvc_state_1":"IL",
"srvc_zip_1":"61109",
"proposal_date":"2021-12-07",
"proposal_start_date": "2022-01-01",
"udc_code_1": "COMED",
"eog":"electric",
"fixedprice_1_gas_mcf": 6.63,
"fixedprice_2_gas_mcf": 6.11,
"fixedprice_3_gas_mcf": 5.9,
"term_1": 12,
"term_2": 24,
"term_3": 36,
"smb_bdm_name": "Timothy Chin",
"smb_bdm_phone": "833-999-9999",
"smb_bdm_email": "tim.chin@junkstermail.com"
}
}
接下来,我创建了一个 python lambda 函数来自动化这个过程,因为我们想要提取可能有很多记录要提取的 CSV 文件。因此,我构建了字典,使其看起来与上面的字符串相同,后者在 Postman 中工作得很好/很好。但是,当我尝试使用我的字典有效负载对 HubSpot 执行 Post 方法 API 调用时,我收到此错误:
输入 JSON 无效:无法构建 ObjectSchemaEgg,第 1 行第 2 列的输入 JSON 无效:无法构建 ObjectSchemaEgg,未设置某些必需属性 [名称,标签]
这是我的代码为 API 调用构建的已处理字典字符串:
{'properties': '{"smbapi": "yes", "business_name": "Forest City Grinding Inc", "srvc_address_1": "4844 Stenstrom Rd", "srvc_state_1": "IL", "srvc_zip_1": "61109", "proposal_date": "2021-12-07", "proposal_start_date": "2022-01-01", "udc_code_1": "COMED", "fixedprice_1": "6.63", "fixedprice_2": "6.11", "fixedprice_3": "5.9", "term_2": "24", "term_3": "36", "smb_bdm_name": "Gary Wysong", "smb_bdm_phone": "833-389-0881", "smb_bdm_email": "gary.wysong@constellation.com"}'}
这是我的完整 Lambda 代码(特别注意对 post_to_hubspot() 的调用以及 post_to_hubspot() 函数本身)。加载发电机表的代码工作正常。:
import boto3
import json
import decimal
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import re
import pandas as pd
import numpy as np
import os
import datetime
from os import urandom
import email
import base64
import requests
from datetime import datetime, timedelta, timezone
import mailparser
import calendar
global payload_data
landing_zone_bucket_name = str(os.environ['BUCKETNAME'])
s3 = boto3.resource('s3')
landing_zone_bucket = s3.Bucket(landing_zone_bucket_name )
s3r = boto3.client('s3')
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table(str(os.environ['DYNAMOTABLE']))
unprocessed_records_table = dynamodb.Table(str(os.environ['UNPROCESSEDTABLE']))
email_table = dynamodb.Table(str(os.environ['EMAILSTATUSTABLE']))
endpoint_url=os.environ['ENDPOINT_URL']
access_key = os.environ['ACCESSKEY']
now = datetime.now()
today_date = datetime.strftime(now,'%d')
today_month = datetime.strftime(now,'%m')
today_year = datetime.strftime(now,'%Y')
time_stamp = datetime.now().strftime('%Y%m%d%H%M%S')
payload_data = {}
#WRITE RECORDS TO DYNAMO
def dynamoPut(dObj,table_name=None):
try:
for each in list(dObj['Information']):
if dObj['Information'][each]:
dObj['Information'][each] = str(dObj['Information'][each])
else:
del dObj['Information'][each]
dObj['Month'] = today_month
dObj['Year'] = today_year
dObj['Day'] = today_date
for each in list(dObj):
if dObj[each] != '':
dObj[each] = dObj[each]
else:
del dObj[each]
if table_name != None:
response = unprocessed_records_table.put_item(Item = dObj)
else:
response = table.put_item(Item = dObj)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
return True
else:
return False
except Exception as e:
print(e)
return False
def dynamoPutFileName(filename,source_type):
try:
dObj = {}
dObj['id'] = urandom(20).hex()
dObj['CreatedAt'] = str(datetime.now())
dObj['FileName'] = filename
dObj['Type'] = source_type
dObj['EmailSent'] = False
response = email_table.put_item(Item = dObj)
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
return True
else:
return False
except Exception as e:
print(e)
return False
def parse_csv_hubspot(event, obj):
#parsing CSV file to write to dynamo
try:
def auto_truncate(val):
return val[:255 ]
print('<< IN PARSE CSV HUBSPOT >>')
print(event)
csv = pd.read_csv(obj['Body'], encoding = "ISO-8859-1")
csv_nn = csv.replace(np.nan, 'null', regex=True)
d = csv_nn.to_dict(orient='records')
source_id = urandom(20).hex()
file_name = event['file_path'].split('/')[-1]
print('<< FILE NAME >>', file_name)
for each in d:
try:
dbObj = {}
#PASSING THE EXTERNAL KEY
UniqueKey = ''
if 'smbapi' in each and each['smbapi'] != 'null':
dbObj['smbapi' ] = each['smbapi']
print('<< SMB API>>', dbObj['smbapi' ])
if 'business_name' in each and each['business_name'] != 'null':
dbObj['business_name'] = each['business_name']
print('<< BUSINESS NAME >>', dbObj['business_name'])
if 'srvc_address_1' in each and each['srvc_address_1'] != 'null':
dbObj['srvc_address_1'] = each['srvc_address_1']
print('<< ADDRESS 1 >>', dbObj['srvc_address_1'])
if 'srvc_city_1' in each and each['srvc_city_1'] != 'null':
dbObj['srvc_city_1'] = each['srvc_city_1']
if 'srvc_state_1' in each and each['srvc_state_1'] != 'null':
dbObj['srvc_state_1'] = each['srvc_state_1']
if 'srvc_zip_1' in each and each['srvc_zip_1'] != 'null':
dbObj['srvc_zip_1']= str(each['srvc_zip_1']).zfill(5)
if 'proposal_date' in each and each['proposal_date'] != 'null':
dbObj['proposal_date']= try_parsing_date(each['proposal_date']).date().isoformat()
if 'proposal_start_date' in each and each['proposal_start_date'] != 'null':
dbObj['proposal_start_date']= try_parsing_date(each['proposal_start_date']).date().isoformat()
if 'udc_code_1' in each and each['udc_code_1'] != 'null':
dbObj['udc_code_1']= each['udc_code_1']
if 'eog' in each and each['eog'] != 'null':
dbObj['eog']= each['eog']
if 'fixedprice_1' in each and each['fixedprice_1'] != 'null':
dbObj['fixedprice_1']= each['fixedprice_1']
if 'fixedprice_2' in each and each['fixedprice_2'] != 'null':
dbObj['fixedprice_2']= each['fixedprice_2']
if 'fixedprice_3' in each and each['fixedprice_3'] != 'null':
dbObj['fixedprice_3']= each['fixedprice_3']
if 'fixedprice_1_gas_therm' in each and each['fixedprice_1_gas_therm'] != 'null':
dbObj['fixedprice_1_gas_therm']= each['fixedprice_1_gas_therm']
if 'fixedprice_2_gas_therm' in each and each['fixedprice_2_gas_therm'] != 'null':
dbObj['fixedprice_2_gas_therm']= each['fixedprice_2_gas_therm']
if 'fixedprice_3_gas_therm' in each and each['fixedprice_3_gas_therm'] != 'null':
dbObj['fixedprice_3_gas_therm']= each['fixedprice_3_gas_therm']
if 'fixedprice_1_gas_ccf' in each and each['fixedprice_1_gas_ccf'] != 'null':
dbObj['fixedprice_1_gas_ccf']= each['fixedprice_1_gas_ccf']
if 'fixedprice_2_gas_ccf' in each and each['fixedprice_2_gas_ccf'] != 'null':
dbObj['fixedprice_2_gas_ccf']= each['fixedprice_2_gas_ccf']
if 'fixedprice_3_gas_ccf' in each and each['fixedprice_3_gas_ccf'] != 'null':
dbObj['fixedprice_3_gas_ccf']= each['fixedprice_3_gas_ccf']
if 'fixedprice_1_gas_dth' in each and each['fixedprice_1_gas_dth'] != 'null':
dbObj['fixedprice_1_gas_dth']= each['fixedprice_1_gas_dth']
if 'fixedprice_2_gas_dth' in each and each['fixedprice_2_gas_dth'] != 'null':
dbObj['fixedprice_2_gas_dth']= each['fixedprice_2_gas_dth']
if 'fixedprice_3_gas_dth' in each and each['fixedprice_3_gas_dth'] != 'null':
dbObj['fixedprice_3_gas_dth']= each['fixedprice_3_gas_dth']
if 'fixedprice_1_gas_mcf' in each and each['fixedprice_1_gas_mcf'] != 'null':
dbObj['fixedprice_1_gas_mcf']= each['fixedprice_1_gas_mcf']
if 'fixedprice_2_gas_mcf' in each and each['fixedprice_2_gas_mcf'] != 'null':
dbObj['fixedprice_2_gas_mcf']= each['fixedprice_2_gas_mcf']
if 'fixedprice_3_gas_mcf' in each and each['fixedprice_3_gas_mcf'] != 'null':
dbObj['fixedprice_3_gas_mcf']= each['fixedprice_3_gas_mcf']
if 'term_1' in each and each['term_1'] != 'null':
dbObj['term_1']= each['term_1']
if 'term_2' in each and each['term_2'] != 'null':
dbObj['term_2']= each['term_2']
if 'term_3' in each and each['term_3'] != 'null':
dbObj['term_3']= each['term_3']
if 'smb_bdm_name' in each and each['smb_bdm_name'] != 'null':
dbObj['smb_bdm_name']= each['smb_bdm_name']
if 'smb_bdm_phone' in each and each['smb_bdm_phone'] != 'null':
if '.' in str(each['smb_bdm_phone']):
dbObj['smb_bdm_phone']= str(int(float(each['smb_bdm_phone'])))
else:
dbObj['smb_bdm_phone']= str(each['smb_bdm_phone'])
if 'smb_bdm_email' in each and each['smb_bdm_email'] != 'null' and each['smb_bdm_email'].strip() != '' and each['smb_bdm_email'] != None:
dbObj['smb_bdm_email']= each['smb_bdm_email']
print('<< OBJ >> ',dbObj)
N = urandom(20).hex()
now = str(datetime.now())
#<< END of HUBSPOT INGESTION >>
# table.put_item(
Item = {
'CogId' : str(N),
'CreatedAt': now,
'ExternalId': UniqueKey,
'Information' : dbObj,
'SourceBucket': landing_zone_bucket_name,
'SourcePath' : event['file_path'],
'Source' : 'HubSpot',
'SourceId' : source_id,
'SourceFileName': time_stamp + '_' + file_name
}
#WRITE-TO-DYNAMO
files_processing = dynamoPut(Item)
if not files_processing:
print('Writing {} record to dynamodb Failed'.format(Item))
except Exception as e:
print(e)
N = urandom(20).hex()
Item = {
'CogId' : str(N),
'CreatedAt': now,
'Information' : each,
'SourceBucket': landing_zone_bucket_name,
'SourcePath' : event['file_path'],
'Source' : 'HubSpot',
'message': str(e),
'SourceId' : source_id,
'ExternalId': UniqueKey
}
files_processing = dynamoPut(Item,'Fail')
pass
temp_file_name = time_stamp + '_' + file_name
isert_file_name = dynamoPutFileName(temp_file_name,'HubSpot')
post_to_hubspot(dbObj)
return True
except Exception as e:
print(e)
new_folder_path = os.environ['CSV_NEW_FOLDER_HUBSPOT']
unprocessed_folder_path = os.environ['CSV_ERROR_FOLDER_HUBSPOT']
# MOVING PROCESSED FILES FROM NEW TO UNPROCESSED FOLDER
move_file_to_processed = moving_files_new_to_processed(event, new_folder_path,unprocessed_folder_path)
return False
def try_parsing_date(text):
for fmt in ('%m/%d/%Y','%Y-%m-%dT%H:%M:%S-%f', '%m/%d/%y', '%Y-%m-%d', '%m.%d.%Y','%Y-%m-%dT%I', '%Y-%m-%dT%I%p', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S.%f+','%Y-%m-%dT%H:%M:%S'):#2018-11-20T08:05:54-0500
try:
return datetime.strptime(text, fmt)
except ValueError:
print('in except')
pass
return ValueError('no valid date format found')
def post_to_hubspot(list_contacts):
print('<< IN POST-To-HUBSPOT >>')
data_string = **json.dumps(list_contacts)**
payload_data = {"properties": data_string}
print('<< dbOBJ LIST >> ',payload_data)
response = requests.request("POST", endpoint_url+access_key, headers={'Content-Type': 'application/json'}, data=payload_data)
token_response=json.loads(response.text)
print('<< TOKEN RESPONSE >>',token_response)
def moving_files_new_to_processed(event, new_folder,processed_folder):
#MOVING-FILES-TO-PROCESSED
try:
copy_source = {
'Bucket': landing_zone_bucket_name,
'Key': event['file_path']
}
path = event['file_path']
processed_folder = processed_folder + time_stamp + '_'
new_key = path.replace(new_folder, processed_folder)
new_obj = landing_zone_bucket.Object(new_key)
new_obj.copy(copy_source)
s3.Object(landing_zone_bucket_name, event['file_path']).delete()
return True
except Exception as e:
print(e)
return False
def lambda_handler(event,context):
print("Starting to Push Records to Dynamo Lambda")
print(event)
try:
parse_flag = False
new_folder_path = ''
processed_folder_path = ''
#Gets file path and calls required function to parse it out
key = str(os.environ['CSV_NEW_FOLDER_HUBSPOT'])
obj = s3r.get_object(Bucket=landing_zone_bucket_name, Key=event['file_path'])
print('after obj')
print(os.environ['CSV_NEW_FOLDER_HUBSPOT'])
print('in HubSpot parse_csv')
parse_csv_func = parse_csv_hubspot(event, obj)
# Checks if parse_csv return empty dictionary
if parse_csv_func:
parse_flag = True
new_folder_path = os.environ['CSV_NEW_FOLDER_HUBSPOT']
processed_folder_path = os.environ['CSV_PROCESSED_FOLDER_HUBSPOT']
else:
print('File Format not Supported for {}'.format(event['file_path']))
if parse_flag:
# UPLOADING CONTACT.MOVING PROCESSED FILES FROM NEW TO PROCESSED FOLDER
#print('<< PAYLOAD >> ',payload)
#response = requests.request("POST", "https://api.hubapi.com/crm/v3/schemas/?hapikey="+access_key, headers={'Content-Type': 'application/json'}, data=json.dumps(str(payload)))
#token_response=json.loads(response.text)
#print('<< TOKEN RESPONSE >>',token_response)
#MOVING PROCESSED FILES FROM NEW TO PROCESSED FOLDER
move_file_to_processed = moving_files_new_to_processed(event, new_folder_path,processed_folder_path)
if move_file_to_processed:
print('File {} moved Successfully from {} to {}'.format(event['file_path'],new_folder_path,processed_folder_path))
else:
print('Moving {} file from new to processing folder Failed'.format(event['file_path']))
except Exception as e:
print(e)
可能是什么问题呢?谢谢你的帮助。