Module: Wurk::Batch::Callbacks
Defined in: lib/wurk/batch/callbacks.rb
Overview
Fires batch callbacks (:success, :complete, :death) by enqueuing
them as ordinary jobs on the batch's callback_queue. Dedup is via
b-
Callback wrapper job: Wurk::Batch::CallbackJob — given a target spec
("Klass" or "Klass#method") and options hash, it instantiates and
invokes on_
Class Method Summary
- .any_child_dead?(bid) ⇒ Boolean
- .apply_linger(bid) ⇒ Object
  Post-success retention: a succeeded batch no longer coordinates any jobs, so its keys expire after the per-batch linger override (else 24h) instead of the 30d pending TTL.
- .callback_specs_for(bid) ⇒ Object
- .cascade_death(bid) ⇒ Object
  A child's death means the parent, and every ancestor, can never fully succeed, so :death propagates up the parent chain.
- .clear_death_on_recovery(bid) ⇒ Object
  Recovery counterpart to cascade_death (#226).
- .death_fired?(bid) ⇒ Boolean
  True once :death has fired for this batch, whether from one of its own jobs dying or from a descendant's death cascading up.
- .dedup_set(bid, event) ⇒ Object
  Atomically marks bid as having fired event.
- .emit_duration_metric(bid) ⇒ Object
  Pro statsd metric (spec §9.3): wall-clock seconds from batch creation to full success.
- .enqueue_callback_job(bid, target, event, options, queue) ⇒ Object
- .enqueue_callbacks(bid, event) ⇒ Object
  Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued.
- .fire_complete(bid) ⇒ Object
- .fire_death(bid) ⇒ Object
  Fired from Wurk::Batch::DeathHandler whenever a death makes the died set go non-empty: the first death, or the first re-death after every dead jid was manually retried back into the live set (#212: that retry's BATCH_PUSH cleared the death mark).
- .fire_success(bid) ⇒ Object
- .kids_finished?(bid) ⇒ Boolean
- .live_for(bid) ⇒ Object
- .maybe_fire(bid, pending:, live:) ⇒ Object
  Called from the server middleware after BATCH_ACK_SUCCESS (and from DeathHandler when a death drains the last live jid).
- .own_died_remaining?(bid) ⇒ Boolean
- .parent_bid_for(bid) ⇒ Object
- .parse_callbacks(raw) ⇒ Object
- .pending_for(bid) ⇒ Object
- .pkids_drained?(parent_bid, child_bid) ⇒ Boolean
- .propagate_to_parent(bid) ⇒ Object
  When a child batch finishes (its live jids hit 0, by success or death), remove it from the parent's pkids set so the parent's own callbacks wait on the full subtree.
- .record_event(bid, field) ⇒ Object
- .subtree_dead?(bid) ⇒ Boolean
  A batch's subtree is still dead while it carries the durable death mark OR any direct child does; deaths cascade up the parent chain, so a dead descendant keeps every ancestor's child marked.
Class Method Details
.any_child_dead?(bid) ⇒ Boolean
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 188
def any_child_dead?(bid)
  kids = Wurk.redis { |conn| conn.call('SMEMBERS', "b-#{bid}-kids") }
  kids.any? { |kid| death_fired?(kid) }
end
```
.apply_linger(bid) ⇒ Object
Post-success retention: a succeeded batch no longer coordinates any
jobs, so its keys expire after the per-batch linger override (else
24h) instead of the 30d pending TTL. Mirrors Sidekiq Pro §2.8.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 111
def apply_linger(bid)
  raw = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'linger') }
  seconds = raw.nil? || raw.to_s.empty? ? Batch::POST_SUCCESS_EXPIRY_SECONDS : raw.to_i
  Wurk.redis do |conn|
    Batch.keys_for(bid).each { |key| conn.call('EXPIRE', key, seconds) }
  end
end
```
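The fallback rule in apply_linger can be checked without Redis. This is a minimal sketch that assumes the 24h default above is 86,400 seconds. `linger_seconds` and the local `POST_SUCCESS_EXPIRY_SECONDS` constant are hypothetical stand-ins, not Wurk's API.

```ruby
# Hypothetical stand-in for Batch::POST_SUCCESS_EXPIRY_SECONDS (24h per the text above).
POST_SUCCESS_EXPIRY_SECONDS = 24 * 60 * 60

# Same branch as apply_linger: a missing or empty 'linger' field falls back
# to the 24h default; any other value is converted with to_i.
def linger_seconds(raw)
  raw.nil? || raw.to_s.empty? ? POST_SUCCESS_EXPIRY_SECONDS : raw.to_i
end

linger_seconds(nil)    # => 86400
linger_seconds('')     # => 86400
linger_seconds('3600') # => 3600
```

The override goes through `to_i`, so a non-numeric value such as `'abc'` yields 0. Redis deletes a key immediately when EXPIRE is given a non-positive timeout, which means a malformed linger value would drop the batch's keys instead of retaining them.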
.callback_specs_for(bid) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 208
def callback_specs_for(bid)
  raw = Wurk.redis { |conn| conn.call('HMGET', "b-#{bid}", 'callbacks', 'callback_queue') }
  callbacks_json, queue = raw
  queue = 'default' if queue.nil? || queue.empty?
  parsed = parse_callbacks(callbacks_json)
  [parsed, queue]
end
```
.cascade_death(bid) ⇒ Object
A child's death means the parent — and every ancestor — can never
fully succeed, so :death propagates up the parent chain. The
recursion bottoms out at the root (empty parent_bid); fire_death's own
dedup_set guard makes each ancestor's :death fire exactly once even
under racing children.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 66
def cascade_death(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?

  fire_death(parent_bid)
end
```
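The recursion and its exactly-once guarantee can be modeled with plain Ruby sets. This simplified sketch makes two substitutions. A hypothetical in-memory parent map takes the place of the parent_bid hash field, and a Set takes the place of the b-<bid>-death keys. The sketch also omits the durable mark that the real fire_death writes before its guard.

```ruby
require 'set'

# Hypothetical in-memory stand-ins: child bid => parent bid, the
# b-<bid>-death dedup keys, and the :death callbacks "enqueued" so far.
PARENTS = { 'grandchild' => 'child', 'child' => 'root', 'sibling' => 'root' }
DEDUP = Set.new
ENQUEUED = []

def fire_death(bid)
  # Set#add? returns nil when the element is already present, like SET NX.
  return unless DEDUP.add?("b-#{bid}-death")

  ENQUEUED << bid
  cascade_death(bid)
end

def cascade_death(bid)
  parent_bid = PARENTS[bid]
  return if parent_bid.nil? || parent_bid.empty? # reached the root

  fire_death(parent_bid)
end

fire_death('grandchild') # fires grandchild, child and root
fire_death('sibling')    # root's guard is already taken: only sibling fires
ENQUEUED # => ["grandchild", "child", "root", "sibling"]
```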
.clear_death_on_recovery(bid) ⇒ Object
Recovery counterpart to cascade_death (#226). When a descendant's
last dead job is manually retried back to success, the descendant
clears its OWN death mark (#212, in BATCH_PUSH) — but every ancestor
was marked by the death cascade, not by a jid in its own died set,
so nothing here ever cleared them and the ancestor's :success
stayed suppressed forever. Re-evaluate this batch: drop its durable
death mark and dead-batches membership once its own died set is
empty AND no child still carries a death mark. The b-<bid>-death
notify dedup key is deliberately left intact, so a later re-death
re-marks the batch (fire_death restores the flag before its own
dedup guard) without ever re-enqueuing :death.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 173
def clear_death_on_recovery(bid)
  return unless death_fired?(bid)
  return if own_died_remaining?(bid)
  return if any_child_dead?(bid)

  Wurk.redis do |conn|
    conn.call('HDEL', "b-#{bid}", 'death')
    conn.call('ZREM', 'dead-batches', bid)
  end
end
```
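The three guards can be traced on a two-level tree. This hypothetical in-memory model uses three structures:

- a Set of bids carrying the durable death flag;
- a hash of each batch's own died jids;
- a parent-to-children map standing in for b-<bid>-kids.

Only the mark is modeled. The dead-batches sorted set and the notify dedup key are left out.

```ruby
require 'set'

# Hypothetical state: durable death marks, each batch's own died jids,
# and parent bid => direct child bids.
State = Struct.new(:death, :died, :kids)

def death_fired?(s, bid)
  s.death.include?(bid)
end

def own_died_remaining?(s, bid)
  !s.died.fetch(bid, []).empty?
end

def any_child_dead?(s, bid)
  s.kids.fetch(bid, []).any? { |kid| death_fired?(s, kid) }
end

# Same three guards as clear_death_on_recovery; only the mark is dropped.
def clear_death_on_recovery(s, bid)
  return unless death_fired?(s, bid)
  return if own_died_remaining?(s, bid)
  return if any_child_dead?(s, bid)

  s.death.delete(bid)
end

s = State.new(Set['parent', 'child'], { 'child' => [] }, { 'parent' => ['child'] })
clear_death_on_recovery(s, 'parent') # child still marked: parent keeps its mark
s.death.delete('child')              # child's BATCH_PUSH cleared its own mark (#212)
clear_death_on_recovery(s, 'parent') # now the cascaded mark is lifted
s.death.include?('parent') # => false
```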
.death_fired?(bid) ⇒ Boolean
True once :death has fired for this batch — from one of its own
jobs dying or from a descendant's death cascading up. Suppresses
:success, which must never fire after any death in the subtree.
Reads the durable death field on b-<bid> (written by record_event),
not the b-<bid>-death dedup key — the dedup key has its own 30d TTL
and can expire while an ancestor batch is still open, after which a
late maybe_fire would wrongly emit :success.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 145
def death_fired?(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'death') } == '1'
end
```
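The difference between the two keys comes down to expiry. This sketch uses a hypothetical fake store with a settable clock instead of Redis. The 30-day figure mirrors the dedup key's TTL described above. The fake gives the batch hash no TTL at all, which is a simplification.

```ruby
# Hypothetical 30-day TTL, mirroring the dedup key described above.
THIRTY_DAYS = 30 * 24 * 60 * 60

# Hypothetical fake store with an injectable clock so expiry can be simulated.
class FakeStore
  attr_accessor :now

  def initialize
    @now = 0
    @strings = {} # key => [value, expires_at]
    @hashes = Hash.new { |h, k| h[k] = {} }
  end

  def set_ex(key, value, ttl)
    @strings[key] = [value, @now + ttl]
  end

  # Returns nil once the key's expiry time has passed.
  def get(key)
    value, expires_at = @strings[key]
    value if expires_at && @now < expires_at
  end

  def hset(key, field, value)
    @hashes[key][field] = value
  end

  def hget(key, field)
    @hashes[key][field]
  end
end

store = FakeStore.new
# :death fires: the notify dedup key and the durable field are both written.
store.set_ex('b-B1-death', '1', THIRTY_DAYS)
store.hset('b-B1', 'death', '1')

store.now = THIRTY_DAYS + 1 # an ancestor batch is still open a month later
store.get('b-B1-death')     # => nil (dedup key expired)
store.hget('b-B1', 'death') # => "1" (durable mark survives)
```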
.dedup_set(bid, event) ⇒ Object
Atomically marks bid as having fired event. Returns true the
first time, false thereafter — caller skips the enqueue when false.
SET NX makes this safe under racing acks.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 122
def dedup_set(bid, event)
  Wurk.redis do |conn|
    ok = conn.call('SET', "b-#{bid}-#{event}", '1', 'NX', 'EX', Batch::CALLBACK_NOTIFY_TTL)
    ok == 'OK'
  end
end
```
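The guard is race-safe because SET NX is all-or-nothing: only one caller can create the key. This is a minimal in-process sketch built on a hypothetical `FakeRedis` class, whose Mutex stands in for Redis executing each command atomically. It does not model the EX expiry.

```ruby
# Hypothetical stand-in for `SET key value NX`; the Mutex plays the role
# of Redis running each command atomically. Expiry is not modeled.
class FakeRedis
  def initialize
    @data = {}
    @lock = Mutex.new
  end

  # Returns 'OK' when the key was absent and is now set, nil otherwise,
  # matching the two replies SET ... NX can give.
  def set_nx(key, value)
    @lock.synchronize do
      next nil if @data.key?(key)

      @data[key] = value
      'OK'
    end
  end
end

REDIS = FakeRedis.new

def dedup_set(bid, event)
  REDIS.set_nx("b-#{bid}-#{event}", '1') == 'OK'
end

# Ten racing acks try to claim :complete for the same batch.
winners = Array.new(10) { Thread.new { dedup_set('B1', 'complete') } }.map(&:value)
winners.count(true) # => 1
```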
.emit_duration_metric(bid) ⇒ Object
Pro statsd metric (spec §9.3): wall-clock seconds from batch creation to
full success. created_at is recorded with CLOCK_REALTIME, the same clock
read here, so the difference is a true wall-clock duration. No-op without
a dogstatsd client.
Strictly best-effort: fire_success has already burned the success
dedup key by the time this runs. A raise here (e.g. a Redis hiccup on the
HGET) would therefore permanently strand the success callbacks and linger
that follow, because a retry can't re-fire them. Swallow and log instead.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 97
def emit_duration_metric(bid)
  created = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'created_at') }
  return if created.nil? || created.to_s.empty?

  seconds = ::Process.clock_gettime(::Process::CLOCK_REALTIME) - created.to_f
  Wurk::Metrics::Statsd.distribution('batch.duration_dist', seconds)
rescue StandardError => e
  Wurk.logger.warn("batch #{bid}: duration metric emit failed: #{e.class}: #{e.message}")
  nil
end
```
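The best-effort shape (compute, emit, then swallow and log any error) can be shown in isolation. The names in this sketch are hypothetical. `emit_duration` takes the stored timestamp directly, and it yields the duration to a block that stands in for Wurk::Metrics::Statsd.

```ruby
require 'logger'

LOG = Logger.new($stderr)

# Hypothetical best-effort emitter: the block stands in for the statsd
# client, and any StandardError is logged and turned into nil.
def emit_duration(bid, created_at)
  return if created_at.nil? || created_at.to_s.empty?

  seconds = Process.clock_gettime(Process::CLOCK_REALTIME) - created_at.to_f
  yield seconds
rescue StandardError => e
  LOG.warn("batch #{bid}: duration metric emit failed: #{e.class}: #{e.message}")
  nil
end

created = (Process.clock_gettime(Process::CLOCK_REALTIME) - 5).to_s
observed = nil
emit_duration('B1', created) { |s| observed = s }                     # about 5 seconds
result = emit_duration('B2', created) { |_s| raise IOError, 'statsd down' } # logged, nil
```

The caller, like fire_success above, keeps going in both cases. The metric is the only thing lost on failure.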
.enqueue_callback_job(bid, target, event, options, queue) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 224
def enqueue_callback_job(bid, target, event, options, queue)
  Wurk::Client.push(
    'class' => 'Wurk::Batch::CallbackJob',
    'args' => [bid, target, event, options],
    'queue' => queue,
    'retry' => true
  )
end
```
.enqueue_callbacks(bid, event) ⇒ Object
Per-callback rescue: one bad spec or a transient enqueue failure must not strand the batch with the remaining callbacks for this event un-enqueued. Log and move on so every other callback still fires.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 196
def enqueue_callbacks(bid, event)
  callbacks, queue = callback_specs_for(bid)
  callbacks.each do |(cb_event, target, options)|
    next unless cb_event == event

    enqueue_callback_job(bid, target, event, options, queue)
  rescue StandardError => e
    Wurk.logger.warn("batch #{bid}: #{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.message}")
  end
end
```
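The per-callback rescue can be exercised by putting a failing push in the middle of the list. This sketch assumes specs shaped as the [event, target, options] triples that enqueue_callbacks destructures. `enqueue_for` is a hypothetical helper, and its block stands in for enqueue_callback_job.

```ruby
# Hypothetical helper: yields each matching spec to the block (the "push")
# and logs a failure without abandoning the remaining specs.
def enqueue_for(event, specs, log)
  specs.each do |(cb_event, target, options)|
    next unless cb_event == event

    yield target, options
  rescue StandardError => e
    log << "#{event} callback #{target.inspect} enqueue failed: #{e.class}: #{e.message}"
  end
end

specs = [
  ['success', 'Notifier#notify', {}],
  ['success', 'Broken', {}],
  ['complete', 'Audit', {}],
  ['success', 'Billing', { 'plan' => 'pro' }]
]
pushed = []
log = []
enqueue_for('success', specs, log) do |target, _options|
  raise ArgumentError, 'bad spec' if target == 'Broken'

  pushed << target
end
pushed   # => ["Notifier#notify", "Billing"]
log.size # => 1
```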
.fire_complete(bid) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 73
def fire_complete(bid)
  return unless dedup_set(bid, 'complete')

  record_event(bid, 'complete_at')
  enqueue_callbacks(bid, 'complete')
end
```
.fire_death(bid) ⇒ Object
Fired from Wurk::Batch::DeathHandler whenever a death makes the died
set go non-empty: the first death, or the first re-death after every
dead jid was manually retried back into the live set (#212 — that
retry's BATCH_PUSH cleared the death mark). The mark — durable death
flag, death_at, dead-batches membership — is (re-)applied before
the dedup guard so it is restored on re-death; the callback enqueue
and parent cascade stay behind the guard so :death is enqueued at
most once per batch.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 52
def fire_death(bid)
  record_event(bid, 'death_at')
  Wurk.redis { |conn| conn.call('ZADD', 'dead-batches', Time.now.to_f.to_s, bid) }
  return unless dedup_set(bid, 'death')

  enqueue_callbacks(bid, 'death')
  cascade_death(bid)
end
```
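Writing the mark before the guard is what lets a re-death restore it without a second enqueue. This simplified in-memory sketch uses hypothetical Sets for the durable flag and the dedup keys. It leaves out death_at, dead-batches, and the cascade.

```ruby
require 'set'

MARKS = Set.new # hypothetical durable death flag (the 'death' field)
DEDUP = Set.new # hypothetical b-<bid>-death notify keys
ENQUEUED = []   # :death callbacks actually enqueued

def fire_death(bid)
  MARKS << bid                     # (re)applied on every call
  return unless DEDUP.add?(bid)    # only the first call gets past here

  ENQUEUED << bid                  # cascade_death would follow; omitted
end

fire_death('B1')     # first death: mark + enqueue
MARKS.delete('B1')   # dead jid retried; BATCH_PUSH clears the mark (#212)
fire_death('B1')     # re-death: mark restored, no second enqueue
MARKS.include?('B1') # => true
ENQUEUED             # => ["B1"]
```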
.fire_success(bid) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 80
def fire_success(bid)
  return unless dedup_set(bid, 'success')

  record_event(bid, 'success_at')
  emit_duration_metric(bid)
  enqueue_callbacks(bid, 'success')
  apply_linger(bid)
end
```
.kids_finished?(bid) ⇒ Boolean
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 40
def kids_finished?(bid)
  Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-pkids") }.to_i.zero?
end
```
.live_for(bid) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 263
def live_for(bid) = Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-jids") }.to_i
```
.maybe_fire(bid, pending:, live:) ⇒ Object
Called from the server middleware after BATCH_ACK_SUCCESS (and from
DeathHandler when a death drains the last live jid). Fires :complete
when live jids hit 0; fires :success when pending also hits 0 and
there has been no death in the batch's subtree.
Both fires are additionally gated on b-<bid>-pkids being empty, i.e. on
there being no child batch whose own subtree hasn't finished yet (#209).
Spec §2.4: child :complete/:success fire before the parent's, so when the
parent's own last job acks while a child batch is still running,
nothing fires here; the last child's propagate_to_parent re-invokes
this and fires then. The SREM in pkids_drained? happens before that
re-invocation, so at least one of the racing paths sees an empty pkids
set and fires; if both do, dedup_set absorbs the overlap so each
callback is enqueued only once.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 31
def maybe_fire(bid, pending:, live:)
  return unless live.zero?
  return unless kids_finished?(bid)

  fire_complete(bid)
  fire_success(bid) if pending.zero? && !subtree_dead?(bid)
  propagate_to_parent(bid)
end
```
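The gating order reduces to a small decision function. `maybe_fire_plan` is a hypothetical pure rewrite that takes, as keyword arguments, the values the real code reads from Redis. It reports what maybe_fire would attempt; dedup_set may still suppress a repeat.

```ruby
# Hypothetical pure rewrite of maybe_fire's gates. Returns the steps that
# would be attempted, in order.
def maybe_fire_plan(live:, pending:, kids_finished:, subtree_dead:)
  return [] unless live.zero? && kids_finished

  plan = [:complete]
  plan << :success if pending.zero? && !subtree_dead
  plan << :propagate
end

maybe_fire_plan(live: 0, pending: 0, kids_finished: true, subtree_dead: false)
# => [:complete, :success, :propagate]
maybe_fire_plan(live: 0, pending: 1, kids_finished: true, subtree_dead: true)
# => [:complete, :propagate]
maybe_fire_plan(live: 0, pending: 0, kids_finished: false, subtree_dead: false)
# => [] (waits for the last child's propagate_to_parent)
```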
.own_died_remaining?(bid) ⇒ Boolean
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 184
def own_died_remaining?(bid)
  Wurk.redis { |conn| conn.call('SCARD', "b-#{bid}-died") }.to_i.positive?
end
```
.parent_bid_for(bid) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 251
def parent_bid_for(bid)
  Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'parent_bid') }
end
```
.parse_callbacks(raw) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 216
def parse_callbacks(raw)
  return [] if raw.nil? || raw.empty?

  JSON.parse(raw)
rescue JSON::ParserError
  []
end
```
.pending_for(bid) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 262
def pending_for(bid) = Wurk.redis { |conn| conn.call('HGET', "b-#{bid}", 'pending') }.to_i
```
.pkids_drained?(parent_bid, child_bid) ⇒ Boolean
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 255
def pkids_drained?(parent_bid, child_bid)
  Wurk.redis do |conn|
    conn.call('SREM', "b-#{parent_bid}-pkids", child_bid)
    conn.call('SCARD', "b-#{parent_bid}-pkids").to_i.zero?
  end
end
```
.propagate_to_parent(bid) ⇒ Object
When a child batch finishes (its live jids hit 0 — by success or death), remove it from the parent's pkids set so the parent's own callbacks wait on the full subtree. When the parent's pkids hits 0, re-run the parent's maybe_fire: if its own counts are already at zero (the parent-acks-first race), this is what finally fires it.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 238
def propagate_to_parent(bid)
  parent_bid = parent_bid_for(bid)
  return if parent_bid.nil? || parent_bid.empty?
  return unless pkids_drained?(parent_bid, bid)

  # A recovered child may have lifted the last death from the parent's
  # subtree — clear the parent's cascaded mark before its gate runs, so
  # `:success` can fire. Harmless on the death path: the dying child
  # still carries its mark, so any_child_dead? keeps the parent dead.
  clear_death_on_recovery(parent_bid)
  maybe_fire(parent_bid, pending: pending_for(parent_bid), live: live_for(parent_bid))
end
```
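The parent-acks-first race can be replayed in a single process. This simplified model uses a hypothetical `Node` struct per batch. It does not model deaths, dedup keys, or the clear_death_on_recovery step, and an `include?` check stands in for dedup_set.

```ruby
require 'set'

# Hypothetical per-batch state: own live/pending counts, the pkids set of
# unfinished child batches, the parent bid, and the events fired so far.
Node = Struct.new(:live, :pending, :pkids, :parent, :fired)

BATCHES = {
  'P' => Node.new(0, 0, Set['C'], nil, []), # parent: own jobs all acked
  'C' => Node.new(1, 1, Set.new, 'P', [])   # child: one job still running
}

def maybe_fire(bid)
  b = BATCHES[bid]
  return unless b.live.zero? && b.pkids.empty?

  # include? checks stand in for dedup_set's once-only guard.
  b.fired << :complete unless b.fired.include?(:complete)
  b.fired << :success if b.pending.zero? && !b.fired.include?(:success)
  propagate_to_parent(bid)
end

def propagate_to_parent(bid)
  parent = BATCHES[bid].parent
  return if parent.nil?

  pkids = BATCHES[parent].pkids
  pkids.delete(bid)                  # SREM b-<parent>-pkids
  maybe_fire(parent) if pkids.empty? # SCARD == 0
end

maybe_fire('P')                     # parent's last ack: 'C' still pending
parent_before = BATCHES['P'].fired.dup
child = BATCHES['C']
child.live -= 1
child.pending -= 1
maybe_fire('C')                     # child fires, then re-invokes the parent
BATCHES['P'].fired # => [:complete, :success]
```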
.record_event(bid, field) ⇒ Object
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 129
def record_event(bid, field)
  now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
  Wurk.redis do |conn|
    conn.call('HSET', "b-#{bid}", field, now.to_s)
    conn.call('HSET', "b-#{bid}", field.to_s.sub('_at', ''), '1')
  end
end
```
.subtree_dead?(bid) ⇒ Boolean
A batch's subtree is still dead while it carries the durable death
mark OR any direct child does — deaths cascade up the parent chain,
so a dead descendant keeps every ancestor's child marked. This gates
:success, which must never fire while a job in the subtree is
terminally dead (spec §2.4). The child check matters for the brief
window where a batch with both its own dead job and a dead child has
its OWN dead job retried to success: BATCH_PUSH (#212) clears that
batch's own mark when its died set drains, but the child subtree is
still dead, so death_fired? alone would wrongly let :success fire.
```ruby
# File 'lib/wurk/batch/callbacks.rb', line 158
def subtree_dead?(bid)
  death_fired?(bid) || any_child_dead?(bid)
end
```
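The window described above can be reproduced with two marks. In this hypothetical in-memory sketch, `DEATH` holds the bids whose durable death flag is set. `KIDS` maps a batch to its direct children and stands in for b-<bid>-kids.

```ruby
require 'set'

DEATH = Set['A', 'C'] # hypothetical durable death flags
KIDS = { 'A' => ['C'] } # hypothetical parent => direct children

def death_fired?(bid)
  DEATH.include?(bid)
end

def any_child_dead?(bid)
  KIDS.fetch(bid, []).any? { |kid| death_fired?(kid) }
end

def subtree_dead?(bid)
  death_fired?(bid) || any_child_dead?(bid)
end

DEATH.delete('A')  # A's own dead job retried; BATCH_PUSH clears A's mark (#212)
death_fired?('A')  # => false (checking this alone would let :success through)
subtree_dead?('A') # => true (dead child C still blocks :success)
```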