1

I made a function to evaluate the following problem experimentally, taken from a A Primer for the Mathematics of Financial Engineering.

Problem: Let X be the number of times you must flip a fair coin until it lands heads. What are E[X] (expected value) and var(X) (variance)?

Following the textbook solution, the following code yields the correct answer:

from sympy import *

k = symbols('k')

Expected_Value = summation(k/2**k, (k, 1, oo)) # Both solutions work

Variance = summation(k**2/2**k, (k, 1, oo)) - Expected_Value**2

To validate this answer, I decided to have a go at making a function to simulate this experiment. The following code is what I came up with.

def coin_toss(toss, probability=[0.5, 0.5]): 

    """Computes expected value and variance for coin toss experiment"""

    flips = [] # Collects total number of throws until heads appear per experiment.

    for _ in range(toss): # Simulate n flips

        number_flips=[] # Number of flips until heads is tossed  

        while sum(number_flips) == 0: # Continue simulation while Tails are thrown 

            number_flips.append(np.random.choice(2, p=probability)) # Append result to number_flips

        flips.append(len(number_flips)) #Append number of flips until lands heads to flips

    Expected_Value, Variance  = np.mean(flips), np.var(flips)    

    print('E[X]: {}'.format(Expected_Value), 
          '\nvar[X]: {}'.format(Variance)) # Return expected value 

The run time if I simulate 1e6 experiments, using the following code is approximately 35.9 seconds.

   from timeit import Timer

   t1 = Timer("""coin_toss(1000000)""", """from __main__ import coin_toss""")

   print(t1.timeit(1))

In the interest of developing my understanding of Python, is this a particularly efficient/pythonic way of approaching a problem like this? How can I utilise existing libraries to improve efficiency/flow execution?

4

1 回答 1

1

为了以高效和 Python 的方式进行编码,您必须查看PythonSpeedNumPy。下面可以找到一个使用 numpy 的更快代码的示例。

python+numpy中优化的abc是向量化操作,这在这种情况下是相当困难的,因为有一个while实际上可能是无限的,硬币可以连续翻转40次。但是,可以在chunksfor中完成工作,而不是使用迭代。这是问题和这种方法之间的主要区别。tosscoin_tosscoin_toss_2d

coin_toss_2d

的主要优点coin_toss_2d是按块工作,这些块的大小有一些默认值,但是可以修改它们(因为它们会影响速度)。因此,它只会迭代while current_toss<toss多次toss%flips_at_a_time。这是通过 numpy 实现的,它允许生成一个矩阵,其结果是重复flips_at_a_time掷硬币的实验flips_per_try次数。该矩阵将包含 0(尾部)和 1(头部)。

# i.e. doing only 5 repetitions with 3 flips_at_a_time
flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
# Out 
[[0 0 0] # still no head, we will have to keep trying
 [0 1 1] # head at the 2nd try (position 1 in python)
 [1 0 0]
 [1 0 1]
 [0 0 1]]

一旦得到这个结果,argmax就会被调用。这会找到对应于每行(重复)的最大值(将是 1,一个头)的索引,并且在多次出现的情况下,返回第一个,这正是所需要的,在一系列尾之后的第一个头.

maxs = flip_events.argmax(axis=1)
# Out
[0 1 0 0 2] 
# The first position is 0, however, flip_events[0,0]!=1, it's not a head!

但是,必须考虑所有行为 0 的情况。在这种情况下,最大值将为 0,它的第一次出现也将为 0,即第一列(尝试)。因此,我们检查在第一次尝试中找到的所有最大值是否对应于第一次尝试中的正面。

not_finished = (maxs==0) & (flip_events[:,0]!=1)
# Out
[ True False False False False] # first repetition is not finished

如果不是这种情况,我们会循环重复相同的过程,但仅适用于在任何尝试中都没有头部的重复。

n = np.sum(not_finished)
while n!=0: # while there are sequences without any head
    flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
    maxs2 = flip_events.argmax(axis=1)
    maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
    not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
    not_finished[not_finished] = not_finished2
    n = np.sum(not_finished)
# Out
# flip_events
[[1 0 1]] # Now there is a head
# maxs2
[0]
# maxs
[3 1 0 0 2] # The value of the still unfinished repetition has been updated,
# taking into account that the first position in flip_events is the 4th,
# without affecting the rest

然后存储对应于第一次出现的索引(我们必须添加 1,因为 python 索引从零开始而不是 1)。有一个try ... except ...块可以处理toss不是 的倍数的情况repetitions_at_a_time

def coin_toss_2d(toss, probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
    # Initialize and preallocate data
    current_toss = 0
    flips = np.empty(toss)
    # loop by chunks
    while current_toss<toss:
        # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
        flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
        # store first head ocurrence
        maxs = flip_events.argmax(axis=1)
        # Check for all tails sequences, that is, repetitions were we have to keep trying to get a head
        not_finished = (maxs==0) & (flip_events[:,0]!=1)
        n = np.sum(not_finished)
        while n!=0: # while there are sequences without any head
            flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
            maxs2 = flip_events.argmax(axis=1)
            maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
            not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
            not_finished[not_finished] = not_finished2
            n = np.sum(not_finished)
        # try except in case toss is not multiple of repetitions_at_a_time, in general, no error is raised, that is why a try is useful
        try:
            flips[current_toss:current_toss+repetitions_at_a_time] = maxs+1
        except ValueError:
            flips[current_toss:] = maxs[:toss-current_toss]+1
        # Update current_toss and move to the next chunk
        current_toss += repetitions_at_a_time
    # Once all values are obtained, average and return them
    Expected_Value, Variance  = np.mean(flips), np.var(flips)    
    return Expected_Value, Variance

coin_toss_map

这里的代码基本相同,但是现在,intrinsec while 是在一个单独的函数中完成的,该函数是coin_toss_map使用map.

def toss_chunk(args):
    probability,repetitions_at_a_time,flips_per_try = args
    # repeat repetitions_at_a_time times experiment "flip coin flips_per_try times"
    flip_events = np.random.choice([0,1],size=(repetitions_at_a_time,flips_per_try),p=probability)
    # store first head ocurrence
    maxs = flip_events.argmax(axis=1)
    # Check for all tails sequences
    not_finished = (maxs==0) & (flip_events[:,0]!=1)
    n = np.sum(not_finished)
    while n!=0: # while there are sequences without any head
        flip_events = np.random.choice([0,1],size=(n,flips_per_try),p=probability) # number of experiments reduced to n (the number of all tails sequences)
        maxs2 = flip_events.argmax(axis=1)
        maxs[not_finished] += maxs2+flips_per_try # take into account that there have been flips_per_try tries already (every iteration is added)
        not_finished2 = (maxs2==0) & (flip_events[:,0]!=1) 
        not_finished[not_finished] = not_finished2
        n = np.sum(not_finished)
    return maxs+1
def coin_toss_map(toss,probability=[.5,.5],repetitions_at_a_time=10**5,flips_per_try=20):
    n_chunks, remainder = divmod(toss,repetitions_at_a_time)
    args = [(probability,repetitions_at_a_time,flips_per_try) for _ in range(n_chunks)]
    if remainder:
        args.append((probability,remainder,flips_per_try))
    flips = np.concatenate(map(toss_chunk,args))
    # Once all values are obtained, average and return them
    Expected_Value, Variance  = np.mean(flips), np.var(flips)    
    return Expected_Value, Variance

性能比较

在我的计算机中,我得到了以下计算时间:

In [1]: %timeit coin_toss(10**6)
# Out 
# ('E[X]: 2.000287', '\nvar[X]: 1.99791891763')
# ('E[X]: 2.000459', '\nvar[X]: 2.00692478932')
# ('E[X]: 1.998118', '\nvar[X]: 1.98881045808')
# ('E[X]: 1.9987', '\nvar[X]: 1.99508631')
# 1 loop, best of 3: 46.2 s per loop

In [2]: %timeit coin_toss_2d(10**6,repetitions_at_a_time=5*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 197 ms per loop

In [3]: %timeit coin_toss_map(10**6,repetitions_at_a_time=4*10**5,flips_per_try=4)
# Out
# 1 loop, best of 3: 192 ms per loop

均值和方差的结果是:

In [4]: [coin_toss_2d(10**6,repetitions_at_a_time=10**5,flips_per_try=10) for _ in range(4)]
# Out
# [(1.999848, 1.9990739768960009),
#  (2.000654, 2.0046035722839997),
#  (1.999835, 2.0072329727749993),
#  (1.999277, 2.001566477271)]

In [4]: [coin_toss_map(10**6,repetitions_at_a_time=10**5,flips_per_try=4) for _ in range(4)]
# Out
# [(1.999552, 2.0005057992959996),
#  (2.001733, 2.011159996711001),
#  (2.002308, 2.012128673136001),
#  (2.000738, 2.003613455356)]
于 2018-04-14T01:45:12.740 回答