本文所讲内容并非storm的acker机制,如果想看acker机制的让您失望了,不过在此奉上徐明明大牛的blog: Twitter Storm源代码分析之acker工作流程 Twitter Storm如何保证消息不丢失
或者查看《storm源码分析》(又给京狗打链接)第12章-storm的acker系统,里面会详细说明storm的acker机制,笔者在此就不多述(多述都是废话,还不一定有人家讲的好)了。
这篇主要讲一下,关于开acker和不开acker的区别。 首先说一下,BasicBolt和RichBolt的区别,RichBolt会帮我们自动ack tuple的,basicbolt不会,所以如果继承的是basicBolt的话,就需要自己outputcollecter调ack方法了。
一般Storm有个配置项
/**
* How many executors to spawn for ackers.
*
* <p>If this is set to 0, then Storm will immediately ack tuples as soon
* as they come off the spout, effectively disabling reliability.</p>
*/
public static final String TOPOLOGY_ACKER_EXECUTORS = "topology.acker.executors";
该配置项是配置Acker Bolt数目的,大于0则spout每发一条msg,都会把相应的信息<rootid, msgid>发送给Acker Bolt进行跟踪。AckerBolt跟踪这个消息进行可靠性处理。
但是如果TOPOLOGY_ACKER_EXECUTORS配置为0的话,bolt和spout之间的ack又会是怎样的呐??
先看下面的executor.clj中(defmethod mk-threads :spout [executor-data task-datas]方法的部分代码
pending (RotatingMap.
2 ;; microoptimize for performance of .size method
(reify RotatingMap$ExpiredCallback
(expire [this msg-id [task-id spout-id tuple-info start-time-ms]]
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(fail-spout-msg executor-data (get task-datas task-id) spout-id tuple-info time-delta)
))))
tuple-action-fn (fn [task-id ^TupleImpl tuple]
(let [stream-id (.getSourceStreamId tuple)]
(condp = stream-id
Constants/SYSTEM_TICK_STREAM_ID (.rotate pending)
Constants/METRICS_TICK_STREAM_ID (metrics-tick executor-data (get task-datas task-id) tuple)
(let [id (.getValue tuple 0)
[stored-task-id spout-id tuple-finished-info start-time-ms] (.remove pending id)]
(when spout-id
(when-not (= stored-task-id task-id)
(throw-runtime "Fatal error, mismatched task ids: " task-id " " stored-task-id))
(let [time-delta (if start-time-ms (time-delta-ms start-time-ms))]
(condp = stream-id //这里,根据stream id 来对msg进行ack或fail
ACKER-ACK-STREAM-ID (ack-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
ACKER-FAIL-STREAM-ID (fail-spout-msg executor-data (get task-datas task-id)
spout-id tuple-finished-info time-delta)
)))
;; TODO: on failure, emit tuple to failure stream
))))
receive-queue (:receive-queue executor-data)
event-handler (mk-task-receiver executor-data tuple-action-fn)
has-ackers? (has-ackers? storm-conf)
emitted-count (MutableLong. 0)
empty-emit-streak (MutableLong. 0)
再看acker.clj的execute代码就清晰多了,acker也是根据stream-id来对tuple的msgid进行异或处理的,
(^void execute [this ^Tuple tuple]
(let [^RotatingMap pending (.getObject pending)
stream-id (.getSourceStreamId tuple)]
(if (= stream-id Constants/SYSTEM_TICK_STREAM_ID)
(.rotate pending)
(let [id (.getValue tuple 0)
^OutputCollector output-collector (.getObject output-collector)
curr (.get pending id)
curr (condp = stream-id
ACKER-INIT-STREAM-ID (-> curr
(update-ack (.getValue tuple 1))
(assoc :spout-task (.getValue tuple 2)))
ACKER-ACK-STREAM-ID (update-ack curr (.getValue tuple 1))
ACKER-FAIL-STREAM-ID (assoc curr :failed true))]
(.put pending id curr)
(when (and curr (:spout-task curr))
(cond (= 0 (:val curr))
(do
(.remove pending id)
(acker-emit-direct output-collector
(:spout-task curr)
ACKER-ACK-STREAM-ID
[id]
))
(:failed curr)
(do
(.remove pending id)
(acker-emit-direct output-collector
(:spout-task curr)
ACKER-FAIL-STREAM-ID
[id]
))
))
(.ack output-collector tuple)
))))
(^void cleanup [this]
)
那么bolt是怎样调用acker的呐,上面说了,继承richbolt时会自动调用ack方法,那么ack方法到底做了哪些事情呐,按executor.clj中的mk-threads :bolt部分代码就清晰多了
(.prepare bolt-obj
storm-conf
user-context
(OutputCollector. //实现了匿名内部类,实现IOutputCollector的emit,emitDirect,ack和fail方法
(reify IOutputCollector
(emit [this stream anchors values]
(bolt-emit stream anchors values nil))
(emitDirect [this task stream anchors values]
(bolt-emit stream anchors values task))
(^void ack [this ^Tuple tuple]
(let [^TupleImpl tuple tuple
ack-val (.getAckVal tuple)]
(fast-map-iter [[root id] (.. tuple getMessageId getAnchorsToIds)]
(task/send-unanchored task-data
ACKER-ACK-STREAM-ID
[root (bit-xor id ack-val)])
))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltAck (BoltAckInfo. tuple task-id delta))
(when delta
(builtin-metrics/bolt-acked-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta)
(stats/bolt-acked-tuple! executor-stats
(.getSourceComponent tuple)
(.getSourceStreamId tuple)
delta))))
(^void fail [this ^Tuple tuple]
(fast-list-iter [root (.. tuple getMessageId getAnchors)]
(task/send-unanchored task-data
ACKER-FAIL-STREAM-ID
[root]))
(let [delta (tuple-time-delta! tuple)]
(task/apply-hooks user-context .boltFail (BoltFailInfo. tuple task-id delta))
(when delta
(builtin-metrics/bolt-failed-tuple! (:builtin-metrics task-data)
executor-stats
(.getSourceComponent tuple)
从上面可以看出,prepare方法中实现了匿名内部类,实现IOutputCollector的emit,emitDirect,ack和fail方法,然后把OutputCollector传给用户所实现的bolt,即bolt重载prepare方法中的OutputCollector,当用利用OutputCollector调用ack方法时就是调用上面clojure代码中匿名实现的ack方法,该方法会发送一个ACKER-ACK-STREAM-ID 的streamid,然后Spout就可以接收到该stream进行ack操作,这也是你在WebUI上看到spout后面的ack count数目了。
总结一下
Spout只是根据Ack stream id来判断是否进行ack或fail,跟具体哪个bolt没有任何关系。所以大家不要认为storm的消息可靠性是来源spout和acker bolt之间的通信,其实不然,Bolt也是会发消息给spout的。
OK,到此结束。 PS:storm版本是0.9.4