4

我的 viacsv.py 文件中有以下代码,旨在允许摄取自定义包:

#
# Ingest stock csv files to create a zipline data bundle

import os

import numpy  as np
import pandas as pd
import datetime

boDebug=True # Set True to get trace messages

from zipline.utils.cli import maybe_show_progress

def viacsv(symbols,start=None,end=None):

    # strict this in memory so that we can reiterate over it.
    # (Because it could be a generator and they live only once)
    tuSymbols = tuple(symbols)

    if boDebug:
        print "entering viacsv.  tuSymbols=",tuSymbols

    # Define our custom ingest function
    def ingest(environ,
               asset_db_writer,
               minute_bar_writer,  # unused
               daily_bar_writer,
               adjustment_writer,
               calendar,
               cache,
               show_progress,
               output_dir,
               # pass these as defaults to make them 'nonlocal' in py2
               start=start,
               end=end):

        if boDebug:
            print "entering ingest and creating blank dfMetadata"

        dfMetadata = pd.DataFrame(np.empty(len(tuSymbols), dtype=[
            ('start_date', 'datetime64[ns]'),
            ('end_date', 'datetime64[ns]'),
            ('auto_close_date', 'datetime64[ns]'),
            ('symbol', 'object'),
        ]))

        if boDebug:
            print "dfMetadata",type(dfMetadata)
            print dfMetadata.describe
            print

        # We need to feed something that is iterable - like a list or a generator -
        # that is a tuple with an integer for sid and a DataFrame for the data to
        # daily_bar_writer

        liData=[]
        iSid=0
        for S in tuSymbols:
            IFIL="~/notebooks/csv/"+S+".csv"
            if boDebug:
               print "S=",S,"IFIL=",IFIL
            dfData=pd.read_csv(IFIL,index_col='Date',parse_dates=True).sort_index()
            if boDebug:
               print "read_csv dfData",type(dfData),"length",len(dfData)
               print
            dfData.rename(
                columns={
                    'Open': 'open',
                    'High': 'high',
                    'Low': 'low',
                    'Close': 'close',
                    'Volume': 'volume',
                    'Adj Close': 'price',
                },
                inplace=True,
            )
            dfData['volume']=dfData['volume']/1000
            liData.append((iSid,dfData))

            # the start date is the date of the first trade and
            start_date = dfData.index[0]
            if boDebug:
                print "start_date",type(start_date),start_date

            # the end date is the date of the last trade
            end_date = dfData.index[-1]
            if boDebug:
                print "end_date",type(end_date),end_date

            # The auto_close date is the day after the last trade.
            ac_date = end_date + pd.Timedelta(days=1)
            if boDebug:
                print "ac_date",type(ac_date),ac_date

            # Update our meta data
            dfMetadata.iloc[iSid] = start_date, end_date, ac_date, S

            iSid += 1

        if boDebug:
            print "liData",type(liData),"length",len(liData)
            print liData
            print
            print "Now calling daily_bar_writer"

        daily_bar_writer.write(liData, show_progress=False)

        # Hardcode the exchange to "YAHOO" for all assets and (elsewhere)
        # register "YAHOO" to resolve to the NYSE calendar, because these are
        # all equities and thus can use the NYSE calendar.
        dfMetadata['exchange'] = "YAHOO"

        if boDebug:
            print "returned from daily_bar_writer"
            print "calling asset_db_writer"
            print "dfMetadata",type(dfMetadata)
            print dfMetadata
            print

        # Not sure why symbol_map is needed
        symbol_map = pd.Series(dfMetadata.symbol.index, dfMetadata.symbol)
        if boDebug:
            print "symbol_map",type(symbol_map)
            print symbol_map
            print

        asset_db_writer.write(equities=dfMetadata)

        if boDebug:
            print "returned from asset_db_writer"
            print "calling adjustment_writer"

        adjustment_writer.write()

        if boDebug:
            print "returned from adjustment_writer"
            print "now leaving ingest function"

    if boDebug:
       print "about to return ingest function"
    return ingest

我的问题是我输入的数据不是美国数据,而是澳大利亚股票数据。因此,它遵守澳大利亚的假期,而不是美国的假期。不知何故,下面的代码似乎默认使用美国交易日历,并告诉我在美国市场本应关闭的日子里我无法传递数据,反之亦然。如何调整上述代码以接收自定义日历?要摄取捆绑包,我在终端上运行以下命令:

zipline ingest -b CBA.csv

想法?

4

1 回答 1

9

您需要在 中定义自己的日历zipline/utils/calendars:只需创建一个现有文件的副本(例如exchange_calendar_nyse.py)并使用所需的假期进行编辑。假设您调用此文件my_own_calendar.py和类MyOwnCalendar

请注意,您还需要采取其他 2(或 3)个步骤:

  1. 在 中注册您的日历zipline/util/calendars/calendar_utils.py:您可以在 中添加条目_default_calendar_factories,如果您需要别名,则可以在 中添加条目_default_calendar_aliases。例如,要映射my_own_calendar.py到“OWN”并使用别名“MY_CALENDAR”:

    _default_calendar_factories = {
     'NYSE': NYSEExchangeCalendar,
     'CME': CMEExchangeCalendar,
     ...
     'OWN': MyOwnCalendar
    }
    
    _default_calendar_aliases = {
     'NASDAQ': 'NYSE',
     ...
     'MY_CALENDAR': 'OWN'
    }
    
  2. 你需要编辑.zipline/extension.py(你会.zipline在你的主目录中找到 - 在 Windows 下查看你的主目录,打开一个 dos shell 并键入echo %USERPROFILE%

    # List the tickers of the market you defined
    tickers_of_interest = {'TICKER1', 'TICKER2', ...}
    
    register('my_market', viacsv(tickers_of_interest), calendar_name="OWN")
    

通过这些步骤,您应该能够简单地输入您的捆绑包zipline ingest -b my_market

  1. 我个人遇到的问题是,我需要对交易日历有更多的控制权,因为超级类 TradingCalendar 假设周六/周日是非交易日,而并非每个市场/资产类别都是如此。日历定义错误会导致摄取时出现异常。例如,要为 7/7 24/24 交易的市场设置日历,我对日历进行了如下修改:

    from datetime import time
    from pytz import timezone
    from pandas import date_range
    from .trading_calendar import TradingCalendar, HolidayCalendar
    
    from zipline.utils.memoize import lazyval
    
    from pandas.tseries.offsets import CustomBusinessDay
    
    class MyOwnCalendar(TradingCalendar):
        """
        Round the clock calendar: 7/7, 24/24
        """
    
        @property
        def name(self):
            return "OWN"
    
        @property
        def tz(self):
            return timezone("Europe/London")
    
        @property
        def open_time(self):
            return time(0)
    
        @property
        def close_time(self):
            return time(23, 59)
    
        @property
        def regular_holidays(self):
            return []
    
        @property
        def special_opens(self):
            return []
    
        def sessions_in_range(self, start_session, last_session):
            return date_range(start_session, last_session)
    
        @lazyval
        def day(self):
            return CustomBusinessDay(holidays=self.adhoc_holidays,
            calendar=self.regular_holidays,weekmask="Mon Tue Wed Thu Fri Sat Sun")
    
于 2017-11-05T10:09:47.087 回答