0

我有一个简单的路由来监听 Redis 频道。由于某种原因,它不起作用。这是我的路线。我验证了数据正在发布到 Redis 频道,我可以使用普通的 Jedis 订阅者将其读回。我在 Jetty 内运行 Camel,它被部署为战争。

public class RedisSubscriberRoute extends RouteBuilder{

    @Override
public void configure() throws Exception {

    from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel") 
    .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String res = exchange.getIn().getBody().toString();
                System.out.println("************ " + res); 
                exchange.getOut().setBody(res);
            }
        })
    .to("log:foo");
}

}

更新(2013 年 5 月 10 日美国东部标准时间上午 9:56):添加版本信息

    <properties>
            <spring.version>3.2.2.RELEASE</spring.version>
            <camel.version>2.11.0</camel.version>
            <jetty.version>7.6.8.v20121106</jetty.version>
    </properties>

Redis 服务器版本为 2.6.11

示例 git 项目在这里。 https://github.com/soumyasd/camelredisdemo

2013 年 5 月 10 日更新(美国东部标准时间晚上 10:18):

正如下面评论中所建议的,我将 spring-data 的版本更改为 1.0.0.RELEASE。看起来消息正在发送给订阅者,但我仍然遇到异常。

java.lang.RuntimeException: org.springframework.data.redis.serializer.SerializationException: Cannot deserialize; nested exception is org.springframework.core.serializer.support.SerializationFailedException: Failed to deserialize payload. Is the byte array a result of corresponding serialization for DefaultDeserializer?; nested exception is java.io.StreamCorruptedException: invalid stream header: 77686174
    at org.apache.camel.component.redis.RedisConsumer.onMessage(RedisConsumer.java:73)[camel-spring-redis-2.11.0.jar:2.11.0]
    at org.springframework.data.redis.listener.RedisMessageListenerContainer.executeListener(RedisMessageListenerContainer.java:242)[spring-data-redis-1.0.0.RELEASE.jar:]
    at org.springframework.data.redis.listener.RedisMessageListenerContainer.processMessage(RedisMessageListenerContainer.java:231)[spring-data-redis-1.0.0.RELEASE.jar:]
    at org.springframework.data.redis.listener.RedisMessageListenerContainer$DispatchMessageListener$1.run(RedisMessageListenerContainer.java:726)[spring-data-redis-1.0.0.RELEASE.jar:]
    at java.lang.Thread.run(Thread.java:680)[:1.6.0_45]
4

2 回答 2

1

v 1.0.3.RELEASE 的消费者有问题,请改用 1.0.0.RELEASE。

您得到的例外情况有所不同:Camel 生产者使用 Spring RedisTemplate,而后者又使用 JdkSerializationRedisSerializer。为了使其对称,消费者默认也使用 JdkSerializationRedisSerializer 来反序列化数据。因此,如果您使用 Camel 生产者发布数据,它应该可以正常工作而无需喧嚣。但是,如果您使用其他 redis 客户端(或在您的情况下使用其他一些库)将数据发布到 redis,则必须为使用者使用另一个序列化程序。冗长的解释,但要使其工作实际上是两行:

from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&serializer=#serializer")

于 2013-05-11T14:19:19.487 回答
0

这是我必须更改才能完成这项工作的摘要。

  1. 正如@Bilgin Ibryam 指出的那样-您必须使用 spring-data-redis 的 1.0.0.RELEASE 版本(截至 2013 年 5 月 11 日)

    <dependency>
        <groupId>org.springframework.data</groupId>
        <artifactId>spring-data-redis</artifactId>
                    <!-- IMPORTANT - as of 10-May-2013 the Redis Camel
                         component only works with version 1.0.0.RELASE -->
        <version>1.0.0.RELEASE</version>
    </dependency> 
    
  2. 我在pom.xml中使用的其他版本是

    3.2.2.发布 2.11.0 7.6.8.v20121106

  3. 如果您使用 Camel Redis 组件发布和使用,则不必声明不同的序列化程序。在我的例子中,我是从 python 以及使用 Jedis 的普通旧 Java 发布的。我必须更改我的路线以包含序列化程序并在我的spring/camel 配置中定义序列化程序。

    @Override public void configure() 抛出异常 {

    from("spring-redis://localhost:6379?command=SUBSCRIBE&channels=mychannel&serializer=#redisserializer") 
    .process(new Processor() {
            @Override
            public void process(Exchange exchange) throws Exception {
                String res = exchange.getIn().getBody().toString();
                System.out.println("************ " + res); 
                exchange.getOut().setBody(res); 
            }
        })
    .to("log:foo");
    

    }

于 2013-05-12T01:20:02.277 回答