0

使用 dspace 1.8.x 我试图了解消费者生产者机制,例如注册一个合适的消费者。

然而,在 BasicDispatcher 类中发生了一些奇怪的事情。

它在循环中重复消费者执行,我不知道为什么。

这里令人不安的线和代码可以在下面找到

while (!vnts.isEmpty() && dispatchLoop < ConfigurationManager.getIntProperty("maxDispatchLoops", 3))

就在顶部。

/**
 * Dispatch all events added to this Context according to configured
 * consumers.
 * 
 * @param ctx
 *            the execution context
 */
public void dispatch(Context ctx)
{
    if (!consumers.isEmpty())
    {
        List<Event> vnts = ctx.getEvents();
        int dispatchLoop=0;

        while (!vnts.isEmpty() && dispatchLoop < ConfigurationManager.getIntProperty("maxDispatchLoops", 3)) {
            List<Event> events=new LinkedList<Event>(vnts);
            log.debug(dispatchLoop + " - " + ctx.getEvents().size());
            vnts.clear();
            log.debug(ctx.getEvents().size());

            if (events == null)
            {
                return;
            }

            if (log.isDebugEnabled())
            {
                log.debug("Processing queue of "
                        + String.valueOf(events.size()) + " events.");
            }

            // transaction identifier applies to all events created in
            // this context for the current transaction. Prefix it with
            // some letters so RDF readers don't mistake it for an integer.
            String tid = "TX" + Utils.generateKey();

            for (Event event : events)
            {
                event.setDispatcher(getIdentifier());
                event.setTransactionID(tid);

                if (log.isDebugEnabled())
                {
                    log.debug("Iterating over "
                            + String.valueOf(consumers.values().size())
                            + " consumers...");
                }

                for (Iterator ci = consumers.values().iterator(); ci.hasNext();)
                {
                    ConsumerProfile cp = (ConsumerProfile) ci.next();

                    if (event.pass(cp.getFilters()))
                    {
                        if (log.isDebugEnabled())
                        {
                            log.debug("Sending event to \"" + cp.getName()
                                    + "\": " + event.toString());
                        }

                        try
                        {
                            consume(cp, event, ctx);
                        }
                        catch (Exception e)
                        {
                            log.error("Consumer(\"" + cp.getName()
                                    + "\").consume threw: " + e.toString(), e);
                        }
                    }

                }
            }

            // Call end on the consumers that got synchronous events.
            for (Iterator ci = consumers.values().iterator(); ci.hasNext();)
            {
                ConsumerProfile cp = (ConsumerProfile) ci.next();
                if (cp != null)
                {
                    if (log.isDebugEnabled())
                    {
                        log.debug("Calling end for consumer \"" + cp.getName()
                                + "\"");
                    }

                    try
                    {
                        if (cp.isAsynchronous()) {
                            if (thread == null || !thread.isAlive()) {
                                thread = new ConsumerThread();
                                thread.queue.add(new ConsumerStuff(cp));
                                thread.start();
                            } else
                              thread.queue.add(new ConsumerStuff(cp));
                        } else {
                            cp.getConsumer().end(ctx);
                        }
                    }
                    catch (Exception e)
                    {
                        log.error("Error in Consumer(\"" + cp.getName()
                                + "\").end: " + e.toString(), e);
                    }
                }
            }
            dispatchLoop++;
            if(ctx.getEvents()!=null)
            vnts = Collections.synchronizedList(ctx.getEvents());

        }
    }
}

有没有人知道原因。还有人怎么能改变maxDispatchLoops,发生率是多少?

消费者可以只执行一次吗?

4

1 回答 1

0

你能告诉我们你在哪里找到那个代码吗?它不在官方 DSpace 代码库中,至少不在dspace-1_7、dspace-1_8 或 master 分支的BasicDispatcher类中。配置属性也不存在。也许这是本地定制?

于 2015-05-25T07:02:10.107 回答