2

我是反应式编程的新手。我需要连接到 Redis 来保存和获取一些数据。redis 实例存在于云中。我正在使用生菜连接工厂建立连接。

与redis建立连接时,请求失败。这是我的 Redis 配置类:

package com.sap.slh.tax.attributes.determination.springwebfluxdemo.config;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.scheduling.annotation.EnableAsync;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;

@Configuration
@EnableAsync
public class RedisConfig {
    private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);

    @Value("${vcap.services.redis.credentials.hostname:10.11.241.101}")
    private String host;

    @Value("${vcap.services.redis.credentials.port:36516}")
    private int port;

    @Value("$vcap.services.redis.credentials.password:123456788")
    private String password;

    @Bean
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory() {
        RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(host, port);
        redisStandaloneConfiguration.setPassword(RedisPassword.of(password));
        redisStandaloneConfiguration.setDatabase(0);
        log.error("Redis standalone configuration{}",JsonUtil.toJsonString(redisStandaloneConfiguration));
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder().build();
        LettuceConnectionFactory lettuceConnectionFactory = new LettuceConnectionFactory(redisStandaloneConfiguration, clientConfig);
        lettuceConnectionFactory.afterPropertiesSet();
        return lettuceConnectionFactory;

    }

    @Bean
    ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations(
            ReactiveRedisConnectionFactory reactiveRedisConnectionFactory) {
        Jackson2JsonRedisSerializer<TaxDetails> serializer = new Jackson2JsonRedisSerializer<>(TaxDetails.class);
        Jackson2JsonRedisSerializer<TaxLine> serializer1 = new Jackson2JsonRedisSerializer<>(TaxLine.class);
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
                .newSerializationContext(new StringRedisSerializer());
        RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(serializer).value(serializer1).build();
        ;
        return new ReactiveRedisTemplate<>(
                reactiveRedisConnectionFactory, context);
    }
}

这是我的查找服务类,它在请求期间实际上与 redis 通信


import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.stereotype.Service;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.RedisRepo;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.util.JsonUtil;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

@Service
public class RedisTaxLineLookUpService {
    private static final Logger log = LoggerFactory.getLogger(RedisTaxLineLookUpService.class);

    @Autowired
    private ReactiveRedisOperations<TaxDetails, TaxLine> redisOperations;

    public Flux<TaxLine> get(TaxDetails taxDetails) {

        log.info("going to call redis to fetch tax lines{}", JsonUtil.toJsonString(taxDetails));
        return redisOperations.keys(taxDetails).flatMap(redisOperations.opsForValue()::get);

    }

    public Mono<RedisRepo> set(RedisRepo redisRepo) {
        log.info("going to call redis to save tax lines{}", JsonUtil.toJsonString(redisRepo.getTaxDetails()));
        return redisOperations.opsForValue().set(redisRepo.getTaxDetails(), redisRepo.getTaxLine())
                .map(__ -> redisRepo);
    }

}

堆栈跟踪 :

2020-03-26T16:27:54.513+0000 [APP/PROC/WEB/0] OUT org.springframework.data.redis.RedisConnectionFailureException: Unable to connect to Redis; nested exception is io.lettuce.core.RedisConnectionException: Unable to connect to 10.11.241.101:36516 | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException: | Error has been observed at the following site(s): | |_ checkpoint ? Handler com.sap.slh.tax.attributes.determination.springwebfluxdemo.controller.TaxLinesDeterminationController#saveTaxLines(RedisRepo) [DispatcherHandler] | |_ checkpoint ? HTTP POST "/tax/lines/save/" [ExceptionHandlingWebHandler] | Stack trace: | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1199) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1178) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedReactiveConnection(LettuceConnectionFactory.java:952) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:429) | at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getReactiveConnection(LettuceConnectionFactory.java:94) | at org.springframework.data.redis.core.ReactiveRedisTemplate.lambda$doInConnection$0(ReactiveRedisTemplate.java:198) | at reactor.core.publisher.MonoSupplier.call(MonoSupplier.java:85) | at reactor.core.publisher.FluxUsingWhen.subscribe(FluxUsingWhen.java:80) | at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:55) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:150) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:241) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:203) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoIgnoreThen$ThenAcceptInner.onNext(MonoIgnoreThen.java:296) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:247) | at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:329) | at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:173) | at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:92) | at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:67) | at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:73) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:144) | at reactor.core.publisher.FluxContextStart$ContextStartSubscriber.onNext(FluxContextStart.java:103) | at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:287) | at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:330) | at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1705) | at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:160) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:252) | at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:136) | at reactor.netty.channel.FluxReceive.terminateReceiver(FluxReceive.java:419) | at reactor.netty.channel.FluxReceive.drainReceiver(FluxReceive.java:209) | at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:367) | at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:363) | at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:489) | at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:90) | at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:377)

任何建议或答案都会非常有帮助!提前致谢 !

4

3 回答 3

1

我在 AWS(EC2 实例)上运行 Redis 时遇到了类似的问题。它在以下之后起作用:

sudo vi /etc/redis/redis.conf

  1. 评论行:bind 127.0.0.1 ::1
  2. 设置线路protected-mode no
  3. 设置线路supervised systemd
  4. sudo systemctl restart redis.service
  5. 检查 AWS 安全组以防万一。
于 2021-11-01T09:20:35.110 回答
1

我使用这个 RedisConfig.java 它对我有用。

@Configuration
@ConfigurationProperties(prefix = "spring.redis")
@Setter
public class RedisConfig {

    private String host;
    private String password;

    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        LettuceClientConfiguration clientConfig = LettuceClientConfiguration.builder()
                .useSsl().build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }

    @Bean
    public RedisConfiguration defaultRedisConfig() {
        RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
        config.setHostName(host);
        config.setPassword(RedisPassword.of(password));
        return config;
    }
}
于 2020-12-16T02:22:23.730 回答
0

我更新了我的 RedisConfig 类如下:


import java.time.Duration;
import java.util.List;
import java.util.stream.Collectors;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.RedisConfiguration;
import org.springframework.data.redis.connection.RedisNode;
import org.springframework.data.redis.connection.RedisPassword;
import org.springframework.data.redis.connection.RedisSentinelConfiguration;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceClientConfiguration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.connection.lettuce.LettucePoolingClientConfiguration;
import org.springframework.data.redis.core.ReactiveRedisOperations;
import org.springframework.data.redis.core.ReactiveRedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.RedisSerializationContext;
import org.springframework.data.redis.serializer.StringRedisSerializer;

import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxDetails;
import com.sap.slh.tax.attributes.determination.springwebfluxdemo.model.TaxLine;

import io.lettuce.core.RedisURI;
import io.pivotal.cfenv.core.CfEnv;

@Configuration
public class RedisConfig {

    CfEnv cfEnv = new CfEnv();
    String tag = "redis";
    String redisHost = cfEnv.findCredentialsByTag(tag).getHost();

    @Bean
    @Primary
    public ReactiveRedisConnectionFactory reactiveRedisConnectionFactory(RedisConfiguration defaultRedisConfig) {
        LettuceClientConfiguration clientConfig = LettucePoolingClientConfiguration.builder()
                .commandTimeout(Duration.ofMillis(60000)).build();
        return new LettuceConnectionFactory(defaultRedisConfig, clientConfig);
    }

    @Bean
    public RedisConfiguration defaultRedisConfig() {
        if (redisHost != null) {
//          RedisStandaloneConfiguration config = new RedisStandaloneConfiguration("127.0.0.1", 6379);
            RedisStandaloneConfiguration config = new RedisStandaloneConfiguration();
            String redisPort = cfEnv.findCredentialsByTag(tag).getPort();
            String redisPassword = cfEnv.findCredentialsByTag(tag).getPassword();
            config.setHostName(redisHost);
            config.setPassword(RedisPassword.of(redisPassword));
            config.setPort(Integer.parseInt(redisPort));
            config.setDatabase(2);
            return config;
        } else {
            RedisSentinelConfiguration config = new RedisSentinelConfiguration();
            String uri = cfEnv.findCredentialsByTag(tag).getUri();
            RedisURI redisURI = RedisURI.create(uri);
            config.master(redisURI.getSentinelMasterId());
            List<RedisNode> nodes = redisURI.getSentinels().stream()
                    .map(redisUri -> populateNode(redisUri.getHost(), redisUri.getPort())).collect(Collectors.toList());
            nodes.forEach(node -> config.addSentinel(node));
            config.setPassword(RedisPassword.of(redisURI.getPassword()));
            config.setDatabase(2);
            return config;
        }
    }

    @Bean
    public ReactiveRedisOperations<TaxDetails, TaxLine> reactiveRedisTemplate(
        ReactiveRedisConnectionFactory factory) {
        StringRedisSerializer keySerializer = new StringRedisSerializer();
        Jackson2JsonRedisSerializer<TaxLine> valueSerializer = new Jackson2JsonRedisSerializer<>(
            TaxLine.class);
        Jackson2JsonRedisSerializer<TaxDetails> valueSerializer1 = new Jackson2JsonRedisSerializer<>(
                TaxDetails.class);
        RedisSerializationContext.RedisSerializationContextBuilder<TaxDetails, TaxLine> builder = RedisSerializationContext
            .newSerializationContext(keySerializer);
        RedisSerializationContext<TaxDetails, TaxLine> context = builder.key(valueSerializer1).value(valueSerializer).build();
        return new ReactiveRedisTemplate<>(factory, context);
    }

    private RedisNode populateNode(String host, Integer port) {
        return new RedisNode(host, port);
    }

}

cfEnv 的依赖项:

            <groupId>io.pivotal.cfenv</groupId>
            <artifactId>java-cfenv-boot</artifactId>
            <version>2.1.1.RELEASE</version>
        </dependency>
于 2020-03-30T18:13:50.840 回答