问题描述:我需要使用 Python 连续采集数据,持续约 40 分钟。下面的代码可以正常工作,唯一的问题是我希望在采集过程中就能实时查看数据。有没有办法以一定的刷新率把数据绘制出来?
下面的代码是从这篇文章修改而来的。
谢谢你们!
# This works for a NI USB-6251
import nidaqmx
from nidaqmx.constants import AcquisitionType
from nidaqmx import stream_readers
import numpy as np
import csv
import time
from datetime import datetime
# Acquire two analog-input voltage channels from an NI-DAQmx device in a
# single blocking read, then dump the samples to 'data.csv' together with a
# time column.

# --- Acquisition settings ---------------------------------------------------
sample_rate = 2000  # Sample clock rate, in samples per second per channel
samples_to_acq = 2000  # Samples per channel in one iteration
wait_time = samples_to_acq / sample_rate  # Duration of one iteration, in s
cont_mode = AcquisitionType.CONTINUOUS
iterations = 10

# Totals for the whole run: `iterations` back-to-back iterations.
total_wait_time = wait_time * iterations  # Total acquisition time, in s
samples_to_acq_new = samples_to_acq * iterations  # Total samples per channel

# The read blocks until every sample has arrived, which takes about
# `total_wait_time` seconds. The timeout therefore has to be longer than the
# acquisition itself; a hard-coded value equal to the run length can expire
# right before the last samples are delivered.
read_timeout = total_wait_time + 5.0

# --- CSV header rows --------------------------------------------------------
now = datetime.now()
military = now.strftime('%H:%M:%S')  # 24-hour wall-clock time of the capture
first_header = ['Event 1']
second_header = [f'T. Captura: {military}']

# One row per channel, one column per sample.
data = np.zeros((2, samples_to_acq_new), dtype=np.float64)

# --- Acquisition ------------------------------------------------------------
with nidaqmx.Task() as task:
    # Two voltage channels
    task.ai_channels.add_ai_voltage_chan("Dev1/ai0")
    task.ai_channels.add_ai_voltage_chan("Dev1/ai1")

    # Sets the sample clock rate and mode; samps_per_chan is passed as the
    # total number of samples this run will read per channel.
    task.timing.cfg_samp_clk_timing(
        sample_rate,
        sample_mode=cont_mode,
        samps_per_chan=samples_to_acq_new,
    )

    start = time.time()
    print('Starting task...')  # Just to keep a control in your console

    reader = stream_readers.AnalogMultiChannelReader(task.in_stream)
    reader.read_many_sample(data, samples_to_acq_new, timeout=read_timeout)

# --- Export -----------------------------------------------------------------
# Time axis in seconds: sample k is taken at k / sample_rate. (A linspace from
# 0 to total_wait_time *including* the endpoint would stretch the spacing to
# total_wait_time / (N - 1) and mislabel every sample after the first.)
x = np.arange(samples_to_acq_new) / sample_rate

# Saving the 2 channels to a csv file. This file is overwritten every time the
# program is executed; the relative path resolves against the current working
# directory.
with open('data.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(first_header)
    writer.writerow(second_header)
    # Two blank rows between the headers and the data
    writer.writerow([])
    writer.writerow([])
    # Columns: time (s), channel ai0, channel ai1
    writer.writerows(zip(x, data[0], data[1]))

elapsed_time = time.time() - start
print(f'done in {elapsed_time}')