1

前言

我想并行处理数据库表中列出的任务。不寻找工作代码。

设置

  • 1 PostgreSQL 数据库服务器D
  • 1 处理服务器P
  • 1 用户终端T

使用 Python 3.6、psycopg2.7.6、PostgreSQL 11

D保存有要处理的数据的表和一个tasks表。T 的用户ssh进入P,可以发出以下命令:

python -m core.utils.task

task.py脚本本质上是一个while循环,它tDtasks上的表中获取状态为“新”的任务,直到没有新任务为止。任务基本上是另一个名为 的函数的一组参数。它本身将与D建立许多连接以获取需要处理的数据,并在完成后将任务设置为状态“完成”——循环重新开始并获得一个新任务。tdo_something(t)do_something(t)while

为了python -m core.utils.task多次运行,我打开了多个ssh连接。不太好,我知道;threading或者multiprocessing会更好。但他只是为了测试我是否可以两次运行上述命令。

有一个脚本可以管理所有调用的数据库交互pgsql.py,这是获取任务所需的,然后由do_something(t). 我从这篇 SE 帖子中改编了一个单例模式。

伪代码(大部分)

任务.py

import mymodule
import pgsql

def main():
    while True:
        r, c = pgsql.SQL.select_task()  # rows and columns
        task = dotdict(dict(zip(c, r[0])))
        mymodule.do_something(task)

if __name__ == "__main__":
    main()

我的模块.py

import pgsql

def do_something(t):
    input = pgsql.SQL.get_images(t.table,t.schema,t.image_id,t.image_directory)
    some_other_function(input)
    pgsql.SQL.task_status(t.task_id,'done')

pgsql.py

import psycopg2 as pg

class Postgres(object):
    """Adapted from https://softwareengineering.stackexchange.com/a/358061/348371"""
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = object.__new__(cls)
            db_config = {'dbname': 'dev01', 'host': 'XXXXXXXX',
                         'password': 'YYYYY', 'port': 5432, 'user': 'admin'}
            try:
                print('connecting to PostgreSQL database...')
                connection = Postgres._instance.connection = pg.connect(**db_config)
                connection.set_session(isolation_level='READ COMMITTED', autocommit=True)
            except Exception as error:
                print('Error: connection not established {}'.format(error))
                Postgres._instance = None

            else:
                print('connection established')

        return cls._instance

    def __init__(self):
        self.connection = self._instance.connection

    def query(self, query):
        try:
            with self.connection.cursor() as cur:
                cur.execute(query)
                rows = cur.fetchall()
                cols = [desc[0] for desc in cur.description]
        except Exception as error:
            print('error execting query "{}", error: {}'.format(query, error))
            return None
        else:
            return rows, cols

    def __del__(self):
        self.connection.close()

db = Postgres()
class SQL():
    def select_task():
        s = """
            UPDATE schema.tasks
               SET status = 'ready'
             WHERE task_id = (  SELECT task_id
                                  FROM schema.tasks
                                 WHERE tasks.status = 'new'
                                 LIMIT 1)
            RETURNING *
            ;
            """.format(m=mode)
        return Postgres.query(db, s)


    def task_status(id,status):
        s = """
            UPDATE
                schema.tasks
            SET
                status = '{s}'
            WHERE
                tasks.task_id = '{id}'
            ;
            """.format(s=status,
                       id=id)
        return Postgres.query(db, s)

问题

这适用于一个ssh连接。任务从数据库中检索并处理,一旦完成,任务设置为“完成”。一旦我在第二个终端中打开第二个ssh连接以运行python -m core.utils.task(也就是说,并行),任务表的完全相同的行都会在两者中处理 - 忽略它们已被更新。

问题

你有什么建议让它发挥作用?有数百万个任务,我需要并行运行它们。在实施之前threading或者multiprocessing我想先用多个ssh连接测试它,坏主意?我摆弄了's中的isolation levelsandautocommit设置,但没有运气。我检查了数据库服务器中的会话,可以看到每个进程都有自己的 PID,只连接一次,就像这个单例模式应该工作一样。非常感谢任何如何处理这个问题的想法或指示!psycopg2set_session()python -m core.utils.task

4

1 回答 1

2

主要问题是执行一项任务不是原子操作。因此,在不同的 ssh 会话中,同一个任务可能会被处理多次。

在此实现中,您可以尝试对任务使用"INPROGRESS"状态,以免检索已在处理的任务(带有"INPROGRESS"状态)。但一定要使用自动提交。

但我会使用线程和数据库连接池来实现这一点。OFFSET并将使用和批量提取任务LIMIT。,do_somethingselect_task函数task_status将实现批量任务。

此外,无需将Postgres类实现为单例。


修改(见下面的评论)

  • 您可以添加FOR UPDATE SKIP LOCKED到当前实现中的 SQL 查询(参见url)。
  • 如果您想使用批处理,则通过一些串行列分隔数据(好吧,或者只是对表中的数据进行排序)。
  • 我使用批处理的实现
  • 这可以使用ThreadPoolExecutor和来实现PersistentConnectionPool
于 2019-10-11T11:01:59.880 回答