0

我试图在间隔发布后的 3 分钟内缓存 Kafka 记录,因为它将过期并从缓存中删除。

每个使用在 springboot 中编写的 kafka 消费者获取的传入记录都需要首先在缓存中更新,然后如果它存在,如果它与缓存记录匹配,我需要丢弃下一个重复记录。

我尝试使用咖啡因缓存如下,

@EnableCaching
public class AppCacheManagerConfig {

    @Bean
    public CacheManager cacheManager(Ticker ticker) {
        CaffeineCache bookCache = buildCache("declineRecords", ticker, 3);
        SimpleCacheManager cacheManager = new SimpleCacheManager();
        cacheManager.setCaches(Collections.singletonList(bookCache));
        return cacheManager;
    }

    private CaffeineCache buildCache(String name, Ticker ticker, int minutesToExpire) {
        return new CaffeineCache(name, Caffeine.newBuilder().expireAfterWrite(minutesToExpire, TimeUnit.MINUTES)
                .maximumSize(100).ticker(ticker).build());
    }

    @Bean
    public Ticker ticker() {
        return Ticker.systemTicker();
    }

}

我的卡夫卡消费者如下,

@Autowired
    CachingServiceImpl cachingService;

@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", concurrency = "#{'${spring.kafka.consumer.concurrentConsumers}'}", errorHandler = "#{'${spring.kafka.consumer.errorHandler}'}")
    public void consume(Message<?> message, Acknowledgment acknowledgment,
            @Header(KafkaHeaders.RECEIVED_TIMESTAMP) long createTime) {
        logger.info("Recieved Message: " + message.getPayload());
        try {
            boolean approveTopic = false;
            boolean duplicateRecord = false;
if (cachingService.isDuplicateCheck(declineRecord)) {
//do something with records
}
else
{
//do something with records
}
    cachingService.putInCache(xmlJSONObj, declineRecord, time);

我的缓存服务如下,

@Component
public class CachingServiceImpl {
    private static final Logger logger = LoggerFactory.getLogger(CachingServiceImpl.class);
    @Autowired
    CacheManager cacheManager;

    @Cacheable(value = "declineRecords", key = "#declineRecord", sync = true)
    public String putInCache(JSONObject xmlJSONObj, String declineRecord, String time) {
        logger.info("Record is Cached for 3 minutes interval check", declineRecord);
        cacheManager.getCache("declineRecords").put(declineRecord, time);
        return declineRecord;

    }

    public boolean isDuplicateCheck(String declineRecord) {
        if (null != cacheManager.getCache("declineRecords").get(declineRecord)) {
            return true;
        }
        return false;
    }
}

但是每次记录进入消费者时,我的缓存总是空的。它没有保存记录。

修改完成:

在完成建议后,我添加了如下配置文件,更多类型的研发删除了一些早期的逻辑,现在缓存按预期工作,但是当所有三个消费者都发送相同的记录时,重复检查失败。

`

  @Configuration
  public class AppCacheManagerConfig {
  public static Cache<String, Object> jsonCache = 
  Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES)
            .maximumSize(10000).recordStats().build();
    @Bean
    public CacheLoader<Object, Object> cacheLoader() {
        CacheLoader<Object, Object> cacheLoader = new CacheLoader<Object, Object>() {
            @Override
            public Object load(Object key) throws Exception {
                return null;
            }

            @Override
            public Object reload(Object key, Object oldValue) throws Exception {
                return oldValue;
            }
        };
        return cacheLoader;
    }

`

现在我使用上面的缓存作为手动放置和获取。

4

1 回答 1

1

我猜您正在尝试为 Kafka 实施重复数据删除。

这是类似的讨论:

https://github.com/spring-projects/spring-kafka/issues/80

这是当前的抽象类,您可以扩展它以获得必要的结果:

https://github.com/spring-projects/spring-kafka/blob/master/spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/AbstractFilteringMessageListener.java

您的缓存服务肯定不正确:Cacheable注释允许标记数据获取器和设置器,以通过 AOP 添加缓存。在代码中,您清楚地实现了自己的一些低级缓存更新逻辑。

至少下一个可能的更改可能会帮助您:

  1. 删除@Cacheable. 您不需要它,因为您手动使用缓存,因此它可能是冲突的根源(尤其是在您使用 时sync = true)。如果有帮助,@EnableCaching也请删除 - 它支持您在此处不需要的与缓存相关的 Spring 注释。

  2. 尝试Ticker使用其他 bean 的适当参数删除 bean。根据您的配置,它不应该是有害的,但通常它仅对测试有帮助,无需另外定义。

  3. 仔细检查什么是declineRecord. 如果是序列化对象,请确保序列化正常工作。

  4. 添加recordStats()缓存并输出stats()到日志以供进一步分析。

于 2020-02-04T00:54:21.533 回答