
I want to use Apache Storm's TridentTopology in a project, but I am finding it difficult to understand the .each() method of the storm.trident.Stream class. Below is the example code from the Trident tutorial for reference:

TridentTopology topology = new TridentTopology();        
TridentState wordCounts =
     topology.newStream("spout1", spout)
       .each(new Fields("sentence"), new Split(), new Fields("word"))
       .groupBy(new Fields("word"))
       .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"))                
       .parallelismHint(6);

I don't understand the signature of the .each() method. Below is my understanding; please correct me if I am wrong, and add any further details that would help.

.each()

  • The first parameter takes the fields that name the values emitted by the spout, i.e. the fields returned by the spout's getOutputFields() method. I still don't know what this parameter is used for.
  • The second parameter is an instance of a class extending BaseFunction. It processes the tuple.
  • My understanding of the third parameter is similar to that of the first.

1 Answer


The first parameter is a projection on the input tuples. In your example, only the field named "sentence" is provided to Split. If your source emits tuples with schema Fields("first", "sentence", "third"), you can only access "sentence" in Split. Furthermore, "sentence" will have index zero (not one) in Split. Note that this is not a projection on the output: all fields remain in the output tuples! It is just a limited view of the whole tuple within Split.
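
To make the index-zero point concrete, here is a small plain-Java model of the input projection. It is not Storm code and does not use any Storm classes; the class name ProjectionModel and the method project() are hypothetical, chosen only to mimic what a function such as Split sees when each() is called with new Fields("sentence") on a stream whose schema is ("first", "sentence", "third").

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (hypothetical, not Storm's implementation) of the input
 * projection performed by Stream.each(inputFields, function, functionFields).
 */
public class ProjectionModel {

    /** Returns the values of inputFields, in that order, from a tuple with the given schema. */
    static List<Object> project(List<String> schema, List<Object> tuple, List<String> inputFields) {
        List<Object> view = new ArrayList<>();
        for (String field : inputFields) {
            int idx = schema.indexOf(field);
            if (idx < 0) {
                throw new IllegalArgumentException("Unknown field: " + field);
            }
            view.add(tuple.get(idx));
        }
        return view;
    }

    public static void main(String[] args) {
        List<String> schema = Arrays.asList("first", "sentence", "third");
        List<Object> tuple = Arrays.<Object>asList("a", "the cow jumped", 42);

        // Corresponds to passing new Fields("sentence") as the first argument of each()
        List<Object> view = project(schema, tuple, Arrays.asList("sentence"));

        // Inside the function, "sentence" sits at index 0, as with tuple.getString(0) in Split
        System.out.println(view.get(0)); // prints: the cow jumped
        System.out.println(view.size()); // prints: 1
    }
}
```

In real Trident code the same effect shows up as tuple.getString(0) inside Split's execute() returning the sentence, even though "sentence" is the second field of the stream.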

The last parameter is the schema of the Values passed to emit() within Split. These field names are appended as new attributes to the output tuples. Thus, the output tuple's schema is the input tuple's schema (the original one, not the one projected by the first parameter) plus the fields of this last parameter.
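
The schema rule can likewise be sketched as a plain-Java model (again hypothetical, not Storm code; OutputSchemaModel, outputSchema() and outputTuple() are illustrative names). It also reflects the behavior described in the Trident "Function" documentation: each call to emit() produces one output tuple consisting of the full input tuple followed by the emitted values, so a Split that emits two words turns one input tuple into two output tuples (and emitting nothing filters the input tuple out).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model (hypothetical, not Storm's implementation) of the output of Stream.each(). */
public class OutputSchemaModel {

    /** Output schema = full input schema (not the projected view) + the function's output fields. */
    static List<String> outputSchema(List<String> inputSchema, List<String> functionFields) {
        List<String> out = new ArrayList<>(inputSchema);
        out.addAll(functionFields);
        return out;
    }

    /** One output tuple: the original input values followed by the values passed to emit(). */
    static List<Object> outputTuple(List<Object> inputTuple, List<Object> emitted) {
        List<Object> out = new ArrayList<>(inputTuple);
        out.addAll(emitted);
        return out;
    }

    public static void main(String[] args) {
        List<String> inputSchema = Arrays.asList("first", "sentence", "third");
        List<Object> input = Arrays.<Object>asList("a", "the cow", 42);

        // Corresponds to passing new Fields("word") as the last argument of each()
        System.out.println(outputSchema(inputSchema, Arrays.asList("word")));
        // prints: [first, sentence, third, word]

        // A Split-like function calls emit() once per word: one output tuple per call
        for (String word : "the cow".split(" ")) {
            System.out.println(outputTuple(input, Arrays.<Object>asList(word)));
        }
        // prints: [a, the cow, 42, the]
        //         [a, the cow, 42, cow]
    }
}
```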

See section "Function" in the documentation: https://storm.apache.org/releases/0.10.0/Trident-API-Overview.html

answered 2015-11-26T12:33:26.280