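In the class foo in foomodule.py below, I am getting an error in the run_with_multiprocessing method. The method splits the records in self._data into chunks and calls some_func() with a subset of the data, e.g. some_func(data[0:800], 800) in the first iteration if limit = 800.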

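I did this because running 10 * 1k records instead of 1 * 10k records showed a huge performance improvement in a variant of the function that does the same thing, just without multiprocessing. Now I want to use multiprocessing to see whether I can improve performance even further.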

I am running Python 3.8.2 on Windows 8.1. I am fairly new to Python and multiprocessing. Thank you very much for your help.

# foomodule.py
import multiprocessing

class foo:
    def __init__(self, data, record_count):
        self._data = data
        self._record_count = record_count

    def some_func(self, data, record_count):
        # looping through self._data and doing some work
        pass  # placeholder so the method body is valid Python


    def run_with_multiprocessing(self, limit):
        step = 0
        while step < self._record_count:
            if self._record_count - step < limit:
                proc = multiprocessing.Process(target=self.some_func, args=(self._data[step:self._record_count], self._record_count-step))
                proc.start()
                proc.join()
                step = self._record_count
                break

            # Pass only the current chunk of at most `limit` records, as described above
            proc = multiprocessing.Process(target=self.some_func, args=(self._data[step:step+limit], limit))
            proc.start()
            proc.join()
            step += limit
        return

When I use the class in script.py, I get the following error:

import time

import foomodule

# data is a mysql result set with, say, 10'000 rows
start = time.time()
bar = foomodule.foo(data, 10000)
limit = 800
bar.run_with_multiprocessing(limit)
end = time.time()
print("finished after " + str(round(end-start, 2)) + "s")

Traceback (most recent call last):
  File "C:/coding/python/project/script.py", line 29, in <module>
    bar.run_with_multiprocessing(limit)
  File "C:\coding\python\project\foomodule.py", line 303, in run_with_multiprocessing
    proc.start()
  File "C:\...\Python\Python38-32\lib\multiprocessing\process.py", line 121, in start
    self._popen = self._Popen(self)
  File "C:\...\Python\Python38-32\lib\multiprocessing\context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:\...\Python\Python38-32\lib\multiprocessing\context.py", line 326, in _Popen
    return Popen(process_obj)
  File "C:\...\Python\Python38-32\lib\multiprocessing\popen_spawn_win32.py", line 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:\...\Python\Python38-32\lib\multiprocessing\reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
  File "C:\...\Python\Python38-32\lib\socket.py", line 272, in __getstate__
    raise TypeError(f"cannot pickle {self.__class__.__name__!r} object")
TypeError: cannot pickle 'SSLSocket' object

1 Answer

Divide and conquer.

Problem

If you pass an SSLSocket object as an argument to multiprocessing.Process(), the call fails because an SSLSocket cannot be serialized (pickled).
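
The failure can be reproduced without starting any process. The following is a minimal sketch, not the asker's code: `try_pickle`, `make_unconnected_ssl_socket` and `Holder` are hypothetical names, and the socket is never connected, so no network traffic occurs. It also illustrates why `target=self.some_func` can trigger the error even when no socket appears in `args`: pickling a bound method pickles the instance it belongs to, including any connection stored on it.

```python
import pickle
import socket
import ssl


def try_pickle(obj):
    """Return None if obj can be pickled, otherwise a description of the error."""
    try:
        pickle.dumps(obj)
    except Exception as exc:
        return f"{type(exc).__name__}: {exc}"
    return None


def make_unconnected_ssl_socket():
    """Wrap a fresh TCP socket in TLS without connecting it (no network traffic)."""
    context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    raw = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    # server_hostname is a placeholder; it only matters once a handshake happens.
    return context.wrap_socket(raw, server_hostname="example.invalid")


class Holder:
    """Hypothetical stand-in for a class that keeps a connection on self."""

    def __init__(self, conn):
        self.conn = conn

    def work(self, rows):
        return len(rows)


if __name__ == "__main__":
    tls_sock = make_unconnected_ssl_socket()
    print(try_pickle([(1, "a"), (2, "b")]))   # plain data
    print(try_pickle(tls_sock))               # the SSLSocket itself
    print(try_pickle(Holder(tls_sock).work))  # bound method of an object holding it
    tls_sock.close()
```

If the traceback in the question comes from a database connection stored on the foo instance (an assumption; the question does not show where the SSLSocket lives), the third case is the relevant one.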

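Solution

Since you cannot serialize an SSLSocket, create it inside the child process instead, that is, inside the function you pass as target to multiprocessing.Process().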
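
Applied to the question's chunking code, the same idea means handing each child only plain, picklable data and a module-level function. This is a sketch under assumptions: the rows are taken to be ordinary tuples already fetched from the database, `chunk_bounds` and the `workers` parameter are hypothetical, and `some_func` only counts rows as a placeholder for the real work.

```python
import multiprocessing


def chunk_bounds(record_count, limit):
    """Yield (start, stop) index pairs covering record_count rows in steps of limit."""
    for start in range(0, record_count, limit):
        yield start, min(start + limit, record_count)


def some_func(rows):
    # Placeholder for the real per-chunk work. Anything unpicklable
    # (connections, SSL sockets) would be created here, inside the child.
    return len(rows)


def run_with_multiprocessing(rows, limit, workers=2):
    """Split rows into chunks of at most `limit` and process them in a pool."""
    chunks = [rows[start:stop] for start, stop in chunk_bounds(len(rows), limit)]
    with multiprocessing.Pool(processes=workers) as pool:
        return pool.map(some_func, chunks)


if __name__ == "__main__":
    # Stand-in for the fetched result set
    data = [(i, f"row-{i}") for i in range(10_000)]
    counts = run_with_multiprocessing(data, 800)
    print(len(counts), sum(counts))
```

The `if __name__ == "__main__":` guard matters on Windows, where child processes are started with spawn and re-import the main module. Unlike the loop in the question, which joins each process before starting the next, a pool lets the chunks actually run in parallel.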

Server

#!/usr/bin/python3
import multiprocessing
import socket
import ssl
import sys
import threading
import time

from sources.ClientListener import ClientListener

class SocketServer:
    def __init__(self,**kwargs):
        self.args = kwargs["args"]
        self.__createSocket()

    def __handlerClients(self):
        try:
            while self.sock:
                # sock.accept() returns once a connection is established; each connection gets its own child process
                # IMPORTANT: SSL is not added to the socket here, because an SSLSocket cannot be pickled when it is passed as an argument to multiprocessing.Process()
                conn,addr = self.sock.accept()
                eventChildStop = multiprocessing.Event()
                subprocess = multiprocessing.Process(target=ClientListener, name="client", args=(conn,addr,eventChildStop))
                # This thread is responsible for closing the client's child process
                threading.Thread(target=ClientListener.exitSubprocess,name="closeChildProcess",args=(eventChildStop,subprocess,)).start()
                subprocess.start()
                time.sleep(1)
        except Exception:
            pass

    def __createSocket(self):
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        # This allows the address/port to be reused immediately instead of waiting out the TIME_WAIT state
        # https://stackoverflow.com/questions/12362542/python-server-only-one-usage-of-each-socket-address-is-normally-permitted
        # #sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.sock.bind(("",self.PORT))
        self.sock.listen(self.CLIENTS)
        print(logFile().message(f"Good days. I am running ClassAdmin server, listenning {self.CLIENTS} clients by port {self.PORT}...",True,"INFO"))
        #self.sockSSL = self.context.wrap_socket(sock,server_side=True)
        self.__handlerClients()

if __name__=="__main__":
    SocketServer(args=sys.argv)

As you can see, the __handlerClients(self) method runs a while loop over the socket object. On each iteration I know whether a connection has been established, thanks to:

conn,addr = self.sock.accept()

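So I pass the conn variable to multiprocessing.Process(), because conn is a plain socket object. The difference between conn and self.sock is that conn has an raddr, which self.sock lacks, and the laddr of self.sock is 0.0.0.0.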

self.sock

<socket.socket fd=3, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('0.0.0.0', 7788)>

conn

<socket.socket fd=5, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=('192.168.0.3', 7788), raddr=('192.168.0.20', 53078)>

multiprocessing

subprocess = multiprocessing.Process(target=ClientListener, name="client", args=(conn,addr,eventChildStop))

It is the same object.
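
The difference between the listening socket and the accepted one can be checked on the loopback interface. This is a small sketch with hypothetical helper names (`loopback_pair`, `has_peer`); it binds to 127.0.0.1 on a port chosen by the OS rather than the 0.0.0.0:7788 used above.

```python
import socket


def loopback_pair():
    """Return (listener, accepted_conn, client) connected over 127.0.0.1."""
    listener = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listener.bind(("127.0.0.1", 0))  # port 0: let the OS pick a free port
    listener.listen(1)
    client = socket.create_connection(listener.getsockname())
    conn, _addr = listener.accept()
    return listener, conn, client


def has_peer(sock):
    """True if the socket is connected to a remote address (has an raddr)."""
    try:
        sock.getpeername()
    except OSError:
        return False
    return True


if __name__ == "__main__":
    listener, conn, client = loopback_pair()
    print(listener)  # repr shows laddr only
    print(conn)      # repr shows laddr and raddr
    print(has_peer(listener), has_peer(conn))
    for s in (client, conn, listener):
        s.close()
```

Both `listener` and `conn` are plain `socket.socket` instances; only `conn` is connected to a peer, and it is the one handed to the child process.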

Now on to ClientListener.

ClientListener

class ClientListener:
    def __init__(self,conn,addr,event):
        # Take the connection's socket object and encrypt its traffic with SSL by wrapping it in an SSLSocket (ssl module)
        self.addr = addr
        self.conn = self.__SSLTunnel(conn)
        self.nick = ""
        self.__listenData()

    # This creates an SSL tunnel with ClassAdmin's certificate and private key
    def __SSLTunnel(self,sock):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(Environment.SSL("crt"),Environment.SSL("key"))
        return context.wrap_socket(sock,server_side=True)

    def __listenData(self):
       # [...]

As you can see, __init__(self,conn,addr,event) receives the conn variable from the previous code, and self.conn stores the same object, but wrapped in an SSLSocket.

self.conn = self.__SSLTunnel(conn)
    def __SSLTunnel(self,sock):
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
        context.load_cert_chain(Environment.SSL("crt"),Environment.SSL("key"))
        return context.wrap_socket(sock,server_side=True)

Important

The SSLSocket is stored in self.conn because it works with the send() and recv() methods.

data = self.conn.recv(1024)

self.conn.send("sig.SystemExit(-5000,'The nick exists and is connected',True)".encode("utf-8"))

The self.sock variable does not allow the accept() method.

This raises the error:

[Errno 22] Invalid argument in /etc/ClassAdmin/sources/ClientListener.py:14
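
One way to get this errno on Linux is to call accept() on a socket that is not in the listening state. The sketch below shows only that general behavior; whether it is the exact cause of the error above is an assumption, and `accept_errno` is a hypothetical helper.

```python
import errno
import socket


def accept_errno(sock):
    """Call accept() once and return the errno of the OSError, or None on success."""
    try:
        conn, _addr = sock.accept()
    except OSError as exc:
        return exc.errno
    conn.close()
    return None


if __name__ == "__main__":
    not_listening = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    code = accept_errno(not_listening)  # listen() was never called on this socket
    print(code, errno.errorcode.get(code))
    not_listening.close()
```

accept() belongs on the listening socket in the parent; the connected socket passed to the child is meant for send() and recv().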

Have a nice day.

Answered 2021-12-25T00:54:17.050