Module: Wurk::Middleware::PoisonPill
- Defined in: lib/wurk/middleware/poison_pill.rb
Overview
Pro parity (§3.2): poison-pill detection for reliable-fetch orphans.
When a job is recovered from a dead process's private list, we
INCR a per-jid counter at super_fetch:recovered:<jid> with a 72h
TTL. When the counter reaches RECOVERY_THRESHOLD (3), that recovery
is treated as a poison pill: the payload is moved to the dead set,
jobs.poison is emitted to statsd, and recovery callbacks fire so
operators can be paged.
The counter key TTL is wire-compatible with Sidekiq Pro: third-party tooling
that watches super_fetch:recovered:* expects 72h.
There is no server-middleware registration: the callers are the reaper /
bulk_requeue paths, which drive the lifecycle directly via track!(payload, queue:).
When that integration lands, it just calls PoisonPill.track! on each
orphan it is about to RPUSH back to the public queue.
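The counting rule described above can be sketched without Redis. This is a minimal illustration, not the module's code: the Hash stands in for the Redis counter, and classify_recovery is a hypothetical helper name. It assumes the comparison is count >= RECOVERY_THRESHOLD, as in the track! listing further down.

```ruby
# In-memory stand-in for the per-jid Redis counter (the real module uses INCR + EXPIRE).
RECOVERY_THRESHOLD = 3

# Hypothetical helper: bump the counter for a jid and classify this recovery.
def classify_recovery(counters, jid)
  count = (counters[jid] += 1)
  count >= RECOVERY_THRESHOLD ? :poison : :recovered
end

counters = Hash.new(0)
outcomes = Array.new(3) { classify_recovery(counters, 'abc123') }
p outcomes
```

Under these assumptions the first two recoveries of a jid are re-pushed and the third one is killed.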
Defined Under Namespace
Classes: Pill
Constant Summary
- RECOVERY_THRESHOLD = 3
- RECOVERY_TTL = 72 * 60 * 60
- KEY_PREFIX = 'super_fetch:recovered:'
- DEAD_RECORD_LIMIT = 100
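As a quick check of what these constants produce, the sketch below builds the counter key for one jid and prints the TTL in seconds. recovery_key is a hypothetical helper name used only for illustration; the module itself interpolates the key inline.

```ruby
KEY_PREFIX = 'super_fetch:recovered:'
RECOVERY_TTL = 72 * 60 * 60

# Hypothetical helper: the key under which a jid's recovery counter lives.
def recovery_key(jid)
  "#{KEY_PREFIX}#{jid}"
end

puts recovery_key('abc123') # super_fetch:recovered:abc123
puts RECOVERY_TTL           # 259200 (72 hours in seconds)
```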
Class Method Summary
- .bump_counter(jid) ⇒ Object
- .callbacks ⇒ Object
- .clear!(jid) ⇒ Object
  Resets the counter for a jid. Call it after a successful perform so a job that recovered twice and then completed doesn't accumulate state.
- .emit_poison(klass, queue) ⇒ Object
- .emit_recovered_fetch(klass, queue) ⇒ Object
- .fire_callbacks(pill) ⇒ Object
- .fire_super_fetch(config, payload, pill) ⇒ Object
  Invoke the Pro recovery callback registered via config.super_fetch! { }.
- .mark_poison(payload, job, queue:, count:) ⇒ Object
- .on_poison(&block) ⇒ Object
  Register a callback fired when a poison pill is detected.
- .parse(payload) ⇒ Object
  Internal helper.
- .recovery_count(jid) ⇒ Object
  Reads the current recovery counter without bumping it.
- .reset! ⇒ Object
  Test-only reset.
- .track!(payload, queue: nil, config: Wurk.configuration) ⇒ Symbol
  Called per recovered orphan job.
Class Method Details
.bump_counter(jid) ⇒ Object
    # File 'lib/wurk/middleware/poison_pill.rb', line 127
    def bump_counter(jid)
      key = "#{KEY_PREFIX}#{jid}"
      Wurk.redis do |conn|
        count = conn.call('INCR', key).to_i
        conn.call('EXPIRE', key, RECOVERY_TTL)
        count
      end
    end
.callbacks ⇒ Object
    # File 'lib/wurk/middleware/poison_pill.rb', line 104
    def callbacks
      @callbacks ||= []
    end
.clear!(jid) ⇒ Object
Resets the counter for a jid. Call it after a successful perform so a job that recovered twice and then completed doesn't accumulate state.
    # File 'lib/wurk/middleware/poison_pill.rb', line 88
    def clear!(jid)
      return if jid.nil? || jid.to_s.empty?

      Wurk.redis { |conn| conn.call('DEL', "#{KEY_PREFIX}#{jid}") }
    end
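One way to read "call after a successful perform" is a wrapper that clears the counter only when the work returns normally. The sketch below is an assumption about usage, not part of Wurk: perform_then_clear is a hypothetical name, the Hash stands in for Redis, and Hash#delete stands in for DEL.

```ruby
# Hypothetical wrapper: run the job body, then drop its recovery counter.
# If the block raises, the counter is left untouched.
def perform_then_clear(counters, jid)
  result = yield
  counters.delete(jid) unless jid.nil? || jid.to_s.empty?
  result
end

counters = { 'abc123' => 2, 'def456' => 1 }

# Successful perform: the counter for abc123 is removed.
result = perform_then_clear(counters, 'abc123') { :done }

# Failed perform: the counter for def456 survives.
begin
  perform_then_clear(counters, 'def456') { raise 'boom' }
rescue RuntimeError
  nil
end

p counters
```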
.emit_poison(klass, queue) ⇒ Object
    # File 'lib/wurk/middleware/poison_pill.rb', line 150
    def emit_poison(klass, queue)
      tags = []
      tags << "class:#{klass}" if klass
      tags << "queue:#{queue}" if queue
      Wurk::Metrics::Statsd.increment('jobs.poison', tags: tags.empty? ? nil : tags)
    end
.emit_recovered_fetch(klass, queue) ⇒ Object
    # File 'lib/wurk/middleware/poison_pill.rb', line 136
    def emit_recovered_fetch(klass, queue)
      tags = []
      tags << "class:#{klass}" if klass
      tags << "queue:#{queue}" if queue
      Wurk::Metrics::Statsd.increment('jobs.recovered.fetch', tags: tags.empty? ? nil : tags)
    end
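Both emit helpers appear to build their statsd tags the same way: optional class and queue tags, with nil passed when neither is present. The sketch below isolates that logic; build_tags is a hypothetical name, and the local variable name tags is an assumption inferred from the tags: keyword.

```ruby
# Hypothetical helper isolating the tag-building step shared by the emit_* methods.
def build_tags(klass, queue)
  tags = []
  tags << "class:#{klass}" if klass
  tags << "queue:#{queue}" if queue
  tags.empty? ? nil : tags
end

p build_tags('HardJob', 'default')
p build_tags(nil, 'default')
p build_tags(nil, nil)
```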
.fire_callbacks(pill) ⇒ Object
    # File 'lib/wurk/middleware/poison_pill.rb', line 157
    def fire_callbacks(pill)
      callbacks.each do |cb|
        cb.call(pill)
      rescue StandardError => e
        Wurk.configuration.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
      end
    end
.fire_super_fetch(config, payload, pill) ⇒ Object
Invoke the Pro recovery callback registered via config.super_fetch! { }.
No-op unless one is registered. jobstr is the raw job JSON so a Pro
|jobstr, pill| block sees exactly what Sidekiq Pro hands it.
    # File 'lib/wurk/middleware/poison_pill.rb', line 168
    def fire_super_fetch(config, payload, pill)
      cb = config.super_fetch_callback
      return unless cb

      jobstr = payload.is_a?(::String) ? payload : Wurk.dump_json(payload)
      cb.call(jobstr, pill)
    rescue StandardError => e
      config.handle_exception(e, context: 'Wurk::Middleware::PoisonPill')
    end
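A |jobstr, pill| block has to cope with pill being nil on plain recovery. The handler below is a sketch of that shape only. PillSketch is a hypothetical stand-in for the Pill class, whose reader methods are not shown on this page; only the keyword constructor seen in track! is taken from the source, and the jid, klass and count readers are assumptions.

```ruby
require 'json'

# Hypothetical stand-in for Pill (reader methods are assumed, not documented here).
PillSketch = Struct.new(:jid, :klass, :count, :queue, keyword_init: true)

log = []

# Handler in the shape a config.super_fetch! { |jobstr, pill| ... } block would take.
handler = lambda do |jobstr, pill|
  job = JSON.parse(jobstr) # jobstr is the raw job JSON
  if pill
    log << "poison #{pill.klass} #{pill.jid} after #{pill.count} recoveries"
  else
    log << "recovered #{job['class']} #{job['jid']}"
  end
end

jobstr = JSON.generate('class' => 'HardJob', 'jid' => 'abc123')
handler.call(jobstr, nil)
handler.call(jobstr, PillSketch.new(jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default'))
puts log
```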
.mark_poison(payload, job, queue:, count:) ⇒ Object
    # File 'lib/wurk/middleware/poison_pill.rb', line 143
    def mark_poison(payload, job, queue:, count:)
      emit_poison(job['class'], queue)
      json = payload.is_a?(String) ? payload : Wurk.dump_json(job)
      Wurk::DeadSet.new.kill(json, notify_failure: false)
      fire_callbacks(jid: job['jid'], klass: job['class'], count: count, queue: queue)
    end
.on_poison(&block) ⇒ Object
Register a callback fired when a poison pill is detected. Callbacks
receive a single Hash {jid:, klass:, count:, queue:} — matches
Sidekiq Pro's documented shape so consumers can drop in unchanged.
    # File 'lib/wurk/middleware/poison_pill.rb', line 97
    def on_poison(&block)
      raise ArgumentError, 'block required' unless block

      callbacks << block
      block
    end
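The registration and dispatch pattern can be tried in isolation. The module below is a hypothetical stand-in that mirrors on_poison and fire_callbacks so the sketch runs without Wurk; collecting errors in an array replaces the real call to Wurk.configuration.handle_exception.

```ruby
# Hypothetical stand-in for the callback registry, runnable without Wurk.
module PoisonRegistrySketch
  @callbacks = []
  @errors = []

  class << self
    attr_reader :errors

    def on_poison(&block)
      raise ArgumentError, 'block required' unless block

      @callbacks << block
      block
    end

    # One failing callback must not stop the ones registered after it.
    def fire(pill)
      @callbacks.each do |cb|
        cb.call(pill)
      rescue StandardError => e
        @errors << e
      end
    end
  end
end

paged = []
PoisonRegistrySketch.on_poison { |_pill| raise 'pager down' }
PoisonRegistrySketch.on_poison do |pill|
  paged << "#{pill[:klass]} #{pill[:jid]} x#{pill[:count]} on #{pill[:queue]}"
end

PoisonRegistrySketch.fire({ jid: 'abc123', klass: 'HardJob', count: 3, queue: 'default' })
puts paged
```

The second callback still runs even though the first one raised, which is the isolation the rescue inside the loop provides.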
.parse(payload) ⇒ Object
Internal helper. Returns a Hash payload as is, parses a String payload as JSON, and returns nil when the JSON is invalid or the payload is of any other type.
    # File 'lib/wurk/middleware/poison_pill.rb', line 115
    def parse(payload)
      case payload
      when Hash then payload
      when String
        begin
          Wurk.load_json(payload)
        rescue ::JSON::ParserError
          nil
        end
      end
    end
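The three outcomes of this helper can be checked with the standard library alone. The sketch assumes Wurk.load_json behaves like JSON.parse (the rescue of ::JSON::ParserError suggests this, but it is not confirmed here); parse_payload is a hypothetical name.

```ruby
require 'json'

# Hypothetical copy of the helper with JSON.parse standing in for Wurk.load_json.
def parse_payload(payload)
  case payload
  when Hash then payload
  when String
    begin
      JSON.parse(payload)
    rescue JSON::ParserError
      nil
    end
  end
end

p parse_payload('{"jid":"abc123","class":"HardJob"}') # parsed Hash
p parse_payload({ 'jid' => 'abc123' })                 # returned unchanged
p parse_payload('{not json')                           # nil: invalid JSON
p parse_payload(42)                                    # nil: no matching branch
```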
.recovery_count(jid) ⇒ Object
Reads the current recovery counter without bumping it. Used by tests and dashboards; returns 0 for jobs that have never been recovered.
    # File 'lib/wurk/middleware/poison_pill.rb', line 80
    def recovery_count(jid)
      return 0 if jid.nil? || jid.to_s.empty?

      Wurk.redis { |conn| conn.call('GET', "#{KEY_PREFIX}#{jid}") }.to_i
    end
.reset! ⇒ Object
Test-only reset.
    # File 'lib/wurk/middleware/poison_pill.rb', line 109
    def reset!
      @callbacks = []
    end
.track!(payload, queue: nil, config: Wurk.configuration) ⇒ Symbol
Called once per recovered orphan job. Returns :poison when the counter
has reached RECOVERY_THRESHOLD and the job was killed; :recovered when
the job is being re-pushed (the caller's responsibility; we don't touch
the queue here). Also returns :recovered, without bumping a counter, when
the payload cannot be parsed or carries no jid. Emits jobs.recovered.fetch
on every call with a parseable payload, jobs.poison only on the kill path.
Fires the Pro super_fetch! recovery callback (config.super_fetch_callback)
exactly once per call: (jobstr, nil) on plain recovery, (jobstr, pill)
on the kill path. The poison-only on_poison Hash callbacks fire
independently inside .mark_poison.
    # File 'lib/wurk/middleware/poison_pill.rb', line 56
    def track!(payload, queue: nil, config: Wurk.configuration)
      job = parse(payload)
      unless job
        fire_super_fetch(config, payload, nil)
        return :recovered
      end

      jid = job['jid']
      klass = job['class']
      emit_recovered_fetch(klass, queue)

      count = bump_counter(jid) if jid && !jid.empty?
      if count && count >= RECOVERY_THRESHOLD
        mark_poison(payload, job, queue: queue, count: count)
        fire_super_fetch(config, payload, Pill.new(jid: jid, klass: klass, count: count, queue: queue))
        :poison
      else
        fire_super_fetch(config, payload, nil)
        :recovered
      end
    end
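Because track! never touches the queue, the caller has to branch on the returned Symbol. The sketch below shows one possible shape for that step in a reaper. Everything in it is hypothetical: requeue_orphan is an illustrative name, tracker stands in for PoisonPill.track!, and the Array stands in for the public Redis list.

```ruby
# Hypothetical reaper step: requeue only when the tracker says :recovered.
def requeue_orphan(payload, queue_list, tracker)
  case tracker.call(payload)
  when :recovered
    queue_list.push(payload) # stands in for RPUSH to the public queue
    true
  when :poison
    false # already moved to the dead set, so it must not be requeued
  end
end

# Stand-in tracker: third sighting of the same payload is poison.
seen = Hash.new(0)
tracker = ->(payload) { (seen[payload] += 1) >= 3 ? :poison : :recovered }

queue_list = []
results = Array.new(3) { requeue_orphan('{"jid":"abc123"}', queue_list, tracker) }
p results
p queue_list.size
```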