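I am running into a synchronization problem in SimPy: events are not being processed in the order I expect. I have been looking for more detail than the SimPy documentation gives on how events are queued, sorted, and processed. Everything I find only says that events are sorted by the time at which they are due to be triggered. Before reading the rest, does anyone have a link or document to suggest?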
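To make the ordering rule concrete, here is a minimal sketch of how I currently understand it, assuming the queue is keyed by (time, priority, insertion counter). This is a plain-Python model built on `heapq`, not SimPy's own code; `ToyQueue`, `schedule` and `drain` are names I made up for the illustration, and the priority values are assumptions of the sketch.

```python
import heapq
from itertools import count

# Illustrative priorities: the lower value sorts first. The idea of an urgent
# and a normal priority comes from SimPy; the values here are assumptions.
URGENT, NORMAL = 0, 1


class ToyQueue:
    """Toy model of an event queue ordered by (time, priority, insertion id)."""

    def __init__(self):
        self._heap = []
        self._ids = count()

    def schedule(self, time, name, priority=NORMAL):
        # The running counter breaks ties, so events with equal time and
        # priority come out in the order they were scheduled.
        heapq.heappush(self._heap, (time, priority, next(self._ids), name))

    def drain(self):
        """Pop every event and return (time, name) pairs in processing order."""
        order = []
        while self._heap:
            time, _priority, _eid, name = heapq.heappop(self._heap)
            order.append((time, name))
        return order


queue = ToyQueue()
queue.schedule(5, "EU1:ModSat1 timeout")
queue.schedule(4, "EU1:ModSat2 timeout")
queue.schedule(5, "second event at t=5")
queue.schedule(5, "interrupt at t=5", priority=URGENT)
processing_order = queue.drain()
print(processing_order)
```

In this model, the event at t = 4 comes out first, then the urgent event at t = 5, then the two normal events at t = 5 in the order they were scheduled. Whether my real code follows the same pattern is exactly what I am unsure about.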
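More specifically, I am trying to model and simulate a real-world system (an instance of the PoolSystem class) that is a pool of subsystems, each of which can either be broken down further into sub-subsystems or is able to fail (this last kind is called an atomic system). In short, a PoolSystem is made up of subsystems that are each either a PoolSystem or an AtomicSystem.
For example, a car could be an instance of this PoolSystem class, with an engine as one of its subsystems. The engine can itself be broken down into several other subsystems, such as pistons or spark plugs, which can actually fail. In that case the engine is defined as a PoolSystem instance, and the pistons and spark plugs as AtomicSystem instances.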
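The hierarchy in the car example can be sketched as a small tree. This is only a structural illustration without any SimPy events; `PoolNode`, `AtomicNode` and `atomic_names` are hypothetical stand-ins, not my real PoolSystem and AtomicSystem classes.

```python
from dataclasses import dataclass, field
from typing import List, Union


@dataclass
class AtomicNode:
    """Leaf of the hierarchy: a part that can actually fail."""
    name: str


@dataclass
class PoolNode:
    """Inner node: a pool of subsystems, each either atomic or another pool."""
    name: str
    subsystems: List[Union["PoolNode", AtomicNode]] = field(default_factory=list)


def atomic_names(system):
    """Collect the names of every atomic system below `system`."""
    if isinstance(system, AtomicNode):
        return [system.name]
    names = []
    for sub in system.subsystems:
        names.extend(atomic_names(sub))
    return names


engine = PoolNode("engine", [AtomicNode("piston"), AtomicNode("spark plug")])
car = PoolNode("car", [engine])
print(atomic_names(car))
```

Only the leaves (piston, spark plug) can fail on their own; the engine and the car fail as a consequence of what happens below them.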
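The AtomicSystem and PoolSystem classes are built on the same standard model. Both have:
- a "critical" boolean attribute, which is True if the failure of the given subsystem causes the failure of the whole system (meaning that all the other subsystems must be interrupted)
- an "update_order" event, which acts as the signal a system uses to communicate with its subsystems (if any)
- a "dysfunction_signal" event, which is the signal a subsystem uses to tell its parent system that it has failed
- an "interrupted" event, which is triggered when the given system stops working or is interrupted by a higher-level system
- an "update_end" event, which acts as the signal a subsystem uses to tell its higher-level system that it has finished updating
- a "lifecycle" attribute, which is the process that simulates the operational service of the given system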
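The role of the "critical" flag can be summarized as a small decision rule. The sketch below restates the rule I intend PoolSystem to follow, with no events involved; `pool_outcome` and its arguments are hypothetical names used only for this illustration.

```python
def pool_outcome(subsystems, failed):
    """Classify a pool's state from its subsystems' critical flags.

    subsystems: dict mapping subsystem name -> critical flag (bool)
    failed: set of names of the subsystems that have failed so far
    """
    if any(subsystems[name] for name in failed):
        # At least one failed subsystem is critical.
        return "fails completely"
    if failed and len(failed) == len(subsystems):
        # No critical failure, but nothing is left working.
        return "fails completely (no working subsystems)"
    if failed:
        return "fails partially"
    return "in service"


eus = {"EU1": False, "EU2": False, "EU3": False}
print(pool_outcome(eus, {"EU1"}))
print(pool_outcome(eus, {"EU1", "EU2", "EU3"}))
print(pool_outcome({"tire": True, "engine": False}, {"tire"}))
```

With three non-critical subsystems, a single failure is a partial failure and the pool only stops once all three have failed. A single critical failure stops the pool immediately.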
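I hope the following diagram helps make the above clearer:
[Diagram: failure of a car defined as a pool system]
In this diagram, the car is defined as a PoolSystem instance whose subsystems are the engine and the tire. The tire can itself be a cause of car failure, so it is defined as an AtomicSystem instance. The engine is defined as another PoolSystem whose subsystems are the pistons and the spark plugs; these can fail, so they are defined as AtomicSystem instances.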
The AtomicSystem class is given below:

import simpy


class AtomicSystem(object):
    def __init__(self, env, mtbd, backlog, user_defined_critical=True, ids=None):
        self.env = env  # environment()
        self.mtbd = mtbd  # mean time between dysfunction
        self.critical = user_defined_critical  # boolean
        self.ids = ids  # list of strings
        self.ttd = self.time_to_dysfunction()  # time before dysfunction
        self.update_order = self.env.event()
        self.dysfunction_signal = self.env.event()
        self.interrupted = self.env.event()
        self.update_end = self.env.event()
        self.lifecycle = self.env.process(self.run(backlog))

    def time_to_dysfunction(self):
        return self.mtbd

    def run(self, backlog):
        # the atomic system starts service when its update_order event is triggered
        yield self.update_order
        print("t = " + str(self.env.now) + " : " + self.ids[-1] + " starts service.")
        self.update_order = self.env.event()
        # atomic system specifies to higher level system that it has started service
        self.update_end.succeed()
        self.update_end = self.env.event()
        try:
            # as long as the atomic system remains in this while loop, it is said to be in service.
            while True:
                start = self.env.now
                time_out = self.env.timeout(self.ttd)
                # wait for a dysfunction (time_out) or interruption (interrupted) or an update from a higher level system (update_order)
                result = yield time_out | self.interrupted | self.update_order
                if time_out in result:
                    print("t = " + str(self.env.now) + " : " + self.ids[-1] + " fails.")
                    # if the atomic system fails, trigger dysfunction_signal event destined to be detected by higher level system
                    self.dysfunction_signal.succeed()
                    # when the atomic system fails, its interrupted event is automatically triggered
                    self.interrupted.succeed()
                    if self.ttd > 0:
                        backlog.append({"Dysfunction time": self.env.now, "IDs": self.ids})
                    self.ttd = 0
                if self.interrupted.triggered:
                    print("t = " + str(self.env.now) + " : " + self.ids[-1] + " interrupts service.")
                    if self.ttd > 0:
                        operation_duration = self.env.now - start
                        self.ttd -= operation_duration
                    # the atomic system waits for update_order trigger when it has been interrupted
                    yield self.update_order
                if self.update_order.triggered:
                    # here, the atomic system returns to service
                    print("t = " + str(self.env.now) + " : " + self.ids[-1] + " is updated.")
                    if self.ttd > 0:
                        operation_duration = self.env.now - start
                        self.ttd -= operation_duration
                    self.update_end.succeed()
                    self.update_order = self.env.event()
                    self.dysfunction_signal = self.env.event()
                    self.interrupted = self.env.event()
                    self.update_end = self.env.event()
        except simpy.Interrupt:
            # here the atomic system is terminated (end of service)
            print("t = " + str(self.env.now) + " : " + self.ids[-1] + " is terminated.")
            return
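The time-to-dysfunction bookkeeping in run() boils down to one subtraction. Here is that arithmetic in isolation, as a sketch; `remaining_ttd` is a hypothetical helper that does not exist in my code, and the numbers are those of an Eu with mtbd = 27 that starts at t = 0 and is updated at t = 9.

```python
def remaining_ttd(ttd, start, now):
    """Remaining time to dysfunction after running from `start` to `now`.

    Mirrors the `if self.ttd > 0: self.ttd -= now - start` step: a unit
    that has already failed (ttd == 0) stays at 0.
    """
    if ttd > 0:
        ttd -= now - start
    return ttd


ttd_after_update = remaining_ttd(27, start=0, now=9)
next_failure_time = 9 + ttd_after_update
print(ttd_after_update, next_failure_time)
```

Since the unit has already run for 9 time units, 18 remain, and the next failure is expected at t = 27.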
The PoolSystem class is given below:

class PoolSystem(object):
    def __init__(self, env, id, init_subsystems, user_defined_critical=True):
        self.env = env
        self.id = id
        self.subsystems = init_subsystems
        self.working_subsystems = [self.subsystems[key] for key in self.subsystems.keys()]
        self.critical = user_defined_critical
        self.update_order = self.env.event()
        self.dysfunction_signal = simpy.AnyOf(self.env, [syst.dysfunction_signal for syst in self.working_subsystems])
        self.interrupted = self.env.event()
        self.update_end = self.env.event()
        self.lifecycle = self.env.process(self.run())

    def start_subsystems(self):
        for key in self.subsystems.keys():
            self.subsystems[key].update_order.succeed()

    def run(self):
        user_defined_critical = self.critical
        # the pool system is started here when its update_order event is triggered
        yield self.update_order
        print("t = " + str(self.env.now) + " : " + self.id + " starts service.")
        self.update_order = self.env.event()
        # Here, the pool system starts all of its subsystems (which can be atomic and/or pool systems)
        self.start_subsystems()
        # here, update_end is triggered if all the update_end events of the subsystems have been triggered
        self.update_end = simpy.AllOf(self.env, [self.subsystems[key].update_end for key in self.subsystems.keys()])
        yield self.update_end
        try:
            while True:
                # wait for a dysfunction (dysfunction_signal), interruption (interrupted) or an update from a higher level system (update_order)
                yield self.dysfunction_signal | self.interrupted | self.update_order
                if self.dysfunction_signal.triggered:
                    crit = []
                    for syst in self.working_subsystems:
                        if syst.dysfunction_signal.triggered:
                            crit.append(syst.critical)
                    if True in crit:  # if one of the failed subsystems is critical (critical = True), then trigger interrupted event()
                        print("t = " + str(self.env.now) + " : " + self.id + " fails completely.")
                        # pool system is interrupted
                        self.critical = user_defined_critical
                        self.interrupted.succeed()
                    else:
                        # no critical subsystem has failed yet so the pool system can continue working (no interruption here)
                        self.critical = False
                        self.working_subsystems = [self.subsystems[key] for key in self.subsystems.keys()
                                                   if not self.subsystems[key].interrupted.triggered]
                        if len(self.working_subsystems) != 0:
                            print("t = " + str(self.env.now) + " : " + self.id + " fails partially.")
                            self.dysfunction_signal = simpy.AnyOf(self.env, [syst.dysfunction_signal for syst in
                                                                             self.working_subsystems])
                        else:
                            # pool system is interrupted if all of its subsystems have failed
                            print("t = " + str(self.env.now) + " : " + self.id + " fails completely (no working EUs).")
                            self.interrupted.succeed()
                if self.interrupted.triggered:
                    print("t = " + str(self.env.now) + " : " + self.id + " interrupts service.")
                    # interrupt all subsystems
                    for key in self.subsystems.keys():
                        if not self.subsystems[key].interrupted.triggered:
                            self.subsystems[key].interrupted.succeed()
                    # waits for update_order from higher level system
                    yield self.update_order
                if self.update_order.triggered:
                    print("t = " + str(self.env.now) + " : " + self.id + " is updated.")
                    # update_order has been triggered by higher level system
                    self.update_order = self.env.event()
                    self.start_subsystems()
                    self.update_end = simpy.AllOf(self.env,
                                                  [self.subsystems[key].update_end for key in self.subsystems.keys()])
                    # wait for the end of the update of the subsystems
                    yield self.update_end
                    print("t = " + str(self.env.now) + " : " + self.id + " receives update-end signal.")
                    self.working_subsystems = [self.subsystems[key] for key in self.subsystems.keys()]
                    self.dysfunction_signal = simpy.AnyOf(self.env,
                                                          [syst.dysfunction_signal for syst in self.working_subsystems])
                    self.interrupted = self.env.event()
        except simpy.Interrupt:
            # here the pool system is terminated, it leaves service.
            for key in self.subsystems.keys():
                self.subsystems[key].lifecycle.interrupt()
            return
I defined two more classes, Eu (which inherits from AtomicSystem) and ModSat (which inherits from PoolSystem). Basically, I build a ModSat object out of several Eu objects (only two system levels). The code is below:

class Eu(AtomicSystem):
    def __init__(self, env, identity, mtbd, backlog, critical=True, ids=None):
        self.id = identity
        ids.append(self.id)
        AtomicSystem.__init__(self, env, mtbd, backlog, critical, ids)


class ModSat(PoolSystem):
    def __init__(self, env, digit_id, eu_mtbds_criticals, backlog, critical=True):
        identity = "ModSat" + str(digit_id)
        self.eus = self.initialize(env, identity, eu_mtbds_criticals, backlog)
        PoolSystem.__init__(self, env, identity, self.eus, critical)

    def initialize(self, env, identity, eu_mtbds_criticals, backlog):
        eus = {}
        for i in range(1, len(eu_mtbds_criticals) + 1):
            eu_id = "EU" + str(i) + ":" + identity
            eu = Eu(env, eu_id, eu_mtbds_criticals[i - 1][0], backlog, eu_mtbds_criticals[i - 1][1], [identity])
            eus[eu_id] = eu
        return eus
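Finally, I want to test the ModSat object and check whether one of its failed subsystems (of type Eu) can easily be replaced without disturbing the proper behavior of the ModSat. I wrote a simulate function that lets me interact with the ModSat objects. I ran my tests with 2 ModSat objects defined as follows: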
backlog = []
eu_mtbds_criticals1 = [[5, False], [11, False], [19, False]]
eu_mtbds_criticals2 = [[4, False], [27, False], [38, False]]
env = simpy.Environment()
sat1 = ModSat(env, 1, eu_mtbds_criticals1, backlog, True)
sat2 = ModSat(env, 2, eu_mtbds_criticals2, backlog, True)
constellation = {'ModSat1': sat1, 'ModSat2': sat2}
env.process(simulate(constellation, env, backlog))
env.run(until=100)
The first test is very simple and uses the following simulate function:

def simulate(constellation, env, backlog):
    for key in constellation.keys():
        # start service of each ModSat object included in the constellation dictionary,
        # by triggering their update_order event.
        constellation[key].update_order.succeed()
    # wait for a while to be sure that the modsat objects have been completely simulated.
    yield env.timeout(50)
The output is what I want, since all the events appear to have been triggered and processed in the right order:
# the 1st update_order event of PoolSystem is triggered
t = 0 : ModSat1 starts service.
t = 0 : ModSat2 starts service.
# the 1st update_order event of AtomicSystem is triggered
t = 0 : EU1:ModSat1 starts service.
t = 0 : EU3:ModSat1 starts service.
t = 0 : EU2:ModSat1 starts service.
t = 0 : EU2:ModSat2 starts service.
t = 0 : EU1:ModSat2 starts service.
t = 0 : EU3:ModSat2 starts service.
# 1st failure here. Since critical attribute of EU1:ModSat2 is set to False ModSat2 is not interrupted (partial failure)
t = 4 : EU1:ModSat2 fails.
t = 4 : EU1:ModSat2 interrupts service.
t = 4 : ModSat2 fails partially.
# 2nd failure here
t = 5 : EU1:ModSat1 fails.
t = 5 : EU1:ModSat1 interrupts service.
t = 5 : ModSat1 fails partially.
t = 11 : EU2:ModSat1 fails.
t = 11 : EU2:ModSat1 interrupts service.
t = 11 : ModSat1 fails partially.
# here the last failure of ModSat1: ModSat1 is interrupted because it has no more working Eus
t = 19 : EU3:ModSat1 fails.
t = 19 : EU3:ModSat1 interrupts service.
t = 19 : ModSat1 fails completely (no working EUs).
t = 19 : ModSat1 interrupts service.
t = 27 : EU2:ModSat2 fails.
t = 27 : EU2:ModSat2 interrupts service.
t = 27 : ModSat2 fails partially.
# here the last failure of ModSat2: ModSat2 is interrupted because it has no more working Eus
t = 38 : EU3:ModSat2 fails.
t = 38 : EU3:ModSat2 interrupts service.
t = 38 : ModSat2 fails completely (no working EUs).
t = 38 : ModSat2 interrupts service.
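As a sanity check on the log above, the failure times can be derived directly from the mtbd values. The sketch below assumes every Eu starts at t = 0, is never replaced, and fails exactly at its mtbd (which is what time_to_dysfunction() returns); `failure_timeline` is a hypothetical helper, not part of my simulation code.

```python
eu_mtbds_criticals1 = [[5, False], [11, False], [19, False]]
eu_mtbds_criticals2 = [[4, False], [27, False], [38, False]]


def failure_timeline(constellation_spec):
    """Return (time, eu_id) pairs sorted by failure time."""
    events = []
    for sat_id, spec in constellation_spec.items():
        for i, (mtbd, _critical) in enumerate(spec, start=1):
            # Same naming scheme as ModSat.initialize().
            events.append((mtbd, "EU" + str(i) + ":" + sat_id))
    return sorted(events)


timeline = failure_timeline({"ModSat1": eu_mtbds_criticals1,
                             "ModSat2": eu_mtbds_criticals2})
for time, eu_id in timeline:
    print("t = " + str(time) + " : " + eu_id + " fails.")
```

The resulting order (t = 4, 5, 11, 19, 27, 38) matches the "fails." lines in the output of the first test.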
Now I want to test my code with the following simulate function:

def simulate(constellation, env, backlog):
    for key in constellation.keys():
        # start service of each ModSat object included in the constellation dictionary,
        # by triggering their update_order event.
        constellation[key].update_order.succeed()
    # detect failure
    request_signal = simpy.AnyOf(env, [constellation[key].dysfunction_signal for key in constellation.keys()])
    yield request_signal
    # The servicer's backlog is updated with the first item of the backlog list
    print("t = " + str(env.now) + " : a service request is detected.")
    servicer_backlog = []
    servicer_backlog.append(backlog[0])
    del backlog[0]
    # the next line models the servicer time of service
    yield env.timeout(5)
    # The servicer gets the ID of the failed Eu to replace from its backlog
    sat_id = servicer_backlog[0]['IDs'][0]
    eu_id = servicer_backlog[0]['IDs'][1]
    failed_eu = constellation[sat_id].eus[eu_id]
    # the servicer gives the values of the attributes of the failed EU to the new EU
    new_eu = Eu(failed_eu.env, failed_eu.id, failed_eu.mtbd, backlog, failed_eu.critical, failed_eu.ids)
    # the failed eu is terminated (its service ends)
    failed_eu.lifecycle.interrupt()
    # the new EU replaces the failed_eu
    constellation[sat_id].eus[eu_id] = new_eu
    # the modsat concerned by the replacement has its update_order event triggered
    constellation[sat_id].update_order.succeed()
    print("t = " + str(env.now) + " : a service is provided")
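For reference, this is the shape of a backlog entry and how the two IDs are read from it. The ids list is [ModSat identity, Eu identity] because ModSat.initialize() passes [identity] and Eu.__init__() appends its own id. The sketch is standalone; `next_request` is a hypothetical helper, and its guard against an empty backlog is an addition that my simulate function does not have.

```python
# One backlog entry, as appended by AtomicSystem.run() when an Eu fails.
backlog = [{"Dysfunction time": 4, "IDs": ["ModSat2", "EU1:ModSat2"]}]


def next_request(backlog):
    """Pop the oldest backlog entry and return (sat_id, eu_id), or None."""
    if not backlog:
        return None
    entry = backlog.pop(0)
    return entry["IDs"][0], entry["IDs"][1]


request = next_request(backlog)
print(request)
print(next_request(backlog))
```

The first call returns the pair used to look up `constellation[sat_id].eus[eu_id]`; the second call returns None because the backlog is now empty.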
The simulate function above simply models replacing the first failed Eu with a new one. The output is:
# the 1st update_order event of PoolSystem is triggered
t = 0 : ModSat1 starts service.
t = 0 : ModSat2 starts service.
# the 1st update_order event of AtomicSystem is triggered
t = 0 : EU3:ModSat1 starts service.
t = 0 : EU2:ModSat1 starts service.
t = 0 : EU1:ModSat1 starts service.
t = 0 : EU1:ModSat2 starts service.
t = 0 : EU2:ModSat2 starts service.
t = 0 : EU3:ModSat2 starts service.
t = 0 : ModSat1 receives update-end signal.
t = 0 : ModSat2 receives update-end signal.
# the first Eu of modsat2 fails, and its failure is detected by the simulate function
t = 4 : EU1:ModSat2 fails.
t = 4 : EU1:ModSat2 interrupts service.
t = 4 : a service request is detected.
t = 4 : ModSat2 fails partially.
# HERE IS MY CONCERN: at time t = 5, EU1 of modsat1 fails and interrupts service. However, there should be a line "t = 5 : ModSat1 fails partially" which does not appear...
t = 5 : EU1:ModSat1 fails.
t = 5 : EU1:ModSat1 interrupts service.
t = 9 : a service is provided
t = 9 : EU1:ModSat2 is terminated.
t = 9 : ModSat2 is updated.
t = 9 : EU1:ModSat2 starts service.
t = 9 : EU2:ModSat2 is updated.
t = 9 : EU3:ModSat2 is updated.
t = 9 : ModSat2 receives update-end signal.
t = 11 : EU2:ModSat1 fails.
t = 11 : EU2:ModSat1 interrupts service.
t = 13 : EU1:ModSat2 fails.
t = 13 : EU1:ModSat2 interrupts service.
t = 13 : ModSat2 fails partially.
t = 19 : EU3:ModSat1 fails.
t = 19 : EU3:ModSat1 interrupts service.
t = 27 : EU2:ModSat2 fails.
t = 27 : EU2:ModSat2 interrupts service.
t = 27 : ModSat2 fails partially.
t = 38 : EU3:ModSat2 fails.
t = 38 : EU3:ModSat2 interrupts service.
t = 38 : ModSat2 fails completely (no working EUs).
t = 38 : ModSat2 interrupts service.
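As noted above, there should be a line "t = 5 : ModSat1 fails partially." right after the line "t = 5 : EU1:ModSat1 interrupts service.". Instead, execution jumps straight to the first line after "yield env.timeout(5)" in the simulate function.
I do not understand what is happening here, and I think it is because I do not understand well enough how SimPy defines and orders its event queue. I could not find any hint online about what is going on, and I have not seen this kind of problem on Stack Overflow or other forums. I would be grateful for any help.
My code takes a long time to explain, so I hope the comments in the code I posted are enough :\
Thank you very much!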