我在 Windows 中有一个交互式控制台程序,需要向 cmd 窗口发送 'e'、'c' 之类的按键。人工操作很方便,但用程序实现自动化很难。
现在,我想包装控制台程序,以便更容易在其他程序(如python)中进行操作。
但是,该控制台程序使用 'getch()' 来获取键盘输入,而 getch() 并不从标准输入读取。所以,我不能简单地把按键写入标准输入。
有没有人遇到过这个问题?谢谢。
class ThreadWorker(threading.Thread):
    """Daemon thread that runs one callable with pre-bound arguments.

    Any ``Exception`` raised by the callable is printed and swallowed, so a
    failing worker never propagates an error out of the thread.
    """

    def __init__(self, callable, *args, **kwargs):
        """Store the target and the arguments it will be called with.

        Args:
            callable: Function to invoke in the new thread. The name shadows
                the builtin but is kept so existing callers keep working.
            *args: Positional arguments forwarded to ``callable``.
            **kwargs: Keyword arguments forwarded to ``callable``.
        """
        super(ThreadWorker, self).__init__()
        self.callable = callable
        self.args = args
        self.kwargs = kwargs
        # Attribute form replaces the deprecated setDaemon() call.
        self.daemon = True

    def run(self):
        """Invoke the stored callable, printing any exception it raises."""
        try:
            self.callable(*self.args, **self.kwargs)
        except Exception as e:
            # ``as`` syntax and a single-argument print() parse on both
            # Python 2.6+ and Python 3.
            print(e)

    def start(self):
        """Set the module-level ``running`` flag, then start the thread."""
        global running
        running = True
        threading.Thread.start(self)
def console_presskey(char):
    """Write *char* to the child process's stdin pipe and flush it.

    Relies on a module-level ``proc`` object that exposes a ``stdin`` pipe.
    """
    child_stdin = proc.stdin
    child_stdin.write(char)
    child_stdin.flush()
def start():
    """Launch ``something.exe`` and echo its stdout/stderr from daemon threads.

    The ``Popen`` handle is published as the module-level ``proc`` so that
    ``console_presskey`` can reach the child's stdin.
    """
    import sys

    # Without this declaration ``proc`` would be local to start() and
    # console_presskey() would fail with NameError.
    global proc

    def worker(pipe):
        """Copy lines from *pipe* to stdout until EOF or ``running`` is cleared."""
        while running:
            line = pipe.readline()
            if line == '':
                # Empty read means the child closed this pipe.
                break
            # Lines carry their own newline, so write them unmodified.
            sys.stdout.write(line)

    # stderr has to be piped too; otherwise proc.stderr is None and the
    # stderr worker fails on its first readline().
    proc = Popen("something.exe", stdout=PIPE, stdin=PIPE, stderr=PIPE)
    stdout_worker = ThreadWorker(worker, proc.stdout)
    stderr_worker = ThreadWorker(worker, proc.stderr)
    stdout_worker.start()
    stderr_worker.start()
if __name__ == "__main__":
    # Demo run: launch the wrapped program, then send 'e' and 'c',
    # pausing two seconds before each key.
    start()
    for key in ('e', 'c'):
        sleep(2)
        console_presskey(key)
编辑:
最后,我使用 win32 的 SendMessage 函数完成了任务。我启动了一个新的子进程并将其窗口隐藏,然后获取它的 hWnd 和 pid。
这是代码:
import threading
import re
import os
from subprocess import Popen, PIPE,STDOUT,CREATE_NEW_CONSOLE,STARTUPINFO,STARTF_USESHOWWINDOW,SW_HIDE
from time import sleep
import win32process,win32con,win32gui,win32api
# Values parsed by the reader thread from "Operating in" banners;
# 'N/A' until one has been parsed, and again after a parse failure.
TargetPower = Mode = Channel = 'N/A'
# Placeholders; not referenced by the code visible in this file.
con_stdin = con_stdout = ''
stdout_worker = stdin_woker = ''
# Loop flag polled by the reader threads; ThreadWorker.start() sets it
# and stop() clears it.
running = False
# Popen handle of the wrapped program, assigned by start().
proc = ''
# List of window handles found for the child's pid, assigned by start().
Console_hwnd = ''
#status = 'NotStarted' # or Running
class ThreadWorker(threading.Thread):
    """Daemon thread that runs one callable with pre-bound arguments.

    Any ``Exception`` raised by the callable is printed and swallowed, so a
    failing worker never propagates an error out of the thread.
    """

    def __init__(self, callable, *args, **kwargs):
        """Store the target and the arguments it will be called with.

        Args:
            callable: Function to invoke in the new thread. The name shadows
                the builtin but is kept so existing callers keep working.
            *args: Positional arguments forwarded to ``callable``.
            **kwargs: Keyword arguments forwarded to ``callable``.
        """
        super(ThreadWorker, self).__init__()
        self.callable = callable
        self.args = args
        self.kwargs = kwargs
        # Attribute form replaces the deprecated setDaemon() call.
        self.daemon = True

    def run(self):
        """Invoke the stored callable, printing any exception it raises."""
        try:
            self.callable(*self.args, **self.kwargs)
        except Exception as e:
            # ``as`` syntax and a single-argument print() parse on both
            # Python 2.6+ and Python 3.
            print(e)

    def start(self):
        """Set the module-level ``running`` flag, then start the thread."""
        global running
        running = True
        threading.Thread.start(self)
def get_hwnds_for_pid(pid):
    """Return the handles of all enumerated windows that belong to *pid*.

    Every window reported by ``win32gui.EnumWindows`` is checked with
    ``win32process.GetWindowThreadProcessId``; visibility and enabled
    state are not considered.

    Args:
        pid: Process id to match against each window's owning process.

    Returns:
        A list of matching window handles, empty when none match.
    """
    def _collect(handle, matches):
        owner_pid = win32process.GetWindowThreadProcessId(handle)[1]
        if owner_pid == pid:
            matches.append(handle)
        # Returning True tells EnumWindows to keep enumerating.
        return True

    found = []
    win32gui.EnumWindows(_collect, found)
    return found
def sendkey(char):
    """Deliver one character to the console window as a WM_CHAR message.

    Uses the first handle in the module-level ``Console_hwnd`` list, which
    ``start()`` populates.

    Args:
        char: One-character string; its ``ord()`` value is sent as wParam.
    """
    # Console_hwnd is only read here, so no ``global`` declaration is needed.
    hwnd = Console_hwnd[0]
    code = ord(char)
    win32api.SendMessage(hwnd, win32con.WM_CHAR, code, 0)
    # A single pre-formatted argument prints the same text under the
    # Python 2 print statement and the Python 3 print function.
    print('[*] %s Pressed.' % char)
    # Keys that have only a virtual-key code (e.g. VK_F9) can instead be
    # posted as WM_KEYDOWN / WM_KEYUP, the way sendesc() posts Esc.
def sendesc():
    """Post an Esc key-down followed by an Esc key-up to the console window.

    Uses the first handle in the module-level ``Console_hwnd`` list, which
    ``start()`` populates.
    """
    # Console_hwnd is only read here, so no ``global`` declaration is needed.
    hwnd = Console_hwnd[0]
    win32api.PostMessage(hwnd, win32con.WM_KEYDOWN, win32con.VK_ESCAPE, 0)
    # Single-argument print() emits the same text on Python 2 and 3.
    print('[*] Esc down')
    win32api.PostMessage(hwnd, win32con.WM_KEYUP, win32con.VK_ESCAPE, 0)
    print('[*] Esc up')
def start():
    """Launch ``art.bat`` in a hidden console and start its output readers.

    Side effects: assigns the module-level ``proc``, ``Console_hwnd``,
    ``stdout_worker`` and ``stderr_worker``. The reader threads update
    ``TargetPower``, ``Mode`` and ``Channel`` whenever an "Operating in"
    banner is parsed.

    Raises:
        IndexError: if no window is found for the child's pid after the
            two-second startup wait.
    """
    import sys

    global proc
    global Console_hwnd
    global stdout_worker
    global stderr_worker

    def worker(pipe):
        """Read *pipe* line by line, parsing banners and echoing other lines."""
        global TargetPower
        global Mode
        global Channel
        while running:
            line = pipe.readline()
            if line == '':
                # Empty read means the child closed this pipe.
                break
            elif line.startswith(('|', '===')):
                # Lines with these prefixes are dropped without echo.
                pass
            elif line.startswith("Operating in"):
                # The banner is this line plus the seven that follow it.
                info = line
                for _ in range(7):
                    info += pipe.readline()
                try:
                    # NOTE(review): eval() executes text produced by the
                    # child process. Left unchanged to keep the resulting
                    # value types; consider ast.literal_eval() or float().
                    TargetPower = eval(re.search(r'(Output|Target) (power|Power) = .{4}', info).group(0).split('=')[1])
                    Mode = re.search(r'gOffRate = (.+?),', info).group(1).strip()
                    Channel = re.search(r'channel ([\d.]+GHz),', info).group(1)
                except Exception as e:
                    # A banner that does not match resets all three values.
                    TargetPower = 'N/A'
                    Mode = 'N/A'
                    Channel = 'N/A'
                    print(e)
            elif line == '\r\n':
                # Blank line from the tool: show a prompt marker without a
                # line break.
                sys.stdout.write('> ')
            else:
                # Lines carry their own newline, so none is appended.
                sys.stdout.write('ART>' + line)
        print('worker done.')

    # Start the child with its console window hidden.
    si = STARTUPINFO()
    si.dwFlags |= STARTF_USESHOWWINDOW
    si.wShowWindow = SW_HIDE
    # stderr has to be piped too; otherwise proc.stderr is None and the
    # stderr worker fails on its first readline().
    proc = Popen("art.bat", stdout=PIPE, stderr=PIPE,
                 creationflags=CREATE_NEW_CONSOLE, startupinfo=si)
    # Wait before looking up the child's window handles.
    sleep(2)
    print('[*] pid:  %s' % proc.pid)
    Console_hwnd = get_hwnds_for_pid(proc.pid)
    print('[*] hwnd: %s' % Console_hwnd[0])
    stdout_worker = ThreadWorker(worker, proc.stdout)
    stderr_worker = ThreadWorker(worker, proc.stderr)
    stdout_worker.start()
    stderr_worker.start()
def stop():
    """Send three Esc presses, force-kill the child's process tree, and reset state.

    Clears the module-level ``running`` flag and resets ``TargetPower``,
    ``Mode`` and ``Channel`` to ``'N/A'``.
    """
    global running
    # Without these declarations the resets below would only create
    # function locals and leave the module-level values untouched.
    global TargetPower
    global Mode
    global Channel
    print('stop')
    for _ in range(3):
        sendesc()
    # /T also terminates child processes; /F forces termination.
    Popen("taskkill /F /T /PID %i" % proc.pid, shell=True)
    running = False
    TargetPower = 'N/A'
    Mode = 'N/A'
    Channel = 'N/A'
if __name__ == "__main__":
    # Demo run: start the tool, send 'e' then 'c' one second apart,
    # let it run for ten seconds, then shut it down.
    start()
    sleep(1)
    sendkey('e')
    sleep(1)
    sendkey('c')
    sleep(10)
    stop()
    # Keep the script alive after stop(). Sleeping instead of spinning on
    # ``pass`` avoids pinning a CPU core while idle.
    while True:
        sleep(1)