我很惊讶这被证明很难找到。
我需要检测何时使用 python3 添加(插入)具有特定分区标签的 USB 块设备。
有没有办法使用 pyudev 提供 USB 块设备列表?如何使用子系统 =“块”和子系统 =“usb”指定过滤器,它们似乎是互斥的过滤器。
当插入具有名为“XYZ”的分区的 USB 设备时,我需要运行一个脚本来挂载它并运行一个使用该分区上数据的程序。
我很惊讶这被证明很难找到。
我需要检测何时使用 python3 添加(插入)具有特定分区标签的 USB 块设备。
有没有办法使用 pyudev 提供 USB 块设备列表?如何使用子系统 =“块”和子系统 =“usb”指定过滤器,它们似乎是互斥的过滤器。
当插入具有名为“XYZ”的分区的 USB 设备时,我需要运行一个脚本来挂载它并运行一个使用该分区上数据的程序。
从各种 udev 规则、systemd 单元、许多脚本及其组合,我尝试了太多变体,但直到我使用以下代码才取得任何成功。它工作但导致 100% CPU 负载。当我在最后的 while 循环中添加睡眠时间时,它根本不再起作用,甚至还阻止了 PCmanFM 自动挂载。
问题出在 usbEvent.py 进程中。我可以从命令行运行它,它工作得很好。它所做的第一件事是使用 Popen 调用“grep devName /proc/mounts”来等待自动挂载程序挂载分区。Popen 在循环中被调用并添加一些 time.sleep 消除了 CPU 负担,这令人惊讶,因为安装点会在几秒钟内出现。
systemd 运行下面的代码和它产生的 usbEvent.py 进程之间似乎存在一些我不完全理解的相互作用。它们是独立的过程,所以我认为它们应该彼此完全独立。
usbEvent.py 处理程序有效,但识别安装并继续需要更长的时间。它在运行时消耗大约 5% 的 CPU,完成时仅消耗 0.3。为什么超时结束时它没有结束一定是由于p.communicate,但是如果p.poll没有返回None,则该过程应该是完整的并且不应该阻塞......但确实如此!为什么?
该平台是具有 8GB RAM 和 2021 年 1 月发布的 Raspbery Pi OS 的 Raspberry Pi4。
#!/usr/bin/env python3
import os
import time
import subprocess as sp
import pyudev
# This code is run on boot via systemd to detect when
# my custom USB storage device (USB stick, SSD etc)
# is inserted or removed. It spawns a new process to
# handle the event.
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by('block', device_type="partition")
def log_event(action, device):
devName = device.get('DEVNAME')
devLabel = device.get('ID_FS_LABEL')
if devLabel == "MY_CUSTOM_USB":
sp.Popen(["/home/user/bin/customUSB/usbEvent.py",
action, devName, devLabel],
stdin=sp.DEVNULL, stdout=sp.DEVNULL, stderr=sp.DEVNULL)
observer = pyudev.MonitorObserver(monitor, log_event)
observer.start()
while True:
# pass
time.sleep(0.1)
这是我为使其工作而更改的 usbEvent.py 处理程序的一部分:
# Waits for mount point of "dev" to appear and returns it. Communnicate
def getMountPoint(dev):
out = ""
interval = 0.1
timeout = 5 / interval
while timeout > 0:
p = sp.Popen(["grep", dev, "/proc/mounts"],
text=True, stdout=sp.PIPE, stderr=sp.PIPE)
retCode = p.poll()
if retCode is None:
time.sleep(interval)
else:
out, err = p.communicate() # This should not block but does!
if retCode == 0 and len(out) > 0:
out = out.split()[1]
break
else:
lg.info(f"exit code: {retCode} Error: {err}")
exit(1)
if timeout == 0:
p.terminate()
return out