我的目标是使用代码列表和 TWS API 来提取公司快照的部分(reqFundamentalData() -> "ReportSnapshot")和这些代码的财务报表(reqFundamentalData() -> "ReportsFinStatements"),转换进入数据框并将其存储为镶木地板文件。
我试图合并提供的解决方案:
- 使用交易代码列表 TWS API 下载股票基本数据只运行第一个数据条目并忽略其他数据。谁来解决这个问题?
- 将 XML 存储为数据框 将 XML 转换为 Pandas
- 存储数据 将数据从 TWS API 保存到 csv 文件
代码:
from datetime import datetime
from bs4 import BeautifulSoup as bs
import pandas as pd
from ibapi.client import EClient
from ibapi.contract import Contract
from ibapi.wrapper import EWrapper
import logging
import random
import pathlib
import time
from datetime import date
import datetime
from pathlib import Path
class TestApp(EWrapper, EClient):
def __init__(self, addr, port, client_id):
EWrapper.__init__(self) # new - book
EClient.__init__(self, self)
self.firstReqId = 8001
self.contracts = {} # keep in dict so you can lookup
self.contNumber = self.firstReqId
# add dataframes to store the result
self.df_company_info = pd.DataFrame(data=None, index=None, columns=None)
self.df_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)
def addContracts(self, cont):
self.contracts[self.contNumber] = cont # add to dict using 8001 first time
self.contNumber += 1 # next id will be 8002 etc.
def nextValidId(self, orderId: int):
# now you are connected, ask for data, no need for sleeps
# this isn't the only way to know the api is started but it's what IB recommends
self.contNumber = self.firstReqId # start with first reqId
self.getNextData()
def error(self, reqId, errorCode, errorString):
print("Error: ", reqId, "", errorCode, "", errorString)
# if there was an error in one of your requests, just contimue with next id
if reqId > 0 and self.contracts.get(self.contNumber):
# err in reqFundametalData based on reqid existing in map
print('err in', self.contracts[reqId].symbol)
self.getNextData() # try next one
def fundamentalData(self, reqId, fundamental_data):
self.fundamentalData = fundamental_data
try:
if self.fundamentalData is not None:
# convert XML to dictionary entry
dict_company_info = self.CompanyInfoXMLtoDict(self.fundamentalData)
# add dict entry to dataframe
df_add_row = pd.DataFrame([dict_company_info])
self.df_company_info = self.df_company_info.append(df_add_row, ignore_index=True)
except KeyError:
print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
except TypeError:
print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
except ValueError:
print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
except IndexError:
print('Ticker: ' + str(self.contNumber) + ' could not get company_info')
self.getNextData()
def getNextData(self):
if self.contracts.get(self.contNumber): # means a contract exists
# so req data
self.reqFundamentalData(self.contNumber, self.contracts[self.contNumber], "ReportSnapshot", [])
self.contNumber += 1 # now get ready for next request
else: # means no more sequentially numbered contracts
print('done')
self.disconnect() # just exit
def CompanyInfoXMLtoDict(self, fundamentals):
soup = bs(fundamentals, 'xml')
df_company_info = pd.DataFrame(data=None, index=None, columns=None)
ticker = ''
longName = ''
fullTimeEmployees = 0
# search for a tag e.g. </IssueID>
for issues in soup.find_all('IssueID'):
# within this tag -> search of unique ID e.g. IssueID type=...
if issues.get('Type') == "Ticker":
ticker = issues.get_text()
break
for coID_i in soup.find_all('CoID'):
if coID_i.get('Type') == "CompanyName":
longName = coID_i.get_text()
break
for employees_i in soup.find_all('Employees'):
fullTimeEmployees = employees_i.get_text()
break
# create result entry row
if ticker is not None and ticker != '':
new_row_dict = {'ticker': ticker, 'name': longName,
'num_employees': fullTimeEmployees}
else:
new_row_dict = {}
return new_row_dict
def FinStmtsXMLtoDF(self, fundamentals, ticker, stmts_type):
today = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
today_date = date.today().strftime("%Y-%m-%d")
if stmts_type == 'annual':
period_type = 'Annual'
else:
period_type = 'Interim'
soup = bs(fundamentals, 'xml')
# build dict
stmts_terms = {}
for terms in soup.find_all("mapItem"):
# add entry to dict -> dict for maping of code to description
stmts_terms[terms.get('coaItem')] = terms.get_text()
bal_l = []
inc_l = []
cas_l = []
for period in soup.find_all('FiscalPeriod'):
# quarterly vs. annually
if period.get('Type') == period_type:
for statement in period.find_all('Statement'):
if statement.find('UpdateType').get('Code') != 'CLA':
dic = {}
stmts_type = statement.get('Type')
# source_date = statement.find('Source').get('Date')
statement_date = statement.find('StatementDate').text
# dic['date'] = source_date
dic['rep_date'] = statement_date
for item in statement.find_all('lineItem'):
# dic[item.get('coaCode')] = item.text
dic[stmts_terms.get(item.get('coaCode'), 'DEFAULT')] = item.text
if stmts_type == 'BAL':
bal_l.append(dic)
# print(stmts_type, date, dic)
elif stmts_type == 'INC':
inc_l.append(dic)
elif stmts_type == 'CAS':
cas_l.append(dic)
df_balance_sheet = pd.DataFrame(bal_l).sort_values('rep_date')
df_income_statement = pd.DataFrame(inc_l).sort_values('rep_date')
df_cash_flow = pd.DataFrame(cas_l).sort_values('rep_date')
# merge all stmts for same rep_date
df_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)
df_fin_stmts = df_balance_sheet.merge(df_income_statement, how='left',
left_on=['rep_date'],
right_on=['rep_date'])
df_fin_stmts = df_fin_stmts.merge(df_cash_flow, how='left',
left_on=['rep_date'],
right_on=['rep_date'])
df_fin_stmts.insert(loc=0, column='ticker', value=ticker)
df_fin_stmts.insert(loc=1, column='date_updated', value=today_date)
return df_fin_stmts
def main():
# ----- config
project_data_folder = '/home/data/'
project_data_folder = Path(project_data_folder)
# ticker are stored in a csv file
csv_master_ticker = Path('home/data/ticker/ticker-list.csv')
# load list of tickers
df = pd.read_csv(csv_master_ticker)
list_master_ticker = df['ticker'].tolist()
fusion_company_info = pd.DataFrame(data=None, index=None, columns=None)
fusion_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)
fusion_q_fin_stmts = pd.DataFrame(data=None, index=None, columns=None)
client = TestApp('127.0.0.1', 7496, 0)
for ticker in list_master_ticker:
# remove additional postfix for exchange e.g. XYZ.F -> XYZ
ticker_clean = ticker.rstrip('.')
contract = Contract()
contract.symbol = ticker_clean
contract.secType = 'STK'
contract.exchange = "SMART"
contract.currency = 'USD'
client.addContracts(contract)
client.connect('127.0.0.1', 7496, 0)
client.run()
if fusion_company_info.empty:
fusion_company_info = client.df_company_info
else:
fusion_company_info = pd.concat([fusion_company_info, client.df_company_info])
tws_company_info_file_name = 'tws_company_info.parquet'
file_name = project_data_folder / tws_company_info_file_name
try:
if fusion_company_info is not None:
if not fusion_company_info.empty:
fusion_company_info.to_parquet(file_name, engine='pyarrow')
# financial statements - annual
tws_fin_stmts_file_name = 'tws_fin_stmts.parquet'
file_name = project_data_folder / tws_fin_stmts_file_name
try:
if fusion_fin_stmts is not None:
if not fusion_fin_stmts.empty:
fusion_fin_stmts.to_parquet(file_name, engine='pyarrow')
我收到一条错误消息
Traceback (most recent call last):
File "...\ibapi\client.py", line 239, in run
self.decoder.interpret(fields)
File "...\ibapi\decoder.py", line 1278, in interpret
self.interpretWithSignature(fields, handleInfo)
File "...\ibapi\decoder.py", line 1259, in interpretWithSignature
method(*args)
TypeError: 'str' object is not callable
python-BaseException
有人可以帮我解决此错误消息吗?如果我删除 for 循环并仅针对单个代码运行它,例如
client.contracts = {}
contract = Contract()
contract.symbol = 'AMD'
contract.secType = 'STK'
contract.currency = 'USD'
contract.exchange = "SMART"
client.addContracts(contract)
client.connect('127.0.0.1', 7496, 0)
client.run()
我没有收到错误消息,并且数据框 self.company_info 填充了 AMD 的正确数据。
一般的问题:
是否可以通过 reqFundamentalData() 在一次请求/运行中不仅获得公司信息“ReportSnapshot”,还可以获得财务报表“ReportsFinStatements”(df_fin_stmts 和函数“FinStmtsXMLtoDF”)?
我是 python 新手,希望函数仅在代码中调用函数时才被执行,但不知何故,使用 TWS API(套接字,reqID)它似乎工作不同,我不清楚何时调用哪个函数一个又一个之后。例如,我怎么知道通过执行 reqFundamentalData() 函数 basicData() 被调用。或者例如 nextValidID() 以某种方式被触发,但在程序中没有显式调用。有没有好的教程介绍什么函数按什么顺序调用的过程?
非常感谢