How can I generate random walk data between a start value and an end value without exceeding a maximum or dropping below a minimum?

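This is my attempt, but for some reason the series sometimes exceeds the maximum or falls below the minimum. The start and end values seem to be respected, but the minimum and maximum are not. How can I fix this? I would also like to specify the standard deviation of the fluctuations, but I don't know how. I used randomPerc for the fluctuation, but that is wrong because I want to specify the standard deviation.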

import numpy as np
import matplotlib.pyplot as plt

def generateRandomData(length,randomPerc, min,max,start, end):
    data_np = (np.random.random(length) - randomPerc).cumsum()
    data_np *= (max - min) / (data_np.max() - data_np.min())
    data_np += np.linspace(start - data_np[0], end - data_np[-1], len(data_np))
    return data_np

randomData=generateRandomData(length = 1000, randomPerc = 0.5, min = 50, max = 100, start = 66, end = 80)

## print values
print("Max Value",randomData.max())
print("Min Value",randomData.min())
print("Start Value",randomData[0])
print("End Value",randomData[-1])
print("Standard deviation",np.std(randomData))

## plot values
plt.figure()
plt.plot(range(randomData.shape[0]), randomData)
plt.show()
plt.close()

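Here is a simple loop that checks for series that fall below the minimum or rise above the maximum. This is exactly what I want to avoid. The series should stay within the given minimum and maximum limits.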

## generate 1000 series and check if there are any values over the maximum limit or under the minimum limit
for i in range(1000):
    randomData = generateRandomData(length = 1000, randomPerc = 0.5, min = 50, max = 100, start = 66, end = 80)
    if(randomData.min() < 50):
        print(i, "Value Lower than Min limit")
    if(randomData.max() > 100):
        print(i, "Value Higher than Max limit")
4 Answers

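Once you impose conditions on your walk, it can no longer be considered purely random. In any case, one approach is to generate the walk iteratively and check the bounds at each iteration. But if you want a vectorized solution, here it is: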

import numpy as np

def bounded_random_walk(length, lower_bound, upper_bound, start, end, std):
    assert (lower_bound <= start and lower_bound <= end)
    assert (start <= upper_bound and end <= upper_bound)

    bounds = upper_bound - lower_bound

    rand = (std * (np.random.random(length) - 0.5)).cumsum()
    rand_trend = np.linspace(rand[0], rand[-1], length)
    rand_deltas = (rand - rand_trend)
    rand_deltas /= np.max([1, (rand_deltas.max()-rand_deltas.min())/bounds])

    trend_line = np.linspace(start, end, length)
    upper_bound_delta = upper_bound - trend_line
    lower_bound_delta = lower_bound - trend_line

    upper_slips_mask = (rand_deltas-upper_bound_delta) >= 0
    upper_deltas =  rand_deltas - upper_bound_delta
    rand_deltas[upper_slips_mask] = (upper_bound_delta - upper_deltas)[upper_slips_mask]

    lower_slips_mask = (lower_bound_delta-rand_deltas) >= 0
    lower_deltas =  lower_bound_delta - rand_deltas
    rand_deltas[lower_slips_mask] = (lower_bound_delta + lower_deltas)[lower_slips_mask]

    return trend_line + rand_deltas

randomData = bounded_random_walk(1000, lower_bound=50, upper_bound =100, start=50, end=100, std=10)

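You can think of it as the solution to a geometric problem. trend_line connects your start and end points, and has margins defined by lower_bound and upper_bound. rand is your random walk, rand_trend is its trend line, and rand_deltas is its deviation from the rand trend line. We overlay the two trend lines and want to make sure the deltas do not exceed the margins. When rand_deltas exceeds the allowed margin, we "fold" the excess back inside the bounds.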

Finally, you add the resulting random deltas to the start=>end trend line, which gives the desired bounded random walk.
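
The folding step can be illustrated on its own. The following is a minimal sketch, not part of the answer's code: the array `deltas` and the margins `upper_margin` and `lower_margin` are made-up values, and `np.where` stands in for the mask assignments used above.

```python
import numpy as np

# Hypothetical deviations from a trend line and hypothetical margins.
deltas = np.array([0.0, 3.0, 7.0, 12.0, -6.0])
upper_margin = 10.0
lower_margin = -5.0

# Reflect anything above the upper margin back below it ...
folded = np.where(deltas > upper_margin, 2 * upper_margin - deltas, deltas)
# ... and anything below the lower margin back above it.
folded = np.where(folded < lower_margin, 2 * lower_margin - folded, folded)

print(folded)
```

Here 12 overshoots the upper margin by 2 and is reflected to 8, while -6 undershoots the lower margin by 1 and is reflected to -4.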

The std parameter controls the amount of variation in the random walk.
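
As a rough check of what such a scale factor controls, the sketch below compares uniform steps of the form used in `bounded_random_walk` with Gaussian steps. It is an illustration under assumptions: the `scale` value, the sample size and the seeded generator are chosen arbitrarily. Uniform steps `scale * (u - 0.5)` have a standard deviation of `scale / sqrt(12)`, whereas `rng.normal(0, scale)` draws steps whose standard deviation is `scale` itself.

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded only to make the sketch repeatable
scale = 10.0                    # plays the role of `std` in the answer
n = 200_000

# Uniform steps in [-scale/2, scale/2), as in the answer's code.
uniform_steps = scale * (rng.random(n) - 0.5)
# Gaussian steps with standard deviation equal to `scale`.
normal_steps = rng.normal(0.0, scale, size=n)

print(uniform_steps.std(), scale / np.sqrt(12))
print(normal_steps.std(), scale)
```

If the step standard deviation itself has to be specified, drawing Gaussian steps is one possible choice. Any later rescaling of the walk changes that value again.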

Update: fixed the assertions.

In this version, "std" is not guaranteed to be the "interval".
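
The iterative approach mentioned at the start of this answer could look roughly like the following. This is a sketch under assumptions, not the vectorized method above: the function name `iterative_bounded_walk`, the Gaussian steps, the drift term that pulls the walk toward `end`, and the clipping at the bounds are all choices made for this example.

```python
import numpy as np

def iterative_bounded_walk(length, lower, upper, start, end, std, seed=None):
    """Build the walk step by step, checking the bounds at every iteration."""
    assert length >= 2
    assert lower <= start <= upper and lower <= end <= upper
    rng = np.random.default_rng(seed)
    walk = np.empty(length)
    walk[0] = start
    for i in range(1, length):
        remaining = length - i  # steps left, including this one
        # Drift toward the end value so the walk arrives close to it.
        drift = (end - walk[i - 1]) / remaining
        step = drift + rng.normal(0.0, std)
        # Enforce the bounds immediately instead of fixing them afterwards.
        walk[i] = np.clip(walk[i - 1] + step, lower, upper)
    walk[-1] = end  # pin the final value exactly
    return walk

walk = iterative_bounded_walk(1000, lower=50, upper=100, start=66, end=80, std=1.0, seed=1)
print(walk.min(), walk.max(), walk[0], walk[-1])
```

Clipping makes the walk stick to a bound for a while once it gets there. Folding the excess back, as the vectorized version does, would be an alternative.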

Answered 2017-10-29T22:34:59.490

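I noticed that you use the names of built-in functions as parameters (min and max), which is not recommended (I renamed them to max_1 and min_1). Apart from that, your code should work as expected: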

import numpy as np

def generateRandomData(length, randomPerc, min_1, max_1, start, end):
    data_np = (np.random.random(length) - randomPerc).cumsum()
    data_np *= (max_1 - min_1) / (data_np.max() - data_np.min())
    data_np += np.linspace(start - data_np[0], end - data_np[-1], len(data_np))
    return data_np

randomData = generateRandomData(1000, 0.5, 50, 100, 66, 80)
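
To see why shadowing built-ins is discouraged, consider this small sketch. The function `value_range` is hypothetical and not taken from the question. It only shows that once `min` and `max` are parameter names, the built-in functions are unreachable inside the function body.

```python
def value_range(values, min, max):
    # Inside this function, `min` and `max` are the numeric arguments,
    # so calling them like the built-ins raises a TypeError.
    try:
        return max(values) - min(values)
    except TypeError as exc:
        return "TypeError: {}".format(exc)

print(value_range([1, 2, 3], min=50, max=100))
```

The question's code is not affected by this only because it calls the array methods `data_np.max()` and `data_np.min()` rather than the built-in functions.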

If you are willing to modify the code, this will work:

import random
for_fill=[]
# generate 1000 samples within the specified range and save them in for_fill
for x in range(1000):
    generate_rnd_df=random.uniform(50,100)
    for_fill.append(generate_rnd_df)
#set starting and end point manually
for_fill[0]=60
for_fill[999]=80
Answered 2017-10-26T13:36:52.733

Here is one way, expressed very crudely in code.

>>> import random
>>> steps = 1000
>>> start = 66
>>> end = 80
>>> step_size = (50,100)

Generate 1,000 steps that are guaranteed to be within the required range.

>>> crude_walk_steps = [random.uniform(*step_size) for _ in range(steps)]
>>> import numpy as np

Turn those steps into a walk, but note that it does not meet the requirements.

>>> crude_walk = np.cumsum(crude_walk_steps)
>>> min(crude_walk)
57.099056617839288
>>> max(crude_walk)
75048.948693623403

Compute a simple linear transformation to scale the steps.

>>> from sympy import *
>>> var('a b')
(a, b)
>>> solve([57.099056617839288*a+b-66,75048.948693623403*a+b-80])
{b: 65.9893403510312, a: 0.000186686954219243}

Scale the steps.

>>> walk = [0.000186686954219243*_+65.9893403510312 for _ in crude_walk]

Verify that the walk now starts and stops at the expected values.

>>> min(walk)
65.999999999999986
>>> max(walk)
79.999999999999986
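
The same linear map can be computed without a symbolic solver. The following sketch assumes NumPy and a seeded generator. The names `a`, `b`, `crude_walk` and `walk` mirror the session above, but the numbers will differ from the ones shown there. Because every step is positive, the first element of the cumulative sum is its minimum and the last is its maximum.

```python
import numpy as np

rng = np.random.default_rng(0)  # seeded only to make the sketch repeatable
start, end, steps = 66, 80, 1000

# Cumulative sum of steps drawn uniformly from [50, 100).
crude_walk = rng.uniform(50, 100, size=steps).cumsum()

# Solve a * crude_walk[0] + b = start and a * crude_walk[-1] + b = end.
a = (end - start) / (crude_walk[-1] - crude_walk[0])
b = start - a * crude_walk[0]

walk = a * crude_walk + b
print(walk[0], walk[-1])
```

Note that a walk built this way increases monotonically from the start value to the end value, since all steps have the same sign.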
Answered 2017-10-26T14:25:30.293

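You can also generate a stream of random walks and filter out those that do not satisfy your constraints. Note that after filtering they are no longer truly "random".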

The code below creates an infinite stream of "valid" random walks. Be careful with very strict constraints: the "next" call may take a while ;).

import itertools
import numpy as np


def make_random_walk(first, last, min_val, max_val, size):
    # Generate a sequence of random steps of length `size-2`
    # that will be taken between the start and stop values.
    steps = np.random.normal(size=size-2)

    # The walk is the cumsum of those steps
    walk = steps.cumsum()

    # Performing the walk from the start value gives you your series.
    series = walk + first

    # Compare the target min and max values with the observed ones.
    target_min_max = np.array([min_val, max_val])
    observed_min_max = np.array([series.min(), series.max()])

    # Calculate the absolute 'overshoot' for min and max values
    f = np.array([-1, 1])
    overshoot = (observed_min_max*f - target_min_max*f)

    # Calculate the scale factor to constrain the walk within the
    # target min/max values.
    # Don't upscale.
    correction_base = [walk.min(), walk.max()][np.argmax(overshoot)]
    scale = min(1, (correction_base - overshoot.max()) / correction_base)

    # Generate the scaled series
    new_steps = steps * scale
    new_walk = new_steps.cumsum()
    new_series = new_walk + first

    # Check the size of the final step necessary to reach the target endpoint.
    last_step_size = abs(last - new_series[-1]) # step needed to reach desired end

    # Is it larger than the largest previously observed step?
    if last_step_size > np.abs(new_steps).max():
        # If so, consider this series invalid.
        return None
    else:
        # Else, we found a valid series that meets the constraints.
        return np.concatenate((np.array([first]), new_series, np.array([last])))


start = 66
stop = 80
max_val = 100
min_val = 50
size = 1000

# Create an infinite stream of candidate series
candidate_walks = (
    (i, make_random_walk(first=start, last=stop, min_val=min_val, max_val=max_val, size=size))
    for i in itertools.count()
)
# Filter out the invalid ones.
valid_walks = ((i, w) for i, w in candidate_walks if w is not None)

idx, walk = next(valid_walks)  # Get the next valid series
print(
    "Walk #{}: min/max({:.2f}/{:.2f})"
    .format(idx, walk.min(), walk.max())
)
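
A stripped-down version of the same generate-and-filter idea, for illustration only. It is not the code above: the helper names `candidate_walks` and `within_bounds` are made up, the walks are not rescaled, the end value is not constrained, and the step standard deviation `std` is an assumed parameter.

```python
import itertools
import numpy as np

def candidate_walks(start, size, std, rng):
    """Yield an endless stream of unconstrained Gaussian random walks."""
    while True:
        steps = rng.normal(0.0, std, size=size - 1)
        yield np.concatenate(([start], start + steps.cumsum()))

def within_bounds(walk, lower, upper):
    """Return True if the whole walk stays inside [lower, upper]."""
    return walk.min() >= lower and walk.max() <= upper

rng = np.random.default_rng(0)  # seeded only to make the sketch repeatable
stream = candidate_walks(start=66, size=100, std=0.5, rng=rng)

# Lazily keep only the walks that respect the limits.
valid = (w for w in stream if within_bounds(w, 50, 100))

# Take the first three valid walks from the infinite stream.
first_three = list(itertools.islice(valid, 3))
for w in first_three:
    print("min/max({:.2f}/{:.2f})".format(w.min(), w.max()))
```

With a small `std` relative to the allowed range almost every candidate passes. With a large one, most candidates are rejected and each valid walk takes longer to find.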
Answered 2017-10-29T14:42:44.120