TL;DR 我正在寻求帮助来实现下面的大理石图。目的是尽可能对未排序的值进行排序,而无需在扫描执行之间等待时间。
我不是要求全面实施。欢迎任何指导。 我有一个无限热可观察的异步慢速(强制用于测试目的)扫描。这是相关代码:
thread_1_scheduler = ThreadPoolScheduler(1)
thread = ExternalDummyService()
external_obs = thread.subject.publish()
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
.scan(seed=State(0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
external_obs.connect()
thread.start()
def slow_scan_msg(state, msg):
sleep(0.4)
return state \
._replace(count = state.count + 1) \
._replace(last_msg = msg)
这是完整版:https ://pyfiddle.io/fiddle/781a9b29-c541-4cd2-88ba-ef90610f5dbd
这是当前输出(值是随机生成的):
emitting Msg(count=0, timestamp=14.139175415039062)
emitting Msg(count=1, timestamp=6.937265396118164)
emitting Msg(count=2, timestamp=11.461257934570312)
emitting Msg(count=3, timestamp=13.222932815551758)
emitting Msg(count=4, timestamp=5.713462829589844)
SLOW st.count=0 last_msg.counter=0 ts=14.14
SLOW st.count=1 last_msg.counter=1 ts=6.94
SLOW st.count=2 last_msg.counter=2 ts=11.46
SLOW st.count=3 last_msg.counter=3 ts=13.22
SLOW st.count=4 last_msg.counter=4 ts=5.71
SLOW FINISHED
我想对扫描执行之间的待处理消息进行排序。因此,第一个发出的消息将始终是第一个被消费的消息,但下一个消费的消息将是直到该点发出的和未消费的消息的最小值(值)(所有这些消息都在当前版本中,因为即时发射)。依此类推……我认为大理石图比我的解释要好。
请注意,扫描不是在等待完成事件,在发出最后一条消息后它没有开始的唯一原因是因为睡眠。在这里,您有另一个版本,其中睡眠已从扫描中删除并放入 ExternalDummyService。您可以看到这些值在它们发出的那一刻就被消耗掉了。这也显示在大理石图中。
我尝试了to_sorted_list,这是我在 RxPy 中找到的唯一排序方法,但我无法让它工作。
我正在寻找的是这样的:
external_obs \
.flat_map(lambda msg: Observable.just(msg).subscribe_on(thread_1_scheduler)) \
############ buffered_sort() does not exist
.buffered_sort(lambda msg: msg.timestamp) \
############
.scan(seed=State("SLOW", 0, None), accumulator=slow_scan_msg) \
.subscribe(log, print, lambda: print("SLOW FINISHED"))
谢谢