5

我正在使用 while 循环读取串行数据。但是,我无法控制采样率。

代码本身似乎需要 0.2 秒才能运行,所以我知道我不会比这更快。但我希望能够精确控制采样的速度。

我觉得我可以使用“睡眠”来做到这一点,但问题是循环本身在不同点可能需要更长的时间来读取(具体取决于通过串行数据传输的内容),所以代码会有弥补余额。

例如,假设我想每 1 秒采样一次,循环运行时间从 0.2 秒到 0.3 秒不等。我的代码需要足够聪明才能睡 0.8 秒(如果循环需要 0.2 秒)或 0.7 秒(如果循环需要 0.3 秒)。

import serial
import csv
import time

#open serial stream
    while True:

        #read and print a line
        sample_value=ser.readline()
        sample_time=time.time()-zero
        sample_line=str(sample_time)+','+str(sample_value)
        outfile.write(sample_line)
        print 'time: ',sample_time,', value: ',sample_value
4

3 回答 3

11

只需测量运行代码所花费的每次循环迭代的时间,并sleep相应地:

import time

while True:
    now = time.time()            # get the time
    do_something()               # do your stuff
    elapsed = time.time() - now  # how long was it running?
    time.sleep(1.-elapsed)       # sleep accordingly so the full iteration takes 1 second

当然不是 100% 完美(可能不时关闭一毫秒或另一毫秒),但我想这已经足够好了。


另一个不错的方法是使用twistedLoopingCall

from twisted.internet import task
from twisted.internet import reactor

def do_something():
    pass # do your work here

task.LoopingCall(do_something).start(1.0)
reactor.run()
于 2012-11-02T15:06:25.530 回答
2

一个相当优雅的方法是你在 UNIX 上工作:使用信号库

编码 :

import signal


def _handle_timeout():
    print "timeout hit" # Do nothing here

def second(count):
    signal.signal(signal.SIGALRM, _handle_timeout)
    signal.alarm(1)
    try:
        count += 1 # put your function here
        signal.pause()

    finally:
        signal.alarm(0)
        return count


if __name__ == '__main__':

    count = 0
    count = second(count)
    count = second(count)
    count = second(count)
    count = second(count)
    count = second(count)

    print count

和时间:

 georgesl@cleese:~/Bureau$ time python timer.py
 5

 real   0m5.081s
 user   0m0.068s
 sys    0m0.004s

但是有两个警告:它只适用于 *nix,而且它不是多线程安全的。

于 2012-11-02T16:35:50.023 回答
0

在循环开始时检查是否经过了适当的时间。如果还没有,sleep.

# Set up initial conditions for sample_time outside the loop
sample_period = ???
next_min_time = 0
while True:
    sample_time = time.time() - zero
    if sample_time < next_min_time:
        time.sleep(next_min_time - sample_time)
        continue
    # read and print a line
    sample_value = ser.readline()
    sample_line = str(sample_time)+','+str(sample_value)
    outfile.write(sample_line)
    print 'time: {}, value: {}'.format(sample_time, sample_value)
    next_min_time = sample_time + sample_period
于 2012-11-02T15:01:26.980 回答