以下是3个选项,希望一个可以工作:
- 使用子流程的“严格”
subprocess
方法 PIPE
- 使用方法
pexpect
——“Pexpect 是一个纯 Python 模块,用于生成子应用程序;控制它们;并响应其输出中的预期模式。” -- 并不是说这是您必须从默认的 python 安装中单独获取的唯一非标准包。
- 使用伪终端和旧式
select
读取文件描述符的方法
每种方法都捆绑在一个try_[SOME APPROACH]
函数中。您应该能够更新顶部的 3 个参数,然后在底部注释/取消注释一种方法以试一试。
独立测试两半可能是值得的。换句话说,snort + my rpi.py
(下)。然后,如果可行,我的timed_printer.py
(如下)和你的 python 脚本来切换 RPi GPIO。如果它们都独立工作,那么您可以确信不需要做太多工作即可使整个工作流程正常运行。
代码
import subprocess
_cmd_lst = ['python', '-u', 'timed_printer.py'] # sudo snort -q -A console -i eth0 -c /etc/snort/snort.conf
_rpi_lst = ['python', '-u', 'rpi.py'] # python script that toggles RPi
_alert = 'TIME' # The keyword you're looking for
# in snort output
#===============================================================================
# Simple helper function that calls the RPi toggle script
def toggle_rpi():
subprocess.call(_rpi_lst)
def try_subprocess(cmd_lst, alert, rpi_lst):
p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=subprocess.PIPE, bufsize=1)
try:
while True:
for line in iter(p.stdout.readline, b''):
print("try_subprocess() read: %s" % line.strip())
if alert in line:
print("try_subprocess() found alert: %s" % alert)
toggle_rpi()
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
p.kill()
print("Goodbye.")
def try_pexpect(cmd_lst, alert, rpi_lst):
import pexpect # http://pexpect.sourceforge.net/pexpect.html
p = pexpect.spawn(' '.join(cmd_lst))
try:
while True:
p.expect(alert) # This blocks until <alert> is found in the output of cmd_str
print("try_pexpect() found alert: %s" % alert)
toggle_rpi()
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
p.close(force=True)
print("Goodbye.")
def try_pty(cmd_lst, alert, rpi_lst, MAX_READ=2048):
import pty, os, select
mfd, sfd = pty.openpty()
p = subprocess.Popen(' '.join(cmd_lst), shell=True, stdout=sfd, bufsize=1)
try:
while True:
rlist, _, _, = select.select([mfd], [], [])
if rlist:
data = os.read(mfd, MAX_READ)
print("try_pty() read: %s" % data.strip())
if not data:
print("try_pty() got EOF -- exiting")
break
if alert in data:
print("try_pty() found alert: %s" % alert)
toggle_rpi()
elif p.poll() is not None:
print("try_pty() had subprocess end -- exiting")
break
except KeyboardInterrupt: print(" Caught Ctrl+C -- killing subprocess...")
except Exception as ex: print ex
finally:
print("Cleaning up...")
os.close(sfd)
os.close(mfd)
p.kill()
print("Goodbye.")
#===============================================================================
try_subprocess(_cmd_lst, _alert, _rpi_lst)
#try_pexpect(_cmd_lst, _alert, _rpi_lst)
#try_pty(_cmd_lst, _alert, _rpi_lst)
测试笔记
为了模拟你的 snort 脚本(一个“挂起”的脚本,然后打印一些东西,然后回到挂起等),我编写了这个简单的 python 脚本,我称之为timed_printer.py
:
import time
while True:
print("TIME: %s" % time.time())
time.sleep(5)
我的rpi.py
文件很简单:
print("TOGGLING OUTPUT PIN")
这里没有明确的输出刷新,试图最好地模拟正常输出。
最后的考虑
第一种方法将一次读取整行。因此,如果您希望您alert
的内容包含在一行中,那么您会没事的。
第二种方法 ( pexpect
) 将阻塞直到alert
遇到。
第三种方法将在数据可用时立即读取,我应该指出这不一定是完整的行。如果您看到try_pty() read:
snort 输出行的片段,导致您错过警报,则需要添加某种缓冲解决方案。
文档
参考文献: 1 , 2