我正在尝试模拟创建管道并使其保持打开状态。在 Bash 中,我会mkfifo /tmp/myfifo; cat > /tmp/myfifo.
在 Python 中,我有:
import sys
import os
import string
import subprocess
import time
from multiprocessing import Process
from termSize import getTerminalSize
from redirect import RedirectStream
def viewer():
path = "/tmp/myfifo"
file = open(path, "w")
subprocess.Popen(["cat"], stdout=file)
def base_menu(func):
def menu():
python_path = sys.executable
fifo_path = "/tmp/myfifo"
called_function = func()
path = called_function[0]
menu_name = called_function[1]
header = "Foobar YO"
term = getTerminalSize()
sys.stderr.write("\x1b[2J\x1b[H")
print header.center(term, '*')
print menu_name.center(term, '+')
print "Please choose which option:"
files = os.listdir(path)
for i in files:
print (str(files.index(i)+1)) + ") "\
+ i.strip(".py")
while True:
choice = raw_input("\nPlease Choose[ENTER for Main]: ")
try:
choice = int(choice)- 1
with open(fifo_path, "w") as file:
with RedirectStream(file):
print(time.strftime("%H:%M:%S",\
time.localtime(time.time()))\
+" -------------------")
except IndexError:
print "error"
return menu
def main_menu():
header = "Foobar YO"
menu_name = "Main Menu"
term = getTerminalSize()
sys.stderr.write("\x1b[2J\x1b[H")
print header.center(term, '*')
print menu_name.center(term, '+')
print "Please choose which option:"
menus = (
servers_menu,
database_menu,
files_menu,
networks_menu,
dns_menu,
blockstorage_menu,
loadbalancers_menu
)
text = "1) Servers\n"\
"2) Databases\n"\
"3) Files\n"\
"4) Networks\n"\
"5) DNS\n"\
"6) Block Storage\n"\
"7) Load Balancers\n"\
"Please Choose: "
value = raw_input(text)
return value, menus
@base_menu
def database_menu():
path = "/home/work/cloud_databases/"
menu_name = "Database Menu"
return path, menu_name
@base_menu
def blockstorage_menu():
path = "/home/work/cloud_blockstorage/"
menu_name = "Block Storage Menu"
return path, menu_name
@base_menu
def dns_menu():
path = "/home/work/cloud_dns/"
menu_name = "DNS Menu"
return path, menu_name
@base_menu
def loadbalancers_menu():
path = "/home/work/cloud_loadbalancers/"
menu_name = "Load Balancers Menu"
return path, menu_name
@base_menu
def networks_menu():
path = "/home/work/cloud_networks/"
menu_name = "Networks Menu"
return path, menu_name
@base_menu
def files_menu():
path = "/home/work/cloudfiles/"
menu_name = "Files Menu"
return path, menu_name
@base_menu
def servers_menu():
path = "/home/work/cloudservers/"
menu_name = "Servers Menu"
return path, menu_name
def main():
p1 = Process(target=viewer)
p1.start()
while True:
value, menus = main_menu()
try:
value = int(value)-1
menus[value]()
except ValueError:
print "oops-ee"
except IndexError:
print "oops-ee"
main()
现在,我有另一个使用 cat < /tmp/myfifo 运行的 shell 会话来保持读取。
问题是,当试图让这个函数 viewer() 与 main 中的其他所有东西并行运行时,我没有得到预期/相同的行为,就像我手动制作、打开和读取 shell 中的管道一样。