1

redis 列表,生产者保持 lpush。在另一个线程中,消费者定期从列表中取出所有元素,并对元素进行分类。因为生产者一直在推动,所以必须以原子方式完成全部取出。那么有没有有效的方法来做到这一点?可以使用spring-data-redis 。

// producer
getOpsForList.push(k, v);

// consumer
alist = range(k,0,-1); // take all out
alist.parallelStream() // during which a producer thread could push but I hope it is "blocked".
delete(k);  // list is now empty and push from producer is unblocked.

multiexec没有达到我的目标,因为它实际上提交了lrangelpush而且delete只是在一次交易中。到目前为止,我能想到的唯一方法是保留lpop并添加返回,alist直到列表为空。

编辑,这就是我的想法:当你想确保一个操作只运行一次时,使用watch

watch key
val = get key
val = val + 1
multi
set key val
exec

当您不想被“中断”(不是多线程中断)并且不在乎它运行多少次时,事务(multiexec)就足够了。

multi
val = lrange key 0 -1
delete key
exec

val完成后仍然是一个列表,就像官方文档中所说的那样

事务中的所有命令都被序列化并按顺序执行。在 Redis 事务的执行过程中服务于另一个客户端发出的请求永远不会发生。

除了redis,我把数据操作list.stream.parallelism拿出来了,现在函数只关注数据getter,和上一段代码一模一样。;)

4

1 回答 1

0

一个很好的例子来说明如何使用 WATCH 来创建新的原子操作,否则 Redis 不支持它是实现 ZPOP,这是一个以原子方式从排序集中弹出具有较低分数的元素的命令。

在文档中有一个实现,ZPOP如下所示:

WATCH zset
element = ZRANGE zset 0 0
MULTI
ZREM zset element
EXEC

你需要做的是重复上面的操作如果EXEC失败(即返回Null回复)。生产者操作lpush是原子的,所以不需要使用watch命令。例如:

// consumer pesudo code
do {
  watch(k);
  transaction = multi();
  alist = transaction.range(k,0,-1); 
  transaction.delete(k);  
  status = get status of transaction.exec();
} while(status == null);

alist.parallelStream() 
于 2017-09-11T13:27:35.020 回答