2

我有一个覆盆子 pizero W,它通过 GPIO 引脚连接到流量计和 USB 连接到条形码扫描仪。我有 python 脚本,它使用回调函数在感应到 GPIO 输入时发出警报。这个 python 脚本需要在 pizero 上持续运行,以便识别何时激活流量计并处理输入。

问题是我还有一个通过 USB 连接到 pizero 的条形码扫描仪。我希望 pizero 也能识别何时扫描条形码并处理该输入。

然后,pizero 应该发送一条消息,该消息包含来自流量计的信息和来自条形码扫描仪的信息。

有没有办法在同一个 python 脚本中做到这一点?如何让 pizero 同时监听和处理两个输入?将其分成两个不同的脚本是否更容易实现,如果是这样,我可以同时运行它们并以某种方式统一它们在第三个连续运行的脚本中提供的信息吗?

谢谢!

每个评论的一些澄清(感谢您的输入):

  • 流量计的输入引脚GPIO 17是 SPI 连接
  • 还连接了一个 5V 电源和接地引脚。

该脚本需要在系统启动时运行。我会看,systemctl因为在提到它之前我还没有听说过。

当流量计未连接时,Pi 通常将扫描的条形码识别为键盘输入(即一系列数字后跟换行符)。

当我发送包含流量计和条形码信息的消息时,我需要从 python 发送一个 JSON 对象,其中包括信息片段和接收信息的时间戳。

此 JSON 对象将通过 wifi 发送到具有静态 ip 的树莓派服务器,该服务器与 pizero 位于同一家庭网络上。树莓派服务器可以访问 Django 数据库,该数据库应该将 JSON 对象信息合并到数据库中。

4

2 回答 2

2

另一个可能更简单的选择可能是使用RedisRedis是一个非常高性能的内存数据结构服务器。在 Raspberry Pi、Mac、Linux Windows 或其他机器上安装很简单。它允许您在网络上的任意数量的客户端之间共享原子整数、字符串、列表、散列、队列、集合和有序集合。

所以这个概念可能是有一个单独的程序来监控流量计并将当前读数填充到 Redis 中,只要你喜欢。然后另一个单独的程序读取条形码并将它们填充到Redis中,只要你喜欢。最后,有一个控制程序,可能在您的网络上的其他地方,可以随意获取这两个值。

请注意,您可以在 Raspberry Pi 或任何其他机器上运行Redis服务器。

所以,这是流量计程序 - 只需更改host运行 Redis 的机器的 IP 地址:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis
r = redis.Redis(host,port)

reading = 0
while True:
   # Generate synthetic reading that just increases every 500ms
   reading +=1 
   # Stuff reading into Redis as "fmReading"
   r.set("fmReading",reading)
   time.sleep(0.5)

下面是条码读取程序:

#!/usr/bin/env python3

import redis
import time
from random import random, seed

host='localhost'
port=6379

# Connect to local Redis server
r = redis.Redis(host,port)

# Generate repeatable random numbers
seed(42)

while True:
   # Synthesize barcode and change every 2 seconds
   barcode = "BC" + str(int((random()*1000)))
   # Stuff barcode into Redis as "barcode"
   r.set("barcode",barcode)
   time.sleep(2)

这是主控制程序:

#!/usr/bin/env python3

import redis
import time

host='localhost'
port=6379

# Connect to Redis server
r = redis.Redis(host,port)

while True:
   # Grab latest flowmeter reading and barcode
   fmReading = r.get("fmReading")
   barcode   = r.get("barcode")
   print(f"Main: fmReading={fmReading}, barcode={barcode}")
   time.sleep(1)

样本输出

Main: fmReading=b'10', barcode=b'BC676'
Main: fmReading=b'12', barcode=b'BC892'
Main: fmReading=b'14', barcode=b'BC892'
Main: fmReading=b'16', barcode=b'BC86'
Main: fmReading=b'18', barcode=b'BC86'
Main: fmReading=b'20', barcode=b'BC421'
Main: fmReading=b'22', barcode=b'BC421'
Main: fmReading=b'24', barcode=b'BC29'

请注意,为了调试它,您还可以使用命令行界面从网络上的任何机器获取任何读数到Redis,例如在终端中:

redis-cli get barcode
"BC775"

如果您想在用 PHP 编写的 Web 浏览器中显示值,您也可以使用 PHP 绑定到 Redis 来获取值 - 非常方便!

当然,您可以调整程序以发送每个读数的时间戳 - 可能通过使用 Redis哈希而不是简单的键。您还可以使用 Redis LPUSHBRPOP实现一个队列以在程序之间发送消息。

关键词:Redis、列表、队列、哈希、树莓派

于 2019-10-23T11:32:05.113 回答
1

更新的答案

我为条形码阅读器添加了一些代码。我这样做是为了让条形码阅读器花费可变的时间,最多需要 5 秒来读取读数,而流量计需要恒定的 0.5 秒,因此您可以看到不同的线程以不同的速率相互独立地进行。

#!/usr/bin/env python3

from threading import Lock
import threading
import time
from random import seed
from random import random

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    # Take 0.5s to read
    time.sleep(0.5)
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

# Dummy function to read Barcode as I don't have anything attached
def readBarcode():
    # Take variable time, 0..5 seconds, to read
    time.sleep(random()*5)
    result = "BC" + str(int(random()*1000))
    return result

class BarcodeReader(threading.Thread):
    def __init__(self):
        super(BarcodeReader, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read barcode and safely update self.currentReading
        while True:
            value = readBarcode()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Generate repeatable random numbers
    seed(42)

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.daemon = True
    fmThread.start()

    # Instantiate and start barcode reader thread
    bcThread = BarcodeReader()
    bcThread.daemon = True
    bcThread.start()

    # Now you can do other things in main, but always get access to latest readings
    for i in range(20):
        fmReading = fmThread.read()
        bcReading = bcThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}, Barcode={bcReading}")
        time.sleep(1)

样本输出

Main: i = 0 FlowMeter reading = 0, Barcode=0
Main: i = 1 FlowMeter reading = 1, Barcode=0
Main: i = 2 FlowMeter reading = 3, Barcode=0
Main: i = 3 FlowMeter reading = 5, Barcode=0
Main: i = 4 FlowMeter reading = 7, Barcode=BC25
Main: i = 5 FlowMeter reading = 9, Barcode=BC223
Main: i = 6 FlowMeter reading = 11, Barcode=BC223
Main: i = 7 FlowMeter reading = 13, Barcode=BC223
Main: i = 8 FlowMeter reading = 15, Barcode=BC223
Main: i = 9 FlowMeter reading = 17, Barcode=BC676
Main: i = 10 FlowMeter reading = 19, Barcode=BC676
Main: i = 11 FlowMeter reading = 21, Barcode=BC676
Main: i = 12 FlowMeter reading = 23, Barcode=BC676
Main: i = 13 FlowMeter reading = 25, Barcode=BC86
Main: i = 14 FlowMeter reading = 27, Barcode=BC86
Main: i = 15 FlowMeter reading = 29, Barcode=BC29
Main: i = 16 FlowMeter reading = 31, Barcode=BC505
Main: i = 17 FlowMeter reading = 33, Barcode=BC198
Main: i = 18 FlowMeter reading = 35, Barcode=BC198
Main: i = 19 FlowMeter reading = 37, Barcode=BC198

原始答案

我建议您查看systemdsystemctl在每次系统启动时启动您的应用程序 - 示例在这里

关于同时监控两件事,我建议你使用 Python 的threading模块。这是一个简单的示例,我创建了一个子类对象threading,通过不断读取它并将当前值保存在主程序可以随时读取的变量中来管理您的流量计。您可以启动另一个类似的管理您的条形码阅读器并并行运行它们。我不想那样做,并把你的代码加倍混淆。

#!/usr/bin/env python3

from threading import Lock
import threading
import time

# Dummy function to read SPI as I don't have anything attached
def readSPI():
    readSPI.static += 1
    return readSPI.static
readSPI.static=0

class FlowMeter(threading.Thread):
    def __init__(self):
        super(FlowMeter, self).__init__()
        # Create a mutex
        self.mutex = Lock()
        self.currentReading = 0

    def run(self):
        # Continuously read flowmeter and safely update self.currentReading
        while True:
            value = readSPI()
            self.mutex.acquire()
            self.currentReading = value
            self.mutex.release()
            time.sleep(0.01)

    def read(self):
        # Main calls this to get latest reading, we just grab it from internal variable
        self.mutex.acquire()
        value = self.currentReading
        self.mutex.release()
        return value

if __name__ == '__main__':

    # Instantiate and start flow meter manager thread
    fmThread = FlowMeter()
    fmThread.start()

    # Now you can do other things in main, but always get access to latest reading
    for i in range(100000):
        fmReading = fmThread.read()
        print(f"Main: i = {i} FlowMeter reading = {fmReading}")
        time.sleep(1)

您可以查看使用logging来协调和统一您的调试和日志记录消息 - 请参见此处

您可以查看events让其他线程知道当某事达到临界级别时需要执行的操作 -此处为示例。

于 2019-10-22T09:11:40.107 回答