I am trying to fetch an object from MongoDB, add it to my current payload, and save the result in another database:

@Override
public void configure() throws Exception
{
    from(kafkaEndpoint)
            .convertBodyTo(DBObject.class)
            .enrich("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery",
                    (original, external) -> {
                        DBObject originalBody = original.getIn().getBody(DBObject.class);
                        DBObject externalBody = external.getIn().getBody(DBObject.class);

                        Map<String, DBObject> map = new HashMap<String, DBObject>();
                        map.put("original", originalBody);
                        map.put("external", externalBody);

                        original.getIn().setBody(map);
                        return original;
                    })
            .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert");
}

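The problem is that enrich takes its query from In.body, which contains my original object...

So how can I pass the query ({"entity.id": ""}) to enrich(mongodb:...) and still keep the original object so that I can merge it with the result?

Thanks.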

1 Answer

    @Override
    public void configure() throws Exception
    {
        from(kafkaEndpoint)
                .convertBodyTo(DBObject.class)
                .enrich("direct:findOneByQuery",     // enrich via a sub-route that builds the query first
                        (original, external) -> {
                            DBObject originalBody = original.getIn().getBody(DBObject.class);
                            DBObject externalBody = external.getIn().getBody(DBObject.class);

                            Map<String, DBObject> map = new HashMap<String, DBObject>();
                            map.put("original", originalBody);
                            map.put("external", externalBody);

                            original.getIn().setBody(map);
                            return original;
                        })
                .to("mongodb:mongoDb?database=myDbName2&collection=UserColl&operation=insert");

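        // Sub-route called by enrich(): builds the query from the incoming body,
        // then runs findOneByQuery. enrich() hands this route a copy of the exchange,
        // so the original body is still available to the aggregation strategy.
        from("direct:findOneByQuery")
                .process(new Processor()
                {
                    @Override
                    public void process(Exchange exchange) throws Exception
                    {
                        DBObject body = exchange.getIn().getBody(DBObject.class);
                        DBObject query = BasicDBObjectBuilder.start()
                                .append("entity._id", body.get("_id"))
                                .get();

                        exchange.getIn().setBody(query);
                    }
                })
                .to("mongodb:mongoDb?database=myDbName1&collection=UserColl&operation=findOneByQuery");
    }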
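
The data flow of the routes above can be sketched without Camel or MongoDB, which makes it easier to see why the original object survives the lookup. This is only an illustration under stated assumptions: `EnrichMergeSketch`, `buildQuery` and `merge` are hypothetical names, plain `Map`s stand in for `DBObject`, and the lookup result is assumed to be `null` when no document matches.

```java
import java.util.HashMap;
import java.util.Map;

// Plain-Java sketch of the enrich flow; all names here are hypothetical.
class EnrichMergeSketch {

    // Mirrors the processor in "direct:findOneByQuery": derive the lookup
    // query from the original document's _id without modifying the original.
    static Map<String, Object> buildQuery(Map<String, Object> original) {
        Map<String, Object> query = new HashMap<>();
        query.put("entity._id", original.get("_id"));
        return query;
    }

    // Mirrors the aggregation lambda: wrap both documents in one map.
    // Assumption: external is null when the lookup found nothing.
    static Map<String, Object> merge(Map<String, Object> original,
                                     Map<String, Object> external) {
        Map<String, Object> merged = new HashMap<>();
        merged.put("original", original);
        merged.put("external", external);
        return merged;
    }
}
```

The query is built from a separate object, so the body that reaches the merge step is still the untouched original. If a missing lookup result should not be stored, the merge step would be the place to check for it.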
answered 2016-03-24T18:48:23.083