我正在构建一个工作/任务分配器,它至少在原则上与你的爬虫一样工作。以下是我学到的一些东西:
定义所有事件
服务器和爬虫之间的通信将基于系统中发生的不同事情,例如从服务器分派工作到爬虫,或者爬虫向服务器发送心跳消息。定义系统的事件类型;它们是用例:
DISPATCH_WORK_TO_CRAWLER_EVENT
CRAWLER_NODE_STATUS_EVENT
...
定义消息标准
服务器和爬虫之间的所有通信都应该使用 ZMsg 完成,因此定义一个组织框架的标准,如下所示:
Frame1: "Crawler v1.0" //this is a static header
Frame2: <event type> //ex: "CRAWLER_NODE_STATUS_EVENT"
Frame3: <content xml/json/binary> //content that applies to this event (if any)
现在您可以创建消息验证器来验证对等点之间收到的 ZMsg,因为您有一个标准约定,所有消息都必须遵循。
服务器
使用服务器上的单个ROUTER
与爬虫进行异步和双向通信。此外,使用PUB
套接字来广播心跳消息。
不要阻塞 ROUTER 套接字,POLLER
每 5 秒使用一次循环或其他什么,这允许服务器定期执行其他操作,例如向爬虫广播心跳事件;像这样的东西:
Socket rtr = .. //ZMQ.ROUTER
Socket pub = .. //ZMQ.PUB
ZMQ.Poller poller = new ZMQ.Poller(2)
poller.register( rtr, ZMQ.Poller.POLLIN)
poller.register( pub, ZMQ.Poller.POLLIN)
while (true) {
ZMsg msg = null
poller.poll(5000)
if( poller.pollin(0)){
//messages from crawlers
msg = ZMsg.recvMsg(rtr)
}
//send heartbeat messages
ZMsg hearbeatMsg = ...
//create message content here,
//publish to all crawlers
heartbeatMsg.send(pub)
}
为了解决您关于工人意识的问题,一种简单有效的方法是使用 FIFO 堆栈和心跳消息;像这样的东西:
- 服务器在内存中维护一个简单的 FIFO 堆栈
- 服务器发出心跳;爬虫用他们的节点名称响应;ROUTER 也会自动将节点的地址放入消息中(阅读消息包络)
- 将 1 个对象推送到包含节点名称和节点地址的堆栈上
- 当服务器想要将工作分派给爬虫时,只需从堆栈中弹出下一个对象,创建消息并且地址正确(使用节点地址),然后将其发送给该工作者
- 以同样的方式将更多的工作分派给其他爬虫;当爬虫响应服务器时,只需将另一个具有节点名称/地址的对象推回堆栈;其他工作人员在他们回应之前将无法使用,因此我们不会打扰他们。
这是一种基于工人可用性分配工作的简单但有效的方法,而不是盲目地派出工作。查看lbbroker.php的例子,概念是一样的。
履带式(工人)
工作人员应使用单个DEALER
套接字和SUB
. DEALER
是异步通信的主套接字,SUB 订阅来自服务器的心跳消息。当worker接收到心跳消息时,它会在DEALER套接字上响应服务器。
Socket dlr = .. //ZMQ.DEALER
Socket sub = .. //ZMQ.SUB
ZMQ.Poller poller = new ZMQ.Poller(2)
poller.register( dlr, ZMQ.Poller.POLLIN)
poller.register( sub, ZMQ.Poller.POLLIN)
while (true) {
ZMsg msg = null
poller.poll(5000)
if( poller.pollin(0)){
//message from server
msg = ZMsg.recvMsg(dlr)
}
if( poller.pollin(1)){
//heartbeat message from server
msg = ZMsg.recvMsg(sub)
//reply back with status
ZMsg statusMsg = ...
statusMsg.send(dlr)
}
其余的你可以自己解决。完成PHP示例,构建东西,破坏它,构建更多,这是您学习的唯一方法!
玩得开心,希望对你有帮助!