我们已经为这个问题苦苦挣扎了很长时间。简而言之,我们的风暴拓扑会在一段时间后以随机方式停止从 spout 发出消息。我们有一个自动化脚本,它在主数据刷新活动完成后每天 06:00 UTC 重新部署拓扑。
在过去 2 周中,我们的拓扑在 UTC 时间(22:00 到 02:00 之间)停止发送消息 3 次。它只有在我们重新启动它时才上线,大约是 06:00 UTC。
我搜索了许多答案和博客,但找不到这里发生的事情。我们有一个未锚定的拓扑,这是我们在 3-4 年前做出的选择。我们从 0.9.2 开始,现在是 1.1.0。
我检查了所有类型的日志,我 100% 确定nextTuple()
控制器的方法没有被调用,并且系统中没有发生可能导致这种情况的异常。我还检查了我们积累的所有类型的日志,甚至没有一个 ERROR 或 WARN 日志解释突然停止。INFO 日志也没有那么有用。在工作日志、主管日志或 nimbus 日志中,没有任何内容可以与此问题相关联。
这是我们的 spout 类的外观: Controller.java
public class Controller implements IRichSpout {
SpoutOutputCollector _collector;
Calendar LAST_RUN = null;
List<ControllerMessage> msgList;
* It is to open the spout
public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
_collector = collector;
msgList= new ArrayList<ControllerMessage>();
MongoIndexingHandler mongoIndexingHandler = new MongoIndexingHandler();
* It executes the next tuple
public void nextTuple() {
Map<String, Object> logMap = new HashMap<>();
logMap.put("BEGIN", new Date());
try {
TriggerHandler thandler = new TriggerHandler();
if (msgList.size() == 0) {
List<ControllerMessage> mList = thandler.getControllerMessage(new Date());
msgList = mList;
if (msgList.size() > 0) {
ControllerMessage message = msgList.get(0);
if(thandler.fire(message.getFireTime())) {
Util.log(message, "CONTROLLER_LOGS", message.getTime(), new Date());
_collector.emit(new Values(message));
} catch (Exception e) {
Util.exLog(e, "EXECUTOR_ERROR", new Date(), "nextTuple()",Controller.class);
* It acknowledges the messages
public void ack(Object id) {
* It tells failed messages
public void fail(Object id) {
* It declares the message name
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("SPOUT_MESSAGE"));
public void activate() {
public void close() {
public void deactivate() {
public Map<String, Object> getComponentConfiguration() {
return null;
public class DiagnosticTopology {
public static void main(String[] args) throws Exception {
int gSize = (null != args && args.length > 0) ? Integer.parseInt(args[0]) : 2;
int sSize = (null != args && args.length > 1) ? Integer.parseInt(args[1]) : 128;
int sMSize = (null != args && args.length > 2) ? Integer.parseInt(args[2]) : 16;
int aGSize = (null != args && args.length > 3) ? Integer.parseInt(args[3]) : 16;
int rSize = (null != args && args.length > 4) ? Integer.parseInt(args[4]) : 64;
int rMSize = (null != args && args.length > 5) ? Integer.parseInt(args[5]) : 16;
int dMSize = (null != args && args.length > 6) ? Integer.parseInt(args[6]) : 8;
int wSize = (null != args && args.length > 7) ? Integer.parseInt(args[7]) : 16;
String topologyName = (null != args && args.length > 8) ? args[8] : "DIAGNOSTIC";
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("controller", new Controller(), 1);
builder.setBolt("generator", new GeneratorBolt(), gSize).shuffleGrouping("controller");
builder.setBolt("scraping", new ScrapingBolt(), sSize).shuffleGrouping("generator");
builder.setBolt("smongo", new MongoBolt(), sMSize).shuffleGrouping("scraping");
builder.setBolt("aggregation", new AggregationBolt(), aGSize).shuffleGrouping("scraping");
builder.setBolt("rule", new RuleBolt(), rSize).shuffleGrouping("smongo");
builder.setBolt("rmongo", new RMongoBolt(), rMSize).shuffleGrouping("rule");
builder.setBolt("dstatus", new DeviceStatusBolt(), dMSize).shuffleGrouping("rule");
builder.setSpout("trigger", new TriggerSpout(), 1);
builder.setBolt("job", new JobTriggerBolt(), 4).shuffleGrouping("trigger");
Config conf = new Config();
StormSubmitter.submitTopologyWithProgressBar(topologyName, conf, builder.createTopology());
我们为生产和测试环境准备了相当不错的服务器(至强、8 核、32 GB 和闪存驱动器),并且没有外部因素会导致此问题,因为代码中到处都有异常处理。