可以通过直接使用底层 Akka Persistence 框架进行allPersistenceIds
或currentPersistenceIds
查询来获取所有实体 ID
您可以在 Lagom Online Auction 示例应用程序中看到一个正在使用的示例,位于UserServiceImpl.java
:
public class UserServiceImpl implements UserService {
//...
private final CurrentPersistenceIdsQuery currentIdsQuery;
private final Materializer mat;
@Inject
public UserServiceImpl(PersistentEntityRegistry registry, ActorSystem system, Materializer mat) {
//...
this.mat = mat;
this.currentIdsQuery =
PersistenceQuery.get(system)
.getReadJournalFor(
CassandraReadJournal.class,
CassandraReadJournal.Identifier()
);
//...
}
//...
@Override
public ServiceCall<NotUsed, PSequence<User>> getUsers() {
// Note this should never make production....
return req -> currentIdsQuery.currentPersistenceIds()
.filter(id -> id.startsWith("UserEntity"))
.mapAsync(4, id ->
entityRef(id.substring(10))
.ask(UserCommand.GetUser.INSTANCE))
.filter(Optional::isPresent)
.map(Optional::get)
.runWith(Sink.seq(), mat)
.thenApply(TreePVector::from);
}
//...
}
这种方法虽然可行,但很少是一个好主意。您可能已经注意到示例代码中的注释:“这永远不应该生产”。无法使用这种方法执行聚合命令:您只能将命令一个一个地发送到每个实体。这可能会导致服务集群中节点之间的内存消耗和流量激增。也不可能按实体状态的任何条件过滤此 ID 列表,因为您可能习惯于使用面向行的 SQL 数据模型。
为您的数据定义读取端模型几乎总是更合适的。这采用了一个单独的“读取端”数据存储的形式,该数据存储旨在满足您的应用程序所需的查询类型,以及一个在您的实体发出事件时自动调用的事件处理器,它会更新读取端数据存储以反映这些变化。
Lagom 框架通过管理读取端事件处理器、跟踪它们在事件日志中的位置以及在重新启动或失败时自动重新启动它们来帮助确保应用程序的最终一致性。否则,这种类型的弹性很难实现聚合操作。
(此答案改编自Lagom Framework Google Group 中的相关讨论。)