使用 RxPY 进行说明。
我想从一个函数创建一个 observable,但该函数必须带参数。这个特定示例必须以随机间隔返回我要发送给它的许多预定义代码之一。到目前为止,我的解决方案是使用闭包:
from __future__ import print_function
from rx import Observable
import random
import string
import time
def make_tickers(n = 300, s = 123):
""" generates up to n unique 3-letter strings geach makde up of uppsercase letters"""
random.seed(s)
tickers = [''.join(random.choice(string.ascii_uppercase) for _ in range(3)) for y in range(n)]
tickers = list(set(tickers)) # unique
print(len(tickers))
return(tickers)
def spawn_prices_fn(tickers):
""" returns a function that will return a random element
out of tickers every 20-100 ms, and takes an observable parameter """
def spawner(observer):
while True:
next_tick = random.choice(tickers)
observer.on_next(next_tick)
time.sleep(random.randint(20, 100)/1000.0)
return(spawner)
if __name__ == "__main__":
spawned = spawn_prices_fn(make_tickers())
xx = Observable.create(spawned)
xx.subscribe(lambda s: print(s))
有没有更简单的方法?是否可以将不需要闭包的更多参数发送到 Observable.create 的第一个参数函数?什么是规范建议?