前言
我想并行处理数据库表中列出的任务。不寻找工作代码。
设置
- 1 PostgreSQL 数据库服务器D
- 1 处理服务器P
- 1 用户终端T
使用 Python 3.6、psycopg2.7.6、PostgreSQL 11
D保存有要处理的数据的表和一个tasks表。T 的用户ssh进入P,可以发出以下命令:
python -m core.utils.task
该task.py脚本本质上是一个while循环,它t从Dtasks上的表中获取状态为“新”的任务,直到没有新任务为止。任务基本上是另一个名为 的函数的一组参数。它本身将与D建立许多连接以获取需要处理的数据,并在完成后将任务设置为状态“完成”——循环重新开始并获得一个新任务。tdo_something(t)do_something(t)while
为了python -m core.utils.task多次运行,我打开了多个ssh连接。不太好,我知道;threading或者multiprocessing会更好。但他只是为了测试我是否可以两次运行上述命令。
有一个脚本可以管理所有调用的数据库交互pgsql.py,这是获取任务所需的,然后由do_something(t). 我从这篇 SE 帖子中改编了一个单例模式。
伪代码(大部分)
任务.py
import mymodule
import pgsql
def main():
while True:
r, c = pgsql.SQL.select_task() # rows and columns
task = dotdict(dict(zip(c, r[0])))
mymodule.do_something(task)
if __name__ == "__main__":
main()
我的模块.py
import pgsql
def do_something(t):
input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
some_other_function(input)
pgsql.SQL.task_status(t.task_id,'done')
pgsql.py
import psycopg2 as pg
class Postgres(object):
"""Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = object.__new__(cls)
db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
try:
print('connecting to PostgreSQL database...')
connection = Postgres._instance.connection = pg.connect(**db_config)
connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
except Exception as error:
print('Error: connection not established {}'.format(error))
Postgres._instance = None
else:
print('connection established')
return cls._instance
def __init__(self):
self.connection = self._instance.connection
def query(self, query):
try:
with self.connection.cursor() as cur:
cur.execute(query)
rows = cur.fetchall()
cols = [desc[0] for desc in cur.description]
except Exception as error:
print('error execting query "{}", error: {}'.format(query, error))
return None
else:
return rows, cols
def __del__(self):
self.connection.close()
db = Postgres()
class SQL():
def select_task():
s = """
UPDATE schema.tasks
SET status = 'ready'
WHERE task_id = ( SELECT task_id
FROM schema.tasks
WHERE tasks.status = 'new'
LIMIT 1)
RETURNING *
;
""".format(m=mode)
return Postgres.query(db, s)
def task_status(id,status):
s = """
UPDATE
schema.tasks
SET
status = '{s}'
WHERE
tasks.task_id = '{id}'
;
""".format(s=status,
id=id)
return Postgres.query(db, s)
问题
这适用于一个ssh连接。任务从数据库中检索并处理,一旦完成,任务设置为“完成”。一旦我在第二个终端中打开第二个ssh连接以运行python -m core.utils.task(也就是说,并行),任务表的完全相同的行都会在两者中处理 - 忽略它们已被更新。
问题
你有什么建议让它发挥作用?有数百万个任务,我需要并行运行它们。在实施之前threading或者multiprocessing我想先用多个ssh连接测试它,坏主意?我摆弄了's中的isolation levelsandautocommit设置,但没有运气。我检查了数据库服务器中的会话,可以看到每个进程都有自己的 PID,只连接一次,就像这个单例模式应该工作一样。非常感谢任何如何处理这个问题的想法或指示!psycopg2set_session()python -m core.utils.task