我试图在间隔发布后的 3 分钟内缓存 Kafka 记录,因为它将过期并从缓存中删除。
每个使用在 springboot 中编写的 kafka 消费者获取的传入记录都需要首先在缓存中更新,然后如果它存在,如果它与缓存记录匹配,我需要丢弃下一个重复记录。
我尝试使用咖啡因缓存如下,
@EnableCaching
public class AppCacheManagerConfig {
@Bean
public CacheManager cacheManager(Ticker ticker) {
CaffeineCache bookCache = buildCache("declineRecords", ticker, 3);
SimpleCacheManager cacheManager = new SimpleCacheManager();
cacheManager.setCaches(Collections.singletonList(bookCache));
return cacheManager;
}
private CaffeineCache buildCache(String name, Ticker ticker, int minutesToExpire) {
return new CaffeineCache(name, Caffeine.newBuilder().expireAfterWrite(minutesToExpire, TimeUnit.MINUTES)
.maximumSize(100).ticker(ticker).build());
}
@Bean
public Ticker ticker() {
return Ticker.systemTicker();
}
}
我的卡夫卡消费者如下,
@Autowired
CachingServiceImpl cachingService;
@KafkaListener(topics = "#{'${spring.kafka.consumer.topic}'}", concurrency = "#{'${spring.kafka.consumer.concurrentConsumers}'}", errorHandler = "#{'${spring.kafka.consumer.errorHandler}'}")
public void consume(Message<?> message, Acknowledgment acknowledgment,
@Header(KafkaHeaders.RECEIVED_TIMESTAMP) long createTime) {
logger.info("Recieved Message: " + message.getPayload());
try {
boolean approveTopic = false;
boolean duplicateRecord = false;
if (cachingService.isDuplicateCheck(declineRecord)) {
//do something with records
}
else
{
//do something with records
}
cachingService.putInCache(xmlJSONObj, declineRecord, time);
我的缓存服务如下,
@Component
public class CachingServiceImpl {
private static final Logger logger = LoggerFactory.getLogger(CachingServiceImpl.class);
@Autowired
CacheManager cacheManager;
@Cacheable(value = "declineRecords", key = "#declineRecord", sync = true)
public String putInCache(JSONObject xmlJSONObj, String declineRecord, String time) {
logger.info("Record is Cached for 3 minutes interval check", declineRecord);
cacheManager.getCache("declineRecords").put(declineRecord, time);
return declineRecord;
}
public boolean isDuplicateCheck(String declineRecord) {
if (null != cacheManager.getCache("declineRecords").get(declineRecord)) {
return true;
}
return false;
}
}
但是每次记录进入消费者时,我的缓存总是空的。它没有保存记录。
修改完成:
在完成建议后,我添加了如下配置文件,更多类型的研发删除了一些早期的逻辑,现在缓存按预期工作,但是当所有三个消费者都发送相同的记录时,重复检查失败。
`
@Configuration
public class AppCacheManagerConfig {
public static Cache<String, Object> jsonCache =
Caffeine.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES)
.maximumSize(10000).recordStats().build();
@Bean
public CacheLoader<Object, Object> cacheLoader() {
CacheLoader<Object, Object> cacheLoader = new CacheLoader<Object, Object>() {
@Override
public Object load(Object key) throws Exception {
return null;
}
@Override
public Object reload(Object key, Object oldValue) throws Exception {
return oldValue;
}
};
return cacheLoader;
}
`
现在我使用上面的缓存作为手动放置和获取。