我对 Python 编程相当陌生,到目前为止,我一直是以前的开发人员制作的逆向工程代码,或者我自己拼凑了一些功能。
脚本本身有效;长话短说,它旨在解析 CSV 并(a)创建和/或更新在 CSV 中找到的联系人,以及(b)将联系人正确分配给他们的关联公司。全部使用HubSpot API。为了实现这一点,我还导入了requests和csvmapper。
我有以下问题:
- 我怎样才能改进这个脚本以使它更pythonic?
- 使该脚本在远程服务器上运行的最佳方法是什么,请记住请求和 CSVMapper 可能未安装在该服务器上,并且我很可能没有安装它们的权限-最好的方法是什么要“打包”此脚本,还是将 Requests 和 CSVMapper 上传到服务器?
非常感谢任何建议。
#!/usr/bin/env python
# -*- coding: utf-8 -*-
from __future__ import print_function
import sys, os.path, requests, json, csv, csvmapper, glob, shutil
from time import sleep
major, minor, micro, release_level, serial = sys.version_info
# Client Portal ID
portal = "XXXXXX"
# Client API Key
hapikey = "XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX"
# This attempts to find any file in the directory that starts with "note" and ends with ".CSV"
# Server Version
# findCSV = glob.glob('/home/accountName/public_html/clientFolder/contact*.CSV')
# Local Testing Version
findCSV = glob.glob('contact*.CSV')
for i in findCSV:
theCSV = i
csvfileexists = os.path.isfile(theCSV)
# Prints a confirmation if file exists, prints instructions if it doesn't.
if csvfileexists:
print ("\nThe \"{csvPath}\" file was found ({csvSize} bytes); proceeding with sync ...\n".format(csvSize=os.path.getsize(theCSV), csvPath=os.path.basename(theCSV)))
else:
print ("File not found; check the file name to make sure it is in the same directory as this script. Exiting ...")
sys.exit()
# Begin the CSVmapper mapping... This creates a virtual "header" row - the CSV therefore does not need a header row.
mapper = csvmapper.DictMapper([
[
{'name':'account'}, #"Org. Code"
{'name':'id'}, #"Hubspot Ref"
{'name':'company'}, #"Company Name"
{'name':'firstname'}, #"Contact First Name"
{'name':'lastname'}, #"Contact Last Name"
{'name':'job_title'}, #"Job Title"
{'name':'address'}, #"Address"
{'name':'city'}, #"City"
{'name':'phone'}, #"Phone"
{'name':'email'}, #"Email"
{'name':'date_added'} #"Last Update"
]
])
# Parse the CSV using the mapper
parser = csvmapper.CSVParser(os.path.basename(theCSV), mapper)
# Build the parsed object
obj = parser.buildObject()
def contactCompanyUpdate():
# Open the CSV, use commas as delimiters, store it in a list called "data", then find the length of that list.
with open(os.path.basename(theCSV),"r") as f:
reader = csv.reader(f, delimiter = ",", quotechar="\"")
data = list(reader)
# For every row in the CSV ...
for row in range(0, len(data)):
# Set up the JSON payload ...
payload = {
"properties": [
{
"name": "account",
"value": obj[row].account
},
{
"name": "id",
"value": obj[row].id
},
{
"name": "company",
"value": obj[row].company
},
{
"property": "firstname",
"value": obj[row].firstname
},
{
"property": "lastname",
"value": obj[row].lastname
},
{
"property": "job_title",
"value": obj[row].job_title
},
{
"property": "address",
"value": obj[row].address
},
{
"property": "city",
"value": obj[row].city
},
{
"property": "phone",
"value": obj[row].phone
},
{
"property": "email",
"value": obj[row].email
},
{
"property": "date_added",
"value": obj[row].date_added
}
]
}
nameQuery = "{first} {last}".format(first=obj[row].firstname, last=obj[row].lastname)
# Get a list of all contacts for a certain company.
contactCheck = "https://api.hubapi.com/contacts/v1/search/query?q={query}&hapikey={hapikey}".format(hapikey=hapikey, query=nameQuery)
# Convert the payload to JSON and assign it to a variable called "data"
data = json.dumps(payload)
# Defined the headers content-type as 'application/json'
headers = {'content-type': 'application/json'}
contactExistCheck = requests.get(contactCheck, headers=headers)
for i in contactExistCheck.json()[u'contacts']:
# ... Get the canonical VIDs
canonicalVid = i[u'canonical-vid']
if canonicalVid:
print ("{theContact} exists! Their VID is \"{vid}\"".format(theContact=obj[row].firstname, vid=canonicalVid))
print ("Attempting to update their company...")
contactCompanyUpdate = "https://api.hubapi.com/companies/v2/companies/{companyID}/contacts/{vid}?hapikey={hapikey}".format(hapikey=hapikey, vid=canonicalVid, companyID=obj[row].id)
doTheUpdate = requests.put(contactCompanyUpdate, headers=headers)
if doTheUpdate.status_code == 200:
print ("Attempt Successful! {theContact}'s has an updated company.\n".format(theContact=obj[row].firstname))
break
else:
print ("Attempt Failed. Status Code: {status}. Company or Contact not found.\n".format(status=doTheUpdate.status_code))
def createOrUpdateClient():
# Open the CSV, use commas as delimiters, store it in a list called "data", then find the length of that list.
with open(os.path.basename(theCSV),"r") as f:
reader = csv.reader(f, delimiter = ",", quotechar="\"")
data = list(reader)
# For every row in the CSV ...
for row in range(0, len(data)):
# Set up the JSON payload ...
payloadTest = {
"properties": [
{
"property": "email",
"value": obj[row].email
},
{
"property": "firstname",
"value": obj[row].firstname
},
{
"property": "lastname",
"value": obj[row].lastname
},
{
"property": "website",
"value": None
},
{
"property": "company",
"value": obj[row].company
},
{
"property": "phone",
"value": obj[row].phone
},
{
"property": "address",
"value": obj[row].address
},
{
"property": "city",
"value": obj[row].city
},
{
"property": "state",
"value": None
},
{
"property": "zip",
"value": None
}
]
}
# Convert the payload to JSON and assign it to a variable called "data"
dataTest = json.dumps(payloadTest)
# Defined the headers content-type as 'application/json'
headers = {'content-type': 'application/json'}
#print ("{theContact} does not exist!".format(theContact=obj[row].firstname))
print ("Attempting to add {theContact} as a contact...".format(theContact=obj[row].firstname))
createOrUpdateURL = 'http://api.hubapi.com/contacts/v1/contact/createOrUpdate/email/{email}/?hapikey={hapikey}'.format(email=obj[row].email,hapikey=hapikey)
r = requests.post(createOrUpdateURL, data=dataTest, headers=headers)
if r.status_code == 409:
print ("This contact already exists.\n")
elif (r.status_code == 200) or (r.status_code == 202):
print ("Success! {firstName} {lastName} has been added.\n".format(firstName=obj[row].firstname,lastName=obj[row].lastname, response=r.status_code))
elif r.status_code == 204:
print ("Success! {firstName} {lastName} has been updated.\n".format(firstName=obj[row].firstname,lastName=obj[row].lastname, response=r.status_code))
elif r.status_code == 400:
print ("Bad request. You might get this response if you pass an invalid email address, if a property in your request doesn't exist, or if you pass an invalid property value.\n")
else:
print ("Contact Marko for assistance.\n")
if __name__ == "__main__":
# Run the Create or Update function
createOrUpdateClient()
# Give the previous function 5 seconds to take effect.
sleep(5.0)
# Run the Company Update function
contactCompanyUpdate()
print("Sync complete.")
print("Moving \"{something}\" to the archive folder...".format(something=theCSV))
# Cron version
#shutil.move( i, "/home/accountName/public_html/clientFolder/archive/" + os.path.basename(i))
# Local version
movePath = "archive/{thefile}".format(thefile=theCSV)
shutil.move( i, movePath )
print("Move successful! Exiting...\n")
sys.exit()