我正在尝试创建一个动态监视 jupyter 笔记本的进程,在修改时编译它们并将它们导入到我当前的文件中,但是我似乎无法执行更新的代码。它只执行加载的第一个版本。
有一个名为的文件producer.py
重复调用此函数:
import fs.fs_util as fs_util
while(True):
fs_util.update_feature_list()
在fs_util.py
我执行以下操作:
from fs.feature import Feature
import inspect
from importlib import reload
import os
def is_subclass_of_feature(o):
return inspect.isclass(o) and issubclass(o, Feature) and o is not Feature
def get_instances_of_features(name):
module = __import__(COMPILED_MODULE, fromlist=[name])
module = reload(module)
feature_members = getattr(module, name)
all_features = inspect.getmembers(feature_members, predicate=is_subclass_of_feature)
return [f[1]() for f in all_features]
此函数由以下方式调用:
def update_feature_list(name):
os.system("jupyter nbconvert --to script {}{} --output {}{}"
.format(PATH + "/" + s3.OUTPUT_PATH, name + JUPYTER_EXTENSION, PATH + "/" + COMPILED_PATH, name))
features = get_instances_of_features(name)
for f in features:
try:
feature = f.create_feature()
except Exception as e:
print(e)
还有其他不相关的代码可以检查文件是否已被修改等。
我可以告诉文件正在正确重新加载,因为当我inspect.getsource(f.create_feature)
在类上使用它时,它会显示更新的源代码,但是在执行期间它会返回旧值。我已经通过更改打印语句以及比较返回值来验证这一点。
此外,对于更多上下文,我尝试导入的文件:
from fs.feature import Feature
class SubFeature(Feature):
def __init__(self):
Feature.__init__(self)
def create_feature(self):
return "hello"
我想知道我做错了什么?