我在Pool.map
与curses
. Python
每当我使用 UI 中断计算更大的工作负载时Pool.map
:curses
它不再对默认屏幕做出反应getch
。无需立即读取任何按下的键(并继续解析它),我可以按下任意数量的键,直到按下回车键。有时(除此之外)甚至 UI 也会中断(比如显示我正常 shell 的一小部分)。
诅咒 UI 包装器
这是一个包装类 ( Screen
),它为我处理 curses UI 的东西:
# -*- coding: utf-8 -*-
import curses
class Screen(object):
def __init__(self):
# create a default screen
self.__mainscr = curses.initscr()
self.__stdscr = curses.newwin(curses.LINES - 2, curses.COLS - 2, 1, 1)
self.__max_height, self.__max_width = self.__stdscr.getmaxyx()
# start coloring
curses.start_color()
curses.use_default_colors()
# define colors
curses.init_pair(1, 197, -1) # red
curses.init_pair(2, 227, -1) # yellow
curses.init_pair(3, curses.COLOR_MAGENTA, -1)
curses.init_pair(4, curses.COLOR_GREEN, -1) # darkgreen
curses.init_pair(5, curses.COLOR_BLUE, -1)
curses.init_pair(6, curses.COLOR_BLACK, curses.COLOR_WHITE)
curses.init_pair(7, curses.COLOR_WHITE, -1)
curses.init_pair(8, curses.COLOR_CYAN, -1)
curses.init_pair(9, 209, -1) # orange
curses.init_pair(10, 47, -1) # green
def add_str_to_scr(self, add_str: str, colorpair: int = 7):
self.__stdscr.addstr(str(add_str), curses.color_pair(colorpair))
def linebreak(self):
self.__stdscr.addstr("\n")
def clear_screen(self):
self.__stdscr.clear()
def refresh_screen(self):
self.__stdscr.refresh()
def wait_for_enter_or_esc(self):
curses.noecho()
while True:
c = self.__stdscr.getch()
if c == 10 or c == 27: # 10: Enter, 27: ESC
break
curses.echo()
def get_user_input_chr(self) -> str:
return chr(self.__stdscr.getch())
def get_user_input_str(self) -> str:
return self.__stdscr.getstr().decode(encoding="utf-8")
实际程序
我写了一个小例子,因为当我Pool.map
在curses
UI 中组合并且工作量很大时,总是会发生上述故障。该代码只是在数组上计算一些无用mult
的add
东西。numpy
import curses
from screen import Screen
from multiprocessing import Pool, cpu_count
import numpy as np
s = Screen() # initializing my Screen wrapper
np.random.seed(1234) # setting the rng fixed to make results comparable
# worker function to simulate workload
def worker(arr):
return arr * 2 + 1
s.clear_screen() # cleans the screen
s.refresh_screen() # displays current buffer's content
s.add_str_to_scr("Start processing data...")
s.linebreak()
s.linebreak()
s.refresh_screen()
# data to feed worker function with (sliced by rows)
data_arr = np.random.rand(8, int(1e7)) # <-- big array for high workload
with Pool(cpu_count()) as p:
buffer = p.map(worker, [data_arr[row] for row in np.ndindex(data_arr.shape[0])])
s.add_str_to_scr("...finished processing:")
s.linebreak()
s.linebreak()
s.refresh_screen()
for row in buffer:
s.add_str_to_scr(row[0:3])
s.linebreak()
s.refresh_screen()
# *Here* the program should wait until the user presses *any* key
# and continue INSTANTLY when any key gets pressed.
# However, for big workloads, it does not react to single key presses,
# but wait for any amount of keys pressed until you hit 'Enter'
s.get_user_input_chr()
curses.endwin()
现在,当我以高工作量执行代码时(即处理一个形状数组(8, int(1e7)
等于 8 行和 10,000,000 列)curse
的getch
中断,我得到了这种行为:
如您所见,我可以q
随心所欲地按(或任何其他键),但curse
'sgetch
没有反应。我必须按 Enter 键才能识别输入。
此外,由于某种原因,第一行被我原来的 shell 输出覆盖。
Pool.map
这种行为仅在计算大约需要 1 秒或更长时间时才会发生。
当我设置data_arr
为一个小数组时,np.random.rand(8, 100)
一切都像魅力一样工作,但是一旦我提供计算需要> = 1秒的大数组,就会出现这个奇怪的错误并破坏我的curses
用户界面。
有任何想法吗?
是Pool.map
不是以某种方式正确地加入了工作进程?