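Using DSpace 1.8.x, I am trying to understand the consumer/producer event mechanism, for example how to register a proper consumer.
However, something strange happens in the BasicDispatcher class.
It repeats the execution of the consumers in a loop, and I don't know why.
The line that bothers me is the following one, and the full method is shown further below: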
while (!vnts.isEmpty() && dispatchLoop < ConfigurationManager.getIntProperty("maxDispatchLoops", 3))
It sits right at the top of the dispatch method:
    /**
     * Dispatch all events added to this Context according to configured
     * consumers.
     *
     * @param ctx
     *            the execution context
     */
    public void dispatch(Context ctx)
    {
        if (!consumers.isEmpty())
        {
            List<Event> vnts = ctx.getEvents();
            int dispatchLoop=0;
            while (!vnts.isEmpty() && dispatchLoop < ConfigurationManager.getIntProperty("maxDispatchLoops", 3)) {
                List<Event> events=new LinkedList<Event>(vnts);
                log.debug(dispatchLoop + " - " + ctx.getEvents().size());
                vnts.clear();
                log.debug(ctx.getEvents().size());
                if (events == null)
                {
                    return;
                }
                if (log.isDebugEnabled())
                {
                    log.debug("Processing queue of "
                            + String.valueOf(events.size()) + " events.");
                }
                // transaction identifier applies to all events created in
                // this context for the current transaction. Prefix it with
                // some letters so RDF readers don't mistake it for an integer.
                String tid = "TX" + Utils.generateKey();
                for (Event event : events)
                {
                    event.setDispatcher(getIdentifier());
                    event.setTransactionID(tid);
                    if (log.isDebugEnabled())
                    {
                        log.debug("Iterating over "
                                + String.valueOf(consumers.values().size())
                                + " consumers...");
                    }
                    for (Iterator ci = consumers.values().iterator(); ci.hasNext();)
                    {
                        ConsumerProfile cp = (ConsumerProfile) ci.next();
                        if (event.pass(cp.getFilters()))
                        {
                            if (log.isDebugEnabled())
                            {
                                log.debug("Sending event to \"" + cp.getName()
                                        + "\": " + event.toString());
                            }
                            try
                            {
                                consume(cp, event, ctx);
                            }
                            catch (Exception e)
                            {
                                log.error("Consumer(\"" + cp.getName()
                                        + "\").consume threw: " + e.toString(), e);
                            }
                        }
                    }
                }
                // Call end on the consumers that got synchronous events.
                for (Iterator ci = consumers.values().iterator(); ci.hasNext();)
                {
                    ConsumerProfile cp = (ConsumerProfile) ci.next();
                    if (cp != null)
                    {
                        if (log.isDebugEnabled())
                        {
                            log.debug("Calling end for consumer \"" + cp.getName()
                                    + "\"");
                        }
                        try
                        {
                            if (cp.isAsynchronous()) {
                                if (thread == null || !thread.isAlive()) {
                                    thread = new ConsumerThread();
                                    thread.queue.add(new ConsumerStuff(cp));
                                    thread.start();
                                } else
                                    thread.queue.add(new ConsumerStuff(cp));
                            } else {
                                cp.getConsumer().end(ctx);
                            }
                        }
                        catch (Exception e)
                        {
                            log.error("Error in Consumer(\"" + cp.getName()
                                    + "\").end: " + e.toString(), e);
                        }
                    }
                }
                dispatchLoop++;
                if(ctx.getEvents()!=null)
                    vnts = Collections.synchronizedList(ctx.getEvents());
            }
        }
    }
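Does anyone know the reason for this loop? Also, how can maxDispatchLoops be changed, and what effect does it have?
Can a consumer be made to execute only once?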
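
To make the question concrete, here is a stripped-down model of the loop as I read it. This is only a sketch under assumptions, not DSpace code: DispatchLoopSketch, its Consumer interface and the string events are hypothetical stand-ins, and the sketch assumes that ctx.getEvents() returns the live queue, so that anything a consumer adds during a pass is seen when the while condition is checked again. Under that reading, a second pass only happens when a consumer raised new events during the first one, and the limit (3 when the maxDispatchLoops property is not set, judging by the second argument to getIntProperty) caps how many passes can occur.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

// Simplified, hypothetical model of the dispatch loop; not the DSpace implementation.
class DispatchLoopSketch {

    /** Stand-in for a consumer: it may add follow-up events to the shared queue. */
    interface Consumer {
        void consume(String event, List<String> contextEvents);
    }

    /**
     * Runs passes over the queue until it is empty or maxLoops is reached.
     * Returns the number of passes that were made.
     */
    static int dispatch(List<String> contextEvents, List<Consumer> consumers, int maxLoops) {
        int dispatchLoop = 0;
        while (!contextEvents.isEmpty() && dispatchLoop < maxLoops) {
            // Snapshot the pending events, then clear the live queue, as the original does.
            List<String> events = new LinkedList<>(contextEvents);
            contextEvents.clear();
            for (String event : events) {
                for (Consumer c : consumers) {
                    c.consume(event, contextEvents);
                }
            }
            dispatchLoop++;
        }
        return dispatchLoop;
    }

    public static void main(String[] args) {
        // A consumer that only reads events: the queue stays empty after the first pass.
        Consumer passive = (event, queue) -> { };
        // A consumer that always raises a follow-up event: every pass refills the queue.
        Consumer chatty = (event, queue) -> queue.add("follow-up of " + event);

        List<String> q1 = new ArrayList<>(List.of("MODIFY item"));
        System.out.println("passive passes: " + dispatch(q1, List.of(passive), 3)); // prints 1

        List<String> q2 = new ArrayList<>(List.of("MODIFY item"));
        System.out.println("chatty passes: " + dispatch(q2, List.of(chatty), 3)); // prints 3
    }
}
```

In this model the "chatty" consumer is stopped only by the loop limit, and one event is still left in the queue afterwards. If the real dispatcher behaves the same way, setting the limit to 1 would give a single pass, at the cost of leaving follow-up events undispatched.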