2

基本上,每个客户端(clientId与它们关联)都可以推送消息,重要的是,在第一个消息完成处理之前,不会处理来自同一客户端的第二条消息(即使客户端可以连续发送多条消息,并且它们是有序的,并且多个发送消息的客户端理想情况下不应相互干扰)。而且,重要的是,一个工作不应该被处理两次。

我认为使用 Redis 可能能够解决这个问题,我开始使用 Bull 库进行一些快速原型设计,但我显然做得不好,我希望有人知道如何继续。

这是我到目前为止所尝试的:

  1. 创建作业并将它们添加到一个进程的相同队列名称中,使用clientId作为作业名称。
  2. 在 2 个单独的进程上等待大量随机时间的同时消耗作业。
  3. 我尝试添加我正在使用的库提供的默认锁定 ( bull),但它锁定在 jobId 上,它对每个作业都是唯一的,而不是在 clientId 上。

我想要发生的事情:

  • clientId在前一个消费者完成处理之前,其中一个消费者不能从同一个消费者那里接手工作。
  • 但是,它们应该能够clientId毫无问题地(异步地)并行地从不同的 s 中获取项目。(我还没有走到这一步,我现在只处理一个clientId

我得到什么:

  • 两个消费者都从队列中消费尽可能多的项目,而无需等待前一个项目clientId完成。

Redis 甚至是适合这项工作的工具吗?

示例代码

// ./setup.ts
import Queue from 'bull';
import * as uuid from 'uuid';

// Check that when a message is taken from a place, no other message is taken

// TO do that test, have two processes that process messages and one that sets messages, and make the job take a long time

// queue for each room https://stackoverflow.com/questions/54178462/how-does-redis-pubsub-subscribe-mechanism-works/54243792#54243792
// https://groups.google.com/forum/#!topic/redis-db/R09u__3Jzfk

// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353

export async function sleep(ms: number): Promise<void> {
  return new Promise((resolve) => {
    setTimeout(resolve, ms);
  });
}
export interface JobData {
  id: string;
  v: number;
}
export const queue = new Queue<JobData>('messages', 'redis://127.0.0.1:6379');

queue.on('error', (err) => {
  console.error('Uncaught error on queue.', err);
  process.exit(1);
});

export function clientId(): string {
  return uuid.v4();
}

export function randomWait(minms: number, maxms: number): Promise<void> {
  const ms = Math.random() * (maxms - minms) + minms;
  return sleep(ms);
}

// Make a job not be called stalled, waiting enough time https://github.com/OptimalBits/bull/issues/210#issuecomment-190818353
// eslint-disable-next-line @typescript-eslint/ban-ts-comment
//@ts-ignore
queue.LOCK_RENEW_TIME = 5 * 60 * 1000;

// ./create.ts
import { queue, randomWait } from './setup';

const MIN_WAIT = 300;
const MAX_WAIT = 1500;
async function createJobs(n = 10): Promise<void> {
  await randomWait(MIN_WAIT, MAX_WAIT);
  // always same Id
  const clientId = Math.random() > 1 ? 'zero' : 'one';
  for (let index = 0; index < n; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    const job = { id: clientId, v: index };
    await queue.add(clientId, job).catch(console.error);
    console.log('Added job', job);
  }
}

export async function create(nIds = 10, nItems = 10): Promise<void> {
  const jobs = [];
  await randomWait(MIN_WAIT, MAX_WAIT);
  for (let index = 0; index < nIds; index++) {
    await randomWait(MIN_WAIT, MAX_WAIT);
    jobs.push(createJobs(nItems));
    await randomWait(MIN_WAIT, MAX_WAIT);
  }
  await randomWait(MIN_WAIT, MAX_WAIT);
  await Promise.all(jobs)
  process.exit();
}

(function mainCreate(): void {
  create().catch((err) => {
    console.error(err);
    process.exit(1);
  });
})();

// ./consume.ts
import { queue, randomWait, clientId } from './setup';

function startProcessor(minWait = 5000, maxWait = 10000): void {
  queue
    .process('*', 100, async (job) => {
      console.log('LOCKING: ', job.lockKey());
      await job.takeLock();
      const name = job.name;
      const processingId = clientId().split('-', 1)[0];
      try {
        console.log('START: ', processingId, '\tjobName:', name);
        await randomWait(minWait, maxWait);
        const data = job.data;
        console.log('PROCESSING: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('PROCESSED: ', processingId, '\tjobName:', name, '\tdata:', data);
        await randomWait(minWait, maxWait);
        console.log('FINISHED: ', processingId, '\tjobName:', name, '\tdata:', data);
      } catch (err) {
        console.error(err);
      } finally {
        await job.releaseLock();
      }
    })
    .catch(console.error); // Catches initialization
}

startProcessor();

这是使用 3 个不同的进程运行的,您可能会这样称呼它们(尽管我使用不同的选项卡来更清楚地了解正在发生的事情)

npx ts-node consume.ts & 
npx ts-node consume.ts &
npx ts-node create.ts &
4

2 回答 2

1

我不熟悉 node.js。但是对于 Redis,我会尝试这个,

假设您有client_1,client_2,它们都是事件的发布者。你有三台机器,consumer_1、consumer_2、consumer_3。

  1. 在redis中建立一个任务列表,例如JOB_LIST。
  2. 客户将(LPUSH)作业以特定形式放入此 JOB_LIST,例如“CLIENT_1:[jobcontent]”、“CLIENT_2:[jobcontent]”
  3. 每个消费者以阻塞方式取出作业(Redis 的 RPOP 命令)并处理它们。比如consumer_1拿出一份工作,内容是CLIENT_1:[jobcontent]。它解析内容并识别它来自 CLIENT_1。然后它想检查其他消费者是否已经在处理 CLIENT_1,如果没有,它将锁定密钥以指示它正在处理 CLIENT_1。

它继续使用 Redis SETNX 命令(如果该键不存在则设置)设置一个键“CLIENT_1_PROCESSING”,内容为“consumer_1”,并具有适当的超时时间。例如,任务通常需要一分钟才能完成,您将密钥的超时设置为五分钟,以防万一 consumer_1 崩溃并无限期地持有锁。

如果 SETNX 返回 0,则表示获取 CLIENT_1 的锁失败(有人已经在处理 client_1 的作业)。然后它通过使用 Redis LPUSH 命令将作业(“CLIENT_1:[jobcontent]”的值)返回到 JOB_LIST 的左侧。然后它可能会等待一段时间(休眠几秒钟),然后从右侧 RPOP 另一个任务LIST 的一侧。如果这一次 SETNX 返回 1,则 consumer_1 获取锁。它继续处理作业,完成后,它删除“CLIENT_1_PROCESSING”的键,释放锁。然后它继续 RPOP 另一个工作,依此类推。

需要考虑的一些事项:

  1. JOB_LIST 不公平,例如,较早的作业可能会在以后处理
  2. 锁定部分有点简陋,但足够了。

- - - - - 更新 - - - - - - -

我想出了另一种保持任务井井有条的方法。

对于每个客户(生产者),建立一个列表。像“client_1_list”一样,将作业推送到列表的左侧。将所有客户端名称保存在列表“client_names_list”中,值为“client_1”、“client_2”等。

对于每个消费者(处理器),迭代“client_names_list”,例如consumer_1得到一个“client_1”,检查client_1的key是否被锁定(有人已经在处理client_1的任务),如果没有,就弹出一个值(作业)来自 client_1_list 并锁定 client_1。如果 client_1 被锁定,(可能休眠一秒钟)并迭代到下一个客户端,例如“client_2”,并检查密钥等等。

这样,每个客户(任务生产者)的任务都是按照他们的进入顺序来处理的。

于 2020-07-10T10:52:35.283 回答
0

编辑:我发现关于 BullJS 的问题是在一个处理器上并行启动作业:我们正在使用命名作业,并且在一个队列/处理器上定义了许多命名进程函数。队列/处理器的默认并发因子为 1。因此队列不应并行处理任何作业。

我们提到的设置的问题是,如果您在一个队列上定义了许多(命名的)进程处理程序,则并发性会与每个进程处理程序函数相加:因此,如果您定义三个命名的进程处理程序,则给定的并发因子为 3所有已定义的命名作业的队列。

因此,只需为不应发生并行处理的队列定义一个命名作业,并且所有作业应一个接一个地按顺序运行。

这可能很重要,例如,当将大量作业推入队列并且处理涉及 API 调用时,如果并行处理会产生错误。

以下文字是我回答操作员问题的第一种方法,仅描述了该问题的解决方法。所以最好继续我的编辑:) 并以正确的方式配置您的队列。


我找到了一个简单的解决操作员问题的方法。事实上,BullJS 在一个工作实例上并行处理许多作业:假设您有一个工作实例启动并运行,并将 10 个作业推送到队列中,而不是该工作者并行启动所有进程。

我对 BullJS-queues 的研究表明这不是预期的行为:一个工作人员(BullJS 也称为处理器)应该只在队列处于空闲状态时从队列中启动一个新作业,而不是处理以前的作业。

尽管如此,BullJS 仍然在一名工人身上同时开始工作。

在我们的实现中,在 API 调用期间导致大问题很可能是由 t00 次 API 调用引起的。测试表明,当仅启动一名工作人员时,API 调用完成得很好并给出状态 200。

那么,如果 BullJS 不为我们做这件事(正如操作员所要求的那样),一旦前一个工作完成,如何处理一个又一个工作?我们首先尝试了延迟和其他 BullJS 选项,但这是一种解决方法,而不是我们正在寻找的问题的确切解决方案。至少我们没有阻止 BullJS 一次处理多个工作。

所以我们自己做了,一个接一个地开始工作。

在查看 BullJS API 参考 ( BullJS API Ref ) 后,该解决方案对于我们的用例来说相当简单。

我们只是使用了一个for循环来一个接一个地开始工作。诀窍是使用 BullJS 的

job.finished

作业完成后获取 Promise.resolve 的方法。通过在 for 循环中使用 await,下一个作业会在job.finished Promise 等待(已解决)之后立即开始。这就是 for 循环的好处:Await在其中起作用!

这里有一个关于如何实现预期行为的小代码示例:

for (let i = 0; i < theValues.length; i++) {    

                jobCounter++

                const job = await this.processingQueue.add(
                    'update-values',
                    {
                        value: theValues[i],
            
                    },
                    {
                        // delay: i * 90000,
                        // lifo: true,
                    }
                )

                
                this.jobs[job.id] = {
                    jobType: 'socket',
                    jobSocketId: BackgroundJobTasks.UPDATE_VALUES,
                    data: {
                        value: theValues[i],
                    },
                    jobCount: theValues.length,
                    jobNumber: jobCounter,
                    cumulatedJobId
                }

                await job.finished()
                    .then((val) => {
                        console.log('job finished:: ', val)

                    })

            
        }

重要的部分是真的

await job.finished()

在 for 循环内。leasingValues.length 作业按照预期一个接一个地开始。

这样一来,就不再可能在多个工人之间横向扩展工作了。尽管如此,这种解决方法目前对我们来说还可以。

我会联系optimalbits——BullJS 的制造商,以解决问题。

于 2022-02-03T02:20:08.467 回答