我正在开发一个使用平板扫描仪的 Django 应用程序。为了保持与扫描仪的连接,使用了 RPyC。(连接到扫描仪需要很长时间,所以我想多次重新使用连接。)
连接到扫描仪并保持服务运行似乎工作正常。我可以打电话给扫描仪,它会立即做出反应。但我无法保存 PIL 图像文件,请参阅下面的错误。没有 RPyC 就能够保存 PIL 图像文件。
Traceback (most recent call last):
File "/usr/lib/python3.6/code.py", line 91, in runcode
exec(code, self.locals)
File "<console>", line 1, in <module>
File "/home/user/Projects/django-project/project/appname/models.py", line 55, in scan_preview
image_file.save(file_path)
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/netref.py", line 166, in __getattr__
return syncreq(self, consts.HANDLE_GETATTR, name)
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/netref.py", line 76, in syncreq
return conn.sync_request(handler, proxy, *args)
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/protocol.py", line 469, in sync_request
return self.async_request(handler, *args, timeout=timeout).value
File "/home/user/Envs/project/lib/python3.6/site-packages/rpyc/core/async_.py", line 102, in value
raise self._obj
AttributeError: cannot access 'save'
看起来 RPyC 正在尝试保存图像,我不明白为什么。在保存图像之前关闭连接会导致有关流被关闭的错误。
我想要的是 RPyC 在返回 PIL 图像文件后停止打扰我。我希望它返回没有流 mambo jumbo 的 PIL 图像。(我不知道正确的术语,RPyC 的新手。)
扫描仪服务.py
import rpyc
import sane
class Scanner(object):
"""
The Scanner class is used to interact with flatbed scanners.
"""
def __init__(self):
sane.init()
self.device_name = None
self.error_message = None
self.device = self.get_device()
def get_device(self):
"""
Return the first detected scanner and set the name.
@return: sane.SaneDev
"""
devices = sane.get_devices()
print('Available devices:', devices)
# Empty list means no scanner is connect or the connected scanner is
# already being used
if not devices:
self.error_message = 'Scanner disconnect or already being used.'
return None
# open the first scanner you see
try:
device = sane.open(devices[0][0])
except Exception as e:
self.error_message = e
print(e)
return None
brand = devices[0][1]
model = devices[0][2]
self.device_name = "{brand}_{model}".format(
brand=brand,
model=model
)
print("Device name:", self.device_name)
# set to color scanning mode, this is not always the default mode
device.mode = 'color'
return device
def print_options(self):
"""Print the device's options. Useful for development."""
for opt in self.device.get_options():
print(opt, '\n')
def scan_image(self, dpi):
"""
Scan an image.
@param dpi: integer
@return: PIL image
"""
self.device.resolution = dpi
self.device.start()
image = self.device.snap()
return image
def scan_roi(
self,
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi=2400
):
"""
Scan a Region of Interest.
The ROI is selected by giving two x/y coordinates, one for the top left
and one for the bottom right. This creates a square that the scanner
will scan.
To create a ROI measure the amount of millimeters starting at the top
left corner of the document.
http://www.sane-project.org/html/doc014.html
@param top_left_x: integer
@param top_left_y: integer
@param bottom_right_x: integer
@param bottom_right_y: integer
@param dpi: integer
@return: PIL image
"""
self.device.resolution = dpi
# top left x/y
self.device.tl_x = top_left_x
self.device.tl_y = top_left_y
# bottom right x/y
self.device.br_x = bottom_right_x
self.device.br_y = bottom_right_y
self.device.start()
image = self.device.snap()
return image
class ScannerService(rpyc.Service):
"""
Run the scanner as a service.
Multiple clients can connect to the Scanner Service to use the flatbed
scanner without having to wait for sane to setup a connection.
"""
def __init__(self):
self.scanner = Scanner()
def on_connect(self, conn):
pass
def on_disconnect(self, conn):
pass
def exposed_get_device_name(self):
"""
Return the name of the scanner
@return: string
"""
return self.scanner.device_name
def exposed_scan_image(self, dpi):
"""
Scan an image.
@param dpi: integer
@return: PIL image
"""
image = self.scanner.scan_image(dpi)
return image
def exposed_scan_roi(
self,
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi=2400
):
"""
Scan a Region of Interest.
@param top_left_x: integer
@param top_left_y: integer
@param bottom_right_x: integer
@param bottom_right_y: integer
@param dpi: integer
@return: PIL image
"""
image = self.scanner.scan_roi(
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi
)
return image
if __name__ == "__main__":
from rpyc.utils.server import ThreadedServer
t = ThreadedServer(ScannerService(), port=18861)
t.start()
实用程序.py
class ScannerServiceConnection(object):
"""Connect to and interact with the scanner service."""
def __init__(self):
self.connection = self.get_connection()
@staticmethod
def get_connection():
"""
Connect to the scanner service
@return: ScannerService object
"""
connection = rpyc.connect("localhost", 18861)
return connection
def get_device_name(self):
"""
Return the name of the scanner
@return: string
"""
return self.connection.root.exposed_get_device_name()
def scan_image(self, dpi):
"""
Scan an image.
@param dpi: integer
@return: PIL image
"""
image = self.connection.exposed_scan_image(dpi)
return image
def scan_roi(
self,
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi=2400
):
"""
Scan a Region of Interest.
@param top_left_x: integer
@param top_left_y: integer
@param bottom_right_x: integer
@param bottom_right_y: integer
@param dpi: integer
@return: PIL image
"""
image = self.connection.root.exposed_scan_roi(
top_left_x, top_left_y,
bottom_right_x, bottom_right_y,
dpi
)
return image