5

我有个问题。我想使用 python 在一定时间内(比如说 1 分钟)向某个主机发送连续的字节流。

到目前为止,这是我的代码:

#! /usr/bin/env python                                                          

import socket
import thread
import time

IP = "192.168.0.2"
PADDING = "a" * 1000 #assume the MTU is slighly above 1000
DATA = PADDING + "this is sentence number = "
PORT = 14444
killed = False
test_time = 60 #60 seconds of testing

def send_data():
  s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  s.connect((IP, PORT))
  count = 1
  starttime = time.clock()
  while elapsed < test_time:
    sent = s.send(DATA + str(count) + "\n")
    if sent == 0: break # assume that if nothing is sent -> connection died
    count = count+1
    elapsed = time.clock() - starttime
    if killed:
      break
  s.close()
  print str(count) + " has been sent"

print "to quit type quit"
thread.start_new_thread(send_data, ())

while True:
  var = raw_input("Enter something: ")
  if var == "quit":
    killed = True

很少有人问,除了每次轮询 time.clock 之外,有没有更好的方法让线程在 60 秒后死掉?当我运行这个程序时,它会正确发送字节,但是当我输入退出时,另一个线程不会死,即使我设置了 var kill = True。我想知道这是为什么?var Killed 的范围应该到达另一个线程吧?

谢谢

4

7 回答 7

5

I recommned using threading module. Even more benefit is to use InterruptableThread for terminating the thread. You do not have to use flag for terminating your thread but exception will occur if you call terminate() on this thread from parent. You can handle exception or not.

import threading, ctypes

class InterruptableThread(threading.Thread):
@classmethod
def _async_raise(cls, tid, excobj):
    res = ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, ctypes.py_object(excobj))
    if res == 0:
        raise ValueError("nonexistent thread id")
    elif res > 1:
        ctypes.pythonapi.PyThreadState_SetAsyncExc(tid, 0)
        raise SystemError("PyThreadState_SetAsyncExc failed")

def raise_exc(self, excobj):
    assert self.isAlive(), "thread must be started"
    for tid, tobj in threading._active.items():
        if tobj is self:
            self._async_raise(tid, excobj)
            return

def terminate(self):
    self.raise_exc(SystemExit)

EDIT: You can rewrite your code like this using another thread that is waiting 1 minute and then killing your other thread

def send_data:
    IP = ...
    # other vars

    ...
    s = socket.socket(.....)

    # no killed checking
    # no time checking
    # just do your work here
    ...
    s.close()


my_thread = InterruptableThread(target=send_data)
my_thread.start()

def one_minute_kill(who):
   time.sleep(60)
   who.terminate()

killer_thread = InterruptableThread(target=one_minute_kill, args=[my_thread])
killer.start()

print "to quit type quit"
while my_thread.isAlive():
  if raw_input("Enter something: ") == "quit":
    my_thread.terminate()
于 2009-03-03T10:20:58.130 回答
2

我不知道如何使用“线程”模块来做到这一点,但我可以使用“线程”模块来做到这一点。我认为这段代码完成了你想要的。

有关线程模块的文档:http: //docs.python.org/library/threading.html

#!/usr/bin/python

import time
from threading import Thread
import threading
import sys

test_time = 10
killed = False

class SillyThread( threading.Thread ):
    def run(self):
        global killed
        starttime = time.time()
        counter = 0
        while (time.time() - starttime) < test_time:
            if killed:
                break
            counter = counter + 1
            time.sleep(0.1)
        print "I did %d loops" % counter

class ManageThread( threading.Thread ):
    def run(self):
        global killed
        while True:
            var = raw_input("Enter something: ")
            if var == "quit":
                killed = True
                break
        print "Got var [%s]" % var

silly = SillyThread()
silly.start()
ManageThread().start()
Thread.join(silly)
print "bye bye"
sys.exit(0)

请注意,我使用 time.time() 而不是 time.clock()。time.clock() 给出了 Unix 上经过的处理器时间(参见http://docs.python.org/library/time.html)。我认为 time.clock() 应该在任何地方都可以使用。我将 test_time 设置为 10 秒,因为我一分钟都没有耐心。

如果我让它运行整整 10 秒,会发生以下情况:

leif@peacock:~/tmp$ ./test.py
Enter something: I did 100 loops
bye bye

如果我输入“退出”,会发生以下情况:

leif@peacock:~/tmp$ ./test.py
Enter something: quit
Got var [quit]
I did 10 loops
bye bye

希望这可以帮助。

于 2009-03-03T05:48:02.203 回答
1

你可以很容易地做到这一点,而无需线程。例如,使用 Twisted,您只需设置一个定时调用和一个生产者:

from twisted.internet.protocol import ClientFactory, Protocol
from twisted.internet import reactor

class Noisy(Protocol):
    def __init__(self, delay, data):
        self.delay = delay
        self.data = data

    def stop(self):
        self.transport.unregisterProducer()
        self.transport.loseConnection()
        reactor.stop()

    def resumeProducing(self):
        self.transport.write(self.data)

    def connectionMade(self):
        self.transport.registerProducer(self, False)
        reactor.callLater(self.delay, self.stop)

factory = ClientFactory()
factory.protocol = lambda: Noisy(60, "hello server")
reactor.connectTCP(host, port, factory)
reactor.run()

与线程方法相比,这具有各种优势。它不依赖守护线程,因此您实际上可以清理网络连接(例如,在必要时发送关闭消息)而不是依赖平台来破坏它。它为您处理所有实际的低级网络代码(您的原始示例在 socket.send 返回 0 的情况下做错了事情;此代码将正确处理这种情况)。您也不必依赖 ctypes 或晦涩的 CPython API 来在另一个线程中引发异常(因此它可以移植到更多版本的 Python 并且实际上可以立即中断阻塞的发送,这与其他一些建议的方法不同)。

于 2009-11-14T02:05:32.617 回答
1

如上所述,使用threading模块,它更容易使用并提供了几个同步原语。它还提供了一个在指定时间后运行的Timer类。

如果您只想让程序退出,您可以简单地将发送线程设置为守护进程。您可以通过在调用 start() 之前调用 setDaemon(True) 来做到这一点(2.6 可能会使用守护进程属性)。只要非守护线程正在运行,Python 就不会退出。

于 2009-03-03T06:11:08.777 回答
0

确保“退出”正常工作,并添加一个小字来测试输入是否正常工作。

if var == "quit":
 print "Hey we got quit"
于 2009-03-03T03:43:47.370 回答
0

变量 elapsed 未初始化。在 while 循环上方将其设置为零。

于 2009-03-03T04:11:21.093 回答
0

测试范围很容易killed

>>> import thread
>>> killed = False
>>> import time
>>> def test():
...  while True:
...   time.sleep(1)
...   if killed:
...     print 'Dead.'
...     break
... 
>>> thread.start_new_thread(test, ())
25479680
>>> time.sleep(3)
>>> killed = True
>>> Dead.
于 2009-03-04T03:54:32.547 回答