我有一个Job Distributor
在不同的网站上发布消息的人Channels
。
此外,我希望有两个(以及未来更多)Consumers
从事不同的任务并在不同的机器上运行。(目前我只有一个,需要扩展它)
让我们为这些任务命名(只是示例):
FIBONACCI
(生成斐波那契数)RANDOMBOOKS
(生成随机句子来写一本书)
这些任务最多运行 2-3 个小时,应该平均分配给每个Consumer
.
每个消费者都可以有x
并行线程来处理这些任务。所以我说:(这些数字只是示例,将被变量替换)
- 机器 1 可以消耗 3 个并行作业
FIBONACCI
和 5 个并行作业RANDOMBOOKS
- 机器 2 可以消耗 7个并行作业
FIBONACCI
和 3 个并行作业RANDOMBOOKS
我怎样才能做到这一点?
我是否必须x
为每个线程启动每个线程Channel
才能收听Consumer
?
我什么时候必须承认这一点?
我目前的唯一方法Consumer
是:为每个任务启动x
线程 - 每个线程都是一个 Defaultconsumer 实现Runnable
。在handleDelivery
方法中,我打电话basicAck(deliveryTag,false)
然后做工作。
进一步:我想将一些任务发送给一个特殊的消费者。如何结合上述公平分配来实现这一目标?
这是我的代码publishing
String QUEUE_NAME = "FIBONACCI";
Channel channel = this.clientManager.getRabbitMQConnection().createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
channel.basicPublish("", QUEUE_NAME,
MessageProperties.BASIC,
Control.getBytes(this.getArgument()));
channel.close();
这是我的代码Consumer
public final class Worker extends DefaultConsumer implements Runnable {
@Override
public void run() {
try {
this.getChannel().queueDeclare(this.jobType.toString(), true, false, false, null);
this.getChannel().basicConsume(this.jobType.toString(), this);
this.getChannel().basicQos(1);
} catch (IOException e) {
// catch something
}
while (true) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Control.getLogger().error("Exception!", e);
}
}
}
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] bytes) throws IOException {
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
this.getChannel().basicAck(deliveryTag, false); // Is this right?
// Start new Thread for this task with my own ExecutorService
}
}
在这种情况下,课程Worker
开始了两次:一次为FIBUNACCI
一次,一次为RANDOMBOOKS
更新
正如答案所述,RabbitMQ 不是最好的解决方案,但 Couchbase 或 MongoDB 拉取方法是最好的。我是这些系统的新手,有没有人可以向我解释,这将如何实现?