这是我对这个问题的看法。要求用户脚本在 vanilla CPython 中运行意味着您需要为您的迷你语言编写解释器,或者将其编译为 Python 字节码(或使用 Python 作为源语言),然后在执行之前“清理”字节码。
我举了一个简单的例子,假设用户可以用 Python 编写他们的脚本,并且源代码和字节码可以通过从解析树中过滤不安全语法和/或从字节码。
解决方案的第二部分要求用户脚本字节码被看门狗任务定期中断,这将确保用户脚本不超过某些操作码限制,并且所有这些都可以在普通 CPython 上运行。
我的尝试总结,主要集中在问题的第二部分。
- 用户脚本是用 Python 编写的。
- 使用byteplay过滤和修改字节码。
- 检测用户的字节码以插入操作码计数器并调用上下文切换到看门狗任务的函数。
- 使用greenlet执行用户的字节码,在用户脚本和看门狗协程之间进行 yield 切换。
- 看门狗对在引发错误之前可以执行的操作码数量实施预设限制。
希望这至少朝着正确的方向发展。当您到达解决方案时,我有兴趣了解更多有关您的解决方案的信息。
源代码lowperf.py
:
# std
import ast
import dis
import sys
from pprint import pprint
# vendor
import byteplay
import greenlet
# bytecode snippet to increment our global opcode counter
INCREMENT = [
(byteplay.LOAD_GLOBAL, '__op_counter'),
(byteplay.LOAD_CONST, 1),
(byteplay.INPLACE_ADD, None),
(byteplay.STORE_GLOBAL, '__op_counter')
]
# bytecode snippet to perform a yield to our watchdog tasklet.
YIELD = [
(byteplay.LOAD_GLOBAL, '__yield'),
(byteplay.LOAD_GLOBAL, '__op_counter'),
(byteplay.CALL_FUNCTION, 1),
(byteplay.POP_TOP, None)
]
def instrument(orig):
"""
Instrument bytecode. We place a call to our yield function before
jumps and returns. You could choose alternate places depending on
your use case.
"""
line_count = 0
res = []
for op, arg in orig.code:
line_count += 1
# NOTE: you could put an advanced bytecode filter here.
# whenever a code block is loaded we must instrument it
if op == byteplay.LOAD_CONST and isinstance(arg, byteplay.Code):
code = instrument(arg)
res.append((op, code))
continue
# 'setlineno' opcode is a safe place to increment our global
# opcode counter.
if op == byteplay.SetLineno:
res += INCREMENT
line_count += 1
# append the opcode and its argument
res.append((op, arg))
# if we're at a jump or return, or we've processed 10 lines of
# source code, insert a call to our yield function. you could
# choose other places to yield more appropriate for your app.
if op in (byteplay.JUMP_ABSOLUTE, byteplay.RETURN_VALUE) \
or line_count > 10:
res += YIELD
line_count = 0
# finally, build and return new code object
return byteplay.Code(res, orig.freevars, orig.args, orig.varargs,
orig.varkwargs, orig.newlocals, orig.name, orig.filename,
orig.firstlineno, orig.docstring)
def transform(path):
"""
Transform the Python source into a form safe to execute and return
the bytecode.
"""
# NOTE: you could call ast.parse(data, path) here to get an
# abstract syntax tree, then filter that tree down before compiling
# it into bytecode. i've skipped that step as it is pretty verbose.
data = open(path, 'rb').read()
suite = compile(data, path, 'exec')
orig = byteplay.Code.from_code(suite)
return instrument(orig)
def execute(path, limit = 40):
"""
This transforms the user's source code into bytecode, instrumenting
it, then kicks off the watchdog and user script tasklets.
"""
code = transform(path)
target = greenlet.greenlet(run_task)
def watcher_task(op_count):
"""
Task which is yielded to by the user script, making sure it doesn't
use too many resources.
"""
while 1:
if op_count > limit:
raise RuntimeError("script used too many resources")
op_count = target.switch()
watcher = greenlet.greenlet(watcher_task)
target.switch(code, watcher.switch)
def run_task(code, yield_func):
"This is the greenlet task which runs our user's script."
globals_ = {'__yield': yield_func, '__op_counter': 0}
eval(code.to_code(), globals_, globals_)
execute(sys.argv[1])
这是一个示例用户脚本user.py
:
def otherfunc(b):
return b * 7
def myfunc(a):
for i in range(0, 20):
print i, otherfunc(i + a + 3)
myfunc(2)
这是一个示例运行:
% python lowperf.py user.py
0 35
1 42
2 49
3 56
4 63
5 70
6 77
7 84
8 91
9 98
10 105
11 112
Traceback (most recent call last):
File "lowperf.py", line 114, in <module>
execute(sys.argv[1])
File "lowperf.py", line 105, in execute
target.switch(code, watcher.switch)
File "lowperf.py", line 101, in watcher_task
raise RuntimeError("script used too many resources")
RuntimeError: script used too many resources