0

我一直在寻找一种方法来直接更改正在运行的模块中的变量。我想要实现的是正在运行负载测试,并且我可以手动调整呼叫速度或其他任何东西。

下面是我刚刚创建的一些代码(未经测试),只是为了给你一个想法。

class A():
    def __init__(self):
        self.value = 1
    def runForever(self):
        while(1):
            print self.value
    def setValue(self, value):
        self.value = value

if __name__ == '__main__':
    #Some code to create the A object and directly apply the value from an human's input
    a = A()

    #Some parallelism or something has to be applied.
    a.runForever()
    a.setValue(raw_input("New value: "))

编辑#1:是的,我知道现在我永远不会点击 a.setValue() :-)

4

2 回答 2

0

您编写的伪代码与 Python 中的 Threading / Multiprocessing 工作方式非常相似。您将希望启动一个(例如)“永远运行”的线程,然后不是直接修改内部速率值,您可能只是通过一个提供新值的队列发送一条消息。

看看这个问题

这是执行您所要求的操作的演示。我更喜欢使用队列直接调用线程/进程。

import Queue  # !!warning. if you use multiprocessing, use multiprocessing.Queue
import threading
import time


def main():
    q = Queue.Queue()
    tester = Tester(q)
    tester.start()
    while True:
        user_input = raw_input("New period in seconds or (q)uit: ")
        if user_input.lower() == 'q':
            break
        try:
            new_speed = float(user_input)
        except ValueError:
            new_speed = None  # ignore junk
        if new_speed is not None:
            q.put(new_speed)
    q.put(Tester.STOP_TOKEN)

class Tester(threading.Thread):
    STOP_TOKEN = '<<stop>>'
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
        self.speed = 1
    def run(self):
        while True:
            # get from the queue
            try:
                item = self.q.get(block=False)  # don't hang
            except Queue.Empty:
                item = None  # do nothing
            if item:
                # stop when requested
                if item == self.STOP_TOKEN:
                    break  # stop this thread loop
                # otherwise check for a new speed
                try:
                    self.speed = float(item)
                except ValueError:
                    pass  # whatever you like with unknown input
            # do your thing
            self.main_code()
    def main_code(self):
        time.sleep(self.speed)  # or whatever you want to do


if __name__ == '__main__':
    main()
于 2013-10-14T08:08:00.817 回答
0

这是一个多线程示例。此代码将适用于 python 解释器,但不适用于 IDLE 的 Python Shell,因为该raw_input函数的处理方式不同。

from threading import Thread
from time import sleep

class A(Thread):
    def __init__(self):
        Thread.__init__(self)
        self.value = 1
        self.stop_flag = False

    def run(self):
        while not self.stop_flag:
            sleep(1)
            print(self.value)

    def set_value(self, value):
        self.value = value

    def stop(self):
        self.stop_flag = True


if __name__ == '__main__':
    a = A()
    a.start()
    try:
        while 1:
            r = raw_input()
            a.set_value(int(r))
    except:
        a.stop()
于 2013-10-14T08:35:04.670 回答