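The Ledger API lets us pull data on demand, but is there a service that can push data, for example for analytics, when new transactions match a threshold or search criteria, or when aggregate functions computed purely from transaction data change?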
1 Answer
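The Ledger API lets you use gRPC streaming to receive transactions as they are created on the ledger. With any gRPC-generated client you can then filter, transform, or aggregate those transactions in your own code. The protobuf definition of the service endpoint to connect to looks like this: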
service TransactionService {
  // Read the ledger's filtered transaction stream for a set of parties.
  rpc GetTransactions (GetTransactionsRequest) returns (stream GetTransactionsResponse);
}
For example, subscribing to the transaction stream in Java would look like this:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import io.grpc.stub.StreamObserver;
// The generated Ledger API classes (TransactionServiceGrpc, GetTransactionsRequest, ...)
// must also be imported from the package your generated stubs live in.

public class Main {

    private static void processTransaction(TransactionOuterClass.Transaction tx) {
        // process (filter, transform, aggregate) transaction
        System.out.printf("received transaction %s\n", tx.getTransactionId());
    }

    public static void main(String[] args) {
        // Create a gRPC channel
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 7600).usePlaintext().build();

        // Get ledger identity
        LedgerIdentityServiceBlockingStub ledgerIdService = LedgerIdentityServiceGrpc.newBlockingStub(channel);
        GetLedgerIdentityResponse identityResponse = ledgerIdService.getLedgerIdentity(GetLedgerIdentityRequest.getDefaultInstance());
        String ledgerId = identityResponse.getLedgerId();

        // Subscribe observer to transaction stream
        String party = "Alice";
        TransactionServiceStub transactionService = TransactionServiceGrpc.newStub(channel);
        GetTransactionsRequest transactionsRequest = GetTransactionsRequest.newBuilder()
                .setLedgerId(ledgerId)
                .setBegin(LedgerOffset.newBuilder().setBoundary(LedgerOffset.LedgerBoundary.LEDGER_BEGIN))
                .setFilter(TransactionFilter.newBuilder().putFiltersByParty(party, Filters.getDefaultInstance()))
                .build();
        StreamObserver<GetTransactionsResponse> transactionObserver = new StreamObserver<GetTransactionsResponse>() {
            @Override
            public void onNext(GetTransactionsResponse value) {
                // Each response may carry several transactions
                value.getTransactionsList().forEach(Main::processTransaction);
            }

            @Override
            public void onError(Throwable t) {
                System.err.printf("%s encountered an error while processing transactions!\n", party);
                t.printStackTrace();
            }

            @Override
            public void onCompleted() {
                System.out.printf("%s's transactions stream completed.\n", party);
            }
        };
        System.out.printf("%s starts reading transactions.\n", party);
        transactionService.getTransactions(transactionsRequest, transactionObserver);

        // The stub is asynchronous, so keep the main thread alive while the stream delivers
        try {
            Thread.sleep(Long.MAX_VALUE);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
Note that this uses the raw gRPC-generated code. The SDK also provides reactive bindings, which make the above less verbose.
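To make the "filter, transform, aggregate" step a bit more concrete, here is a minimal sketch of a threshold alert you might call from processTransaction. It is not part of the Ledger API: TxSummary, ThresholdAggregator and AggregatorDemo are hypothetical names, and how you extract a party and an amount from a real transaction depends on your contract templates, so that part is left out and replaced with hand-built values.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical, simplified view of a transaction: only the fields this sketch needs.
final class TxSummary {
    final String transactionId;
    final String party;
    final BigDecimal amount;

    TxSummary(String transactionId, String party, BigDecimal amount) {
        this.transactionId = transactionId;
        this.party = party;
        this.amount = amount;
    }
}

// Keeps a running total per party and notifies a listener when a total reaches the threshold.
final class ThresholdAggregator {
    private final BigDecimal threshold;
    private final Consumer<String> onThresholdReached;
    private final Map<String, BigDecimal> totals = new HashMap<>();

    ThresholdAggregator(BigDecimal threshold, Consumer<String> onThresholdReached) {
        this.threshold = threshold;
        this.onThresholdReached = onThresholdReached;
    }

    // Synchronized because stream callbacks may arrive on a different thread than readers.
    synchronized void accept(TxSummary tx) {
        BigDecimal before = totals.getOrDefault(tx.party, BigDecimal.ZERO);
        BigDecimal after = before.add(tx.amount);
        totals.put(tx.party, after);
        // Notify only when the total crosses the threshold, not on every later transaction.
        if (before.compareTo(threshold) < 0 && after.compareTo(threshold) >= 0) {
            onThresholdReached.accept(tx.party);
        }
    }

    synchronized BigDecimal totalFor(String party) {
        return totals.getOrDefault(party, BigDecimal.ZERO);
    }
}

class AggregatorDemo {
    public static void main(String[] args) {
        List<String> alerts = new ArrayList<>();
        ThresholdAggregator aggregator = new ThresholdAggregator(new BigDecimal("100"), alerts::add);

        // Stand-ins for values you would derive inside processTransaction.
        aggregator.accept(new TxSummary("tx-1", "Alice", new BigDecimal("60")));
        aggregator.accept(new TxSummary("tx-2", "Alice", new BigDecimal("50")));
        aggregator.accept(new TxSummary("tx-3", "Alice", new BigDecimal("10")));

        System.out.println(alerts + " " + aggregator.totalFor("Alice")); // prints "[Alice] 120"
    }
}
```

In the subscriber above you would build one ThresholdAggregator before subscribing and call accept from processTransaction; the listener is where you would push the alert on to whatever consumes your analytics.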
Answered 2019-01-17T17:09:59.023