0

我最近使用 Postman 将联系人记录上传到 HubSpot。这是我用来成功上传联系人的原始 JSON 数据示例和 POST 方法:

https://api.hubapi.com/crm/v3/objects/contacts?hapikey={{hapikey}}
{properties": {
    "smbapi": "yes",
    "email": "fcgrinding@junkstermail.com",
    "business_name":"Forest City Grinding Inc",
    "srvc_address_1":"3544 Stenstrom Rd",
    "srvc_city_1":"",
    "srvc_state_1":"IL",
    "srvc_zip_1":"61109",
    "proposal_date":"2021-12-07",
    "proposal_start_date": "2022-01-01",
    "udc_code_1": "COMED",
    "eog":"electric",
    "fixedprice_1_gas_mcf": 6.63,
    "fixedprice_2_gas_mcf": 6.11,
    "fixedprice_3_gas_mcf": 5.9,
    "term_1": 12,
    "term_2": 24,
    "term_3": 36,
    "smb_bdm_name": "Timothy Chin",
    "smb_bdm_phone": "833-999-9999",
    "smb_bdm_email": "tim.chin@junkstermail.com"
  }
}

接下来,我创建了一个 python lambda 函数来自动化这个过程,因为我们想要提取可能有很多记录要提取的 CSV 文件。因此,我构建了字典,使其看起来与上面的字符串相同,后者在 Postman 中工作得很好/很好。但是,当我尝试使用我的字典有效负载对 HubSpot 执行 Post 方法 API 调用时,我收到此错误:

输入 JSON 无效:无法构建 ObjectSchemaEgg,第 1 行第 2 列的输入 JSON 无效:无法构建 ObjectSchemaEgg,未设置某些必需属性 [名称,标签]

这是我的代码为 API 调用构建的已处理字典字符串:

{'properties': '{"smbapi": "yes", "business_name": "Forest City Grinding Inc", "srvc_address_1": "4844 Stenstrom Rd", "srvc_state_1": "IL", "srvc_zip_1": "61109", "proposal_date": "2021-12-07", "proposal_start_date": "2022-01-01", "udc_code_1": "COMED", "fixedprice_1": "6.63", "fixedprice_2": "6.11", "fixedprice_3": "5.9", "term_2": "24", "term_3": "36", "smb_bdm_name": "Gary Wysong", "smb_bdm_phone": "833-389-0881", "smb_bdm_email": "gary.wysong@constellation.com"}'}

这是我的完整 Lambda 代码(特别注意对 post_to_hubspot() 的调用以及 post_to_hubspot() 函数本身)。加载发电机表的代码工作正常。:

import boto3
import json
import decimal
from botocore.exceptions import ClientError
from boto3.dynamodb.conditions import Key, Attr
import re
import pandas as pd
import numpy as np
import os
import datetime
from os import urandom
import email
import base64
import requests
from datetime import datetime, timedelta, timezone
import mailparser
import calendar
global payload_data

landing_zone_bucket_name = str(os.environ['BUCKETNAME'])
s3 = boto3.resource('s3')
landing_zone_bucket = s3.Bucket(landing_zone_bucket_name )
s3r = boto3.client('s3')
dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
table = dynamodb.Table(str(os.environ['DYNAMOTABLE']))
unprocessed_records_table = dynamodb.Table(str(os.environ['UNPROCESSEDTABLE']))
email_table = dynamodb.Table(str(os.environ['EMAILSTATUSTABLE']))
endpoint_url=os.environ['ENDPOINT_URL']
access_key = os.environ['ACCESSKEY']
now = datetime.now()
today_date = datetime.strftime(now,'%d')
today_month = datetime.strftime(now,'%m')
today_year = datetime.strftime(now,'%Y')
time_stamp = datetime.now().strftime('%Y%m%d%H%M%S')
payload_data = {}

#WRITE RECORDS TO DYNAMO
def dynamoPut(dObj,table_name=None):
    try:
        for each in list(dObj['Information']):
            if dObj['Information'][each]:
                dObj['Information'][each] = str(dObj['Information'][each])
            else:
                del dObj['Information'][each]
        dObj['Month'] =  today_month
        dObj['Year'] =  today_year
        dObj['Day'] = today_date
        for each in list(dObj):
            if dObj[each] != '':
                dObj[each] = dObj[each]
            else:
                del dObj[each]
        if table_name != None:
            response = unprocessed_records_table.put_item(Item = dObj)
        else:
            response = table.put_item(Item = dObj)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        else:
            return False
    except Exception as e:
        print(e)
        return False

def dynamoPutFileName(filename,source_type):
    try:
        dObj = {}
        dObj['id'] =  urandom(20).hex()
        dObj['CreatedAt'] =  str(datetime.now())
        dObj['FileName'] = filename
        dObj['Type'] = source_type
        dObj['EmailSent'] = False
        response = email_table.put_item(Item = dObj)
        if response['ResponseMetadata']['HTTPStatusCode'] == 200:
            return True
        else:
            return False
    except Exception as e:
        print(e)
        return False


def parse_csv_hubspot(event, obj):
    #parsing CSV file to write to dynamo
    try:
        def auto_truncate(val):
            return val[:255 ]
        print('<<  IN PARSE CSV HUBSPOT >>')
        print(event)
        csv = pd.read_csv(obj['Body'], encoding = "ISO-8859-1") 
        csv_nn = csv.replace(np.nan, 'null', regex=True)
        d = csv_nn.to_dict(orient='records')
        source_id = urandom(20).hex()
        file_name = event['file_path'].split('/')[-1] 
        print('<< FILE NAME >>', file_name)
        for each in d:
            try:
                dbObj = {}
                #PASSING THE EXTERNAL KEY
                UniqueKey = ''
                if 'smbapi' in each and each['smbapi'] != 'null':
                    dbObj['smbapi' ] = each['smbapi']
                    print('<< SMB API>>', dbObj['smbapi' ])

                if 'business_name' in each and each['business_name'] != 'null':
                    dbObj['business_name'] = each['business_name']
                    print('<< BUSINESS NAME >>', dbObj['business_name'])

                if 'srvc_address_1' in each and each['srvc_address_1'] != 'null':
                    dbObj['srvc_address_1'] = each['srvc_address_1']
                    print('<< ADDRESS 1 >>', dbObj['srvc_address_1'])

                if 'srvc_city_1' in each and each['srvc_city_1'] != 'null':
                    dbObj['srvc_city_1'] = each['srvc_city_1']

                if 'srvc_state_1' in each and each['srvc_state_1'] != 'null':
                    dbObj['srvc_state_1'] = each['srvc_state_1']

              
                if 'srvc_zip_1' in each and each['srvc_zip_1'] != 'null':
                    dbObj['srvc_zip_1']= str(each['srvc_zip_1']).zfill(5)
                
                if 'proposal_date' in each and each['proposal_date'] != 'null':
                    dbObj['proposal_date']= try_parsing_date(each['proposal_date']).date().isoformat()

                if 'proposal_start_date' in each and each['proposal_start_date'] != 'null':
                    dbObj['proposal_start_date']= try_parsing_date(each['proposal_start_date']).date().isoformat()   

                if 'udc_code_1' in each and each['udc_code_1'] != 'null':
                    dbObj['udc_code_1']= each['udc_code_1']

                if 'eog' in each and each['eog'] != 'null':
                    dbObj['eog']= each['eog']

                if 'fixedprice_1' in each and each['fixedprice_1'] != 'null':
                    dbObj['fixedprice_1']= each['fixedprice_1']

                if 'fixedprice_2' in each and each['fixedprice_2'] != 'null':
                    dbObj['fixedprice_2']= each['fixedprice_2']      
                
                if 'fixedprice_3' in each and each['fixedprice_3'] != 'null':
                    dbObj['fixedprice_3']= each['fixedprice_3']

                if 'fixedprice_1_gas_therm' in each and each['fixedprice_1_gas_therm'] != 'null':
                    dbObj['fixedprice_1_gas_therm']= each['fixedprice_1_gas_therm']

                if 'fixedprice_2_gas_therm' in each and each['fixedprice_2_gas_therm'] != 'null':
                    dbObj['fixedprice_2_gas_therm']= each['fixedprice_2_gas_therm']

                if 'fixedprice_3_gas_therm' in each and each['fixedprice_3_gas_therm'] != 'null':
                    dbObj['fixedprice_3_gas_therm']= each['fixedprice_3_gas_therm']

                if 'fixedprice_1_gas_ccf' in each and each['fixedprice_1_gas_ccf'] != 'null':
                    dbObj['fixedprice_1_gas_ccf']= each['fixedprice_1_gas_ccf']

                if 'fixedprice_2_gas_ccf' in each and each['fixedprice_2_gas_ccf'] != 'null':
                    dbObj['fixedprice_2_gas_ccf']= each['fixedprice_2_gas_ccf'] 

                if 'fixedprice_3_gas_ccf' in each and each['fixedprice_3_gas_ccf'] != 'null':
                    dbObj['fixedprice_3_gas_ccf']= each['fixedprice_3_gas_ccf']

                if 'fixedprice_1_gas_dth' in each and each['fixedprice_1_gas_dth'] != 'null':
                    dbObj['fixedprice_1_gas_dth']= each['fixedprice_1_gas_dth']

                if 'fixedprice_2_gas_dth' in each and each['fixedprice_2_gas_dth'] != 'null':
                    dbObj['fixedprice_2_gas_dth']= each['fixedprice_2_gas_dth']

                if 'fixedprice_3_gas_dth' in each and each['fixedprice_3_gas_dth'] != 'null':
                    dbObj['fixedprice_3_gas_dth']= each['fixedprice_3_gas_dth']

                if 'fixedprice_1_gas_mcf' in each and each['fixedprice_1_gas_mcf'] != 'null':
                    dbObj['fixedprice_1_gas_mcf']= each['fixedprice_1_gas_mcf']

                if 'fixedprice_2_gas_mcf' in each and each['fixedprice_2_gas_mcf'] != 'null':
                    dbObj['fixedprice_2_gas_mcf']= each['fixedprice_2_gas_mcf']

                if 'fixedprice_3_gas_mcf' in each and each['fixedprice_3_gas_mcf'] != 'null':
                    dbObj['fixedprice_3_gas_mcf']= each['fixedprice_3_gas_mcf']

                if 'term_1' in each and each['term_1'] != 'null':
                    dbObj['term_1']= each['term_1']

                if 'term_2' in each and each['term_2'] != 'null':
                    dbObj['term_2']= each['term_2']

                if 'term_3' in each and each['term_3'] != 'null':
                    dbObj['term_3']= each['term_3']
               
                if 'smb_bdm_name' in each and each['smb_bdm_name'] != 'null':
                    dbObj['smb_bdm_name']= each['smb_bdm_name']

                if 'smb_bdm_phone' in each and each['smb_bdm_phone'] != 'null':
                    if '.' in str(each['smb_bdm_phone']):
                            dbObj['smb_bdm_phone']= str(int(float(each['smb_bdm_phone'])))
                    else:
                            dbObj['smb_bdm_phone']= str(each['smb_bdm_phone'])

                if 'smb_bdm_email' in each and each['smb_bdm_email'] != 'null' and each['smb_bdm_email'].strip() != '' and each['smb_bdm_email'] != None:
                    dbObj['smb_bdm_email']= each['smb_bdm_email']
                print('<< OBJ >> ',dbObj)
                
                N = urandom(20).hex()
                now = str(datetime.now())
                #<< END of HUBSPOT INGESTION >>
                # table.put_item(
                Item =  {
                    'CogId' : str(N),
                    'CreatedAt': now,
                    'ExternalId': UniqueKey,
                    'Information' : dbObj,
                    'SourceBucket': landing_zone_bucket_name,
                    'SourcePath' : event['file_path'],
                    'Source' : 'HubSpot',
                    'SourceId' : source_id,
                    'SourceFileName': time_stamp + '_' + file_name
                }
                #WRITE-TO-DYNAMO
                files_processing = dynamoPut(Item)
                if not files_processing:
                    print('Writing {} record to dynamodb Failed'.format(Item))

            except Exception as e:
                print(e)
                N = urandom(20).hex()
                Item =  {
                    'CogId' : str(N),
                    'CreatedAt': now,
                    'Information' : each,
                    'SourceBucket': landing_zone_bucket_name,
                    'SourcePath' : event['file_path'],
                    'Source' : 'HubSpot',
                    'message': str(e),
                    'SourceId' : source_id,
                    'ExternalId': UniqueKey
                }
                files_processing = dynamoPut(Item,'Fail')
                pass
        temp_file_name = time_stamp + '_' + file_name
        isert_file_name = dynamoPutFileName(temp_file_name,'HubSpot')
        post_to_hubspot(dbObj)
        return True
    except Exception as e:
        print(e)
        new_folder_path = os.environ['CSV_NEW_FOLDER_HUBSPOT']
        unprocessed_folder_path = os.environ['CSV_ERROR_FOLDER_HUBSPOT']
        # MOVING PROCESSED FILES FROM NEW TO UNPROCESSED FOLDER
        move_file_to_processed = moving_files_new_to_processed(event, new_folder_path,unprocessed_folder_path)
        return False 


def try_parsing_date(text):
    for fmt in ('%m/%d/%Y','%Y-%m-%dT%H:%M:%S-%f', '%m/%d/%y', '%Y-%m-%d', '%m.%d.%Y','%Y-%m-%dT%I', '%Y-%m-%dT%I%p', '%Y-%m-%dT%H:%M:%S.%f', '%Y-%m-%dT%H:%M:%S.%f+','%Y-%m-%dT%H:%M:%S'):#2018-11-20T08:05:54-0500
        try:
            return datetime.strptime(text, fmt)
        except ValueError:
            print('in except')
            pass
    return ValueError('no valid date format found')

def post_to_hubspot(list_contacts):
    print('<< IN POST-To-HUBSPOT >>')
    data_string = **json.dumps(list_contacts)**
    payload_data = {"properties": data_string}
    print('<< dbOBJ LIST >> ',payload_data)
    
    response = requests.request("POST", endpoint_url+access_key, headers={'Content-Type': 'application/json'}, data=payload_data)
    token_response=json.loads(response.text)
    print('<< TOKEN RESPONSE >>',token_response)

def moving_files_new_to_processed(event, new_folder,processed_folder):
    #MOVING-FILES-TO-PROCESSED
    try:
        copy_source = {
            'Bucket': landing_zone_bucket_name,
                'Key': event['file_path']
            }
        path = event['file_path']
        processed_folder = processed_folder + time_stamp + '_'
        new_key = path.replace(new_folder, processed_folder)
        new_obj = landing_zone_bucket.Object(new_key)
        new_obj.copy(copy_source)
        s3.Object(landing_zone_bucket_name, event['file_path']).delete()
        return True
    except Exception as e:
        print(e)
        return False


def lambda_handler(event,context):
    print("Starting to Push Records to Dynamo Lambda")
    print(event) 
    try:
        parse_flag = False
        new_folder_path = ''
        processed_folder_path = ''
        #Gets file path and calls required function to parse it out
        key = str(os.environ['CSV_NEW_FOLDER_HUBSPOT'])
        obj = s3r.get_object(Bucket=landing_zone_bucket_name, Key=event['file_path'])
        
        print('after obj')
        print(os.environ['CSV_NEW_FOLDER_HUBSPOT']) 
        print('in HubSpot parse_csv')
        parse_csv_func = parse_csv_hubspot(event, obj)
        # Checks if parse_csv return empty dictionary
        if parse_csv_func:
            parse_flag = True
            new_folder_path = os.environ['CSV_NEW_FOLDER_HUBSPOT']
            processed_folder_path = os.environ['CSV_PROCESSED_FOLDER_HUBSPOT']
        else:
            print('File Format not Supported for {}'.format(event['file_path']))
        if parse_flag:
            # UPLOADING CONTACT.MOVING PROCESSED FILES FROM NEW TO PROCESSED FOLDER
            #print('<< PAYLOAD >> ',payload)
            #response = requests.request("POST", "https://api.hubapi.com/crm/v3/schemas/?hapikey="+access_key, headers={'Content-Type': 'application/json'}, data=json.dumps(str(payload)))
            #token_response=json.loads(response.text)
            #print('<< TOKEN RESPONSE >>',token_response)
            #MOVING PROCESSED FILES FROM NEW TO PROCESSED FOLDER
            move_file_to_processed = moving_files_new_to_processed(event, new_folder_path,processed_folder_path)
            if move_file_to_processed:
                print('File {} moved Successfully from {} to {}'.format(event['file_path'],new_folder_path,processed_folder_path))
            else:
                print('Moving {} file from new to processing folder Failed'.format(event['file_path']))
            
    except Exception as e:
        print(e) 

可能是什么问题呢?谢谢你的帮助。

4

1 回答 1

0

问题是由两个问题引起的:

  1. 字典应该放在json.dumps()中,以便在执行 POST 时将其转换为 JSON 字符串,因此字典不需要更改其结构。以下是 POST 的回复:

    << TOKEN RESPONSE >> { "id": "135120801", "properties": { "business_name": "Millers Brand Oats", "createdate": "2021-12-21T02:31:12.452Z", "fixedprice_1" :“6.63”、“fixedprice_2”:“6.11”、“fixedprice_3”:“5.9”、“hs_all_contact_vids”:“135120801”、“hs_is_contact”:“真”、“hs_is_unworked”:“真”、“hs_marketable_until_renewal”:“ false”、“hs_object_id”:“135120801”、“hs_pipeline”:“联系人生命周期管道”、“lastmodifieddate”:“2021-12-21T02:31:12.452Z”、“proposal_date”:“2021-12-07 ", "proposal_start_date": "2022-01-01", "smb_bdm_email”:“Tim.Chu@junkster.com”,“smb_bdm_name”:“Tim Chu”,“smb_bdm_phone”:“833-999-9999”,“smbapi”:“是”,“srvc_address_1”:“4844 Stenstrom Rd ”,“srvc_state_1”:“IL”,“srvc_zip_1”:“61109”,“term_2”:“24”,“term_3”:“36”,“udc_code_1”:“COMED”},“createdAt”:“2021- 12-21T02:31:12.452Z", "updatedAt": "2021-12-21T02:31:12.452Z", "归档": false }srvc_state_1”:“IL”,“srvc_zip_1”:“61109”,“term_2”:“24”,“term_3”:“36”,“udc_code_1”:“COMED”},“createdAt”:“2021-12-21T02 :31:12.452Z", "updatedAt": "2021-12-21T02:31:12.452Z", "归档": false }srvc_state_1”:“IL”,“srvc_zip_1”:“61109”,“term_2”:“24”,“term_3”:“36”,“udc_code_1”:“COMED”},“createdAt”:“2021-12-21T02 :31:12.452Z", "updatedAt": "2021-12-21T02:31:12.452Z", "归档": false }

  2. 我使用了错误的端点:

https://api.hubapi.com/crm/v3/schemas/

代替:

https://api.hubapi.com/crm/v3/objects/contacts/

现在我只需要找出为什么 AWS Lambda POST 允许在 HubSpot 中创建重复的联系人,而 Postman POST 禁止创建重复的联系人。

于 2021-12-21T03:26:16.647 回答