我是风暴中三叉戟的新手。我对 TridentState 感到头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 id 来完全处理)并且我不完全确定以下语句的作用
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
任何人都可以解释当我们定义上述代码时实际发生了什么?
我是风暴中三叉戟的新手。我对 TridentState 感到头疼。据我了解,三叉戟维护每个批次的状态(即元数据)(批次中的所有元组是否都通过在数据库中维护事务 id 来完全处理)并且我不完全确定以下语句的作用
TridentState urlToTweeters =
topology.newStaticState(getUrlToTweetersState());
任何人都可以解释当我们定义上述代码时实际发生了什么?
我希望现在回答永远不会太晚,至少其他人可能会发现我的回答很有用:)
所以,topology.newStaticState()
是 Trident 对可查询数据存储的抽象。一个参数 fornewStaticState()
应该是一个实现 - 基于方法的契约 - of storm.trident.state.StateFactory
。反过来,工厂应该实现makeState()
返回storm.trident.state.State
. 但是,如果您打算查询您的状态,您应该返回一个 istance storm.trident.state.map.ReadOnlyMapState
,因为 plainstorm.trident.state.State
没有查询实际数据源的方法(如果您尝试使用除 之外的任何东西,您实际上会得到一个类转换异常ReadOnlyMapState
)。
所以,让我们试一试吧!
虚拟状态实现:
public static class ExampleStaticState implements ReadOnlyMapState<String> {
private final Map<String, String> dataSourceStub;
public ExampleStaticState() {
dataSourceStub = new HashMap<>();
dataSourceStub.put("tuple-00", "Trident");
dataSourceStub.put("tuple-01", "definitely");
dataSourceStub.put("tuple-02", "lacks");
dataSourceStub.put("tuple-03", "documentation");
}
@Override
public List<String> multiGet(List<List<Object>> keys) {
System.out.println("DEBUG: MultiGet, keys is " + keys);
List<String> result = new ArrayList<>();
for (List<Object> inputTuple : keys) {
result.add(dataSourceStub.get(inputTuple.get(0)));
}
return result;
}
@Override
public void beginCommit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Begin commit, txid=" + txid);
}
@Override
public void commit(Long txid) {
// never gets executed...
System.out.println("DEBUG: Commit, txid=" + txid);
}
}
一个工厂:
public static class ExampleStaticStateFactory implements StateFactory {
@Override
public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
return new ExampleStaticState();
}
}
一个简单的psvm
(又名public static void main
):
public static void main(String... args) {
TridentTopology tridentTopology = new TridentTopology();
FeederBatchSpout spout = new FeederBatchSpout(Arrays.asList(new String[]{
"foo"
}));
TridentState state = tridentTopology.newStaticState(new ExampleStaticStateFactory());
tridentTopology
.newStream("spout", spout)
.stateQuery(state, new Fields("foo"), new MapGet(), new Fields("bar"))
.each(new Fields("foo", "bar"), new Debug())
;
Config conf = new Config();
conf.setNumWorkers(6);
LocalCluster localCluster = new LocalCluster();
localCluster.submitTopology("tridentTopology", conf, tridentTopology.build());
spout.feed(Arrays.asList(new Values[]{
new Values("tuple-00"),
new Values("tuple-01"),
new Values("tuple-02"),
new Values("tuple-03")
}));
localCluster.shutdown();
}
最后,输出:
DEBUG: MultiGet, keys is [[tuple-00], [tuple-01], [tuple-02], [tuple-03]]
DEBUG: [tuple-00, Trident]
DEBUG: [tuple-01, definitely]
DEBUG: [tuple-02, lacks]
DEBUG: [tuple-03, documentation]
您会看到,stateQuery() 从输入批次中获取值并将它们映射到“数据存储”中找到的值。
深入一点,您可以查看MapGet
类的来源(其实例用于在拓扑内部进行查询的人)并在那里找到以下内容:
public class MapGet extends BaseQueryFunction<ReadOnlyMapState, Object> {
@Override
public List<Object> batchRetrieve(ReadOnlyMapState map, List<TridentTuple> keys) {
return map.multiGet((List) keys);
}
@Override
public void execute(TridentTuple tuple, Object result, TridentCollector collector) {
collector.emit(new Values(result));
}
}
所以在底层它只是调用multiGet()
你ReadOnlyMapState
实现的方法,然后发出在数据存储中找到的值,将它们添加到已经存在的元组中。您可以(尽管这可能不是最好的做法)创建自己的实现BaseQueryFunction<ReadOnlyMapState, Object>
来做一些更复杂的事情。
风暴 wiki 上有关于三叉戟状态的很好的文档。对您的问题的简单回答是,这urlToTweeters
是一个可以从中查询的状态对象。我假设上面的陈述来自三叉戟教程,转载如下:
TridentState urlToTweeters = topology.newStaticState(getUrlToTweetersState());
TridentState tweetersToFollowers = topology.newStaticState(getTweeterToFollowersState());
topology.newDRPCStream("reach")
.stateQuery(urlToTweeters, new Fields("args"), new MapGet(), new Fields("tweeters")).each(new Fields("tweeters"), new ExpandList(), new Fields("tweeter"))
/* At this point we have the tweeters for each url passed in args */
.shuffle()
.stateQuery(tweetersToFollowers, new Fields("tweeter"), new MapGet(), new Fields("followers"))
.parallelismHint(200)
.each(new Fields("followers"), new ExpandList(), new Fields("follower"))
.groupBy(new Fields("follower"))
.aggregate(new One(), new Fields("one"))
.parallelismHint(20)
.aggregate(new Count(), new Fields("reach"));
在此示例中,urlToTweeters
将存储 URL 到 Tweeter 的映射,并且在下一行定义的 DRPCreach
查询(将 URL 作为其参数)最终将产生范围。但是在途中(标有内联评论),您会看到每个 url 的高音流,即查询的结果urlToTweeters
。