Class: Wurk::DeadSet
Overview
Capped ZSET of jobs that exhausted retries (the "morgue"). Bounded by the
dead_max_jobs and dead_timeout_in_seconds config knobs: every kill trims
both axes unless trim: false is passed. Death handlers fire by default,
including on API/UI kills (matching Sidekiq), unless notify_failure: false
is passed.
Spec: docs/target/sidekiq-free.md §19.5, §17.2, §31.8.
Constant Summary
- API_KILL_MESSAGE = 'Job killed by API'
Synthesized as the death-handler exception for kills without a real error. Byte-for-byte identical to Sidekiq's message: Wurk::Unique::DEATH_HANDLER matches on it to keep the lock on manual kills (Ent parity), and ecosystem handlers may pattern-match it too.
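As an illustration of the pattern-matching mentioned above, here is a minimal sketch of a death handler that branches on the synthesized message. It is not the code of Wurk::Unique::DEATH_HANDLER; the handler name, the `manual_kill?` helper, and the assumption that the job arrives as a Hash with a 'jid' key are all hypothetical.

```ruby
# Stand-in for Wurk::DeadSet::API_KILL_MESSAGE so the sketch runs without Wurk loaded.
API_KILL_MESSAGE = 'Job killed by API'

# True when the exception carries the synthesized manual-kill message.
def manual_kill?(ex)
  ex.message == API_KILL_MESSAGE
end

# Hypothetical death handler taking (job, ex). It only reports the decision
# it would make; no real lock is touched. The job is assumed to be a Hash.
LOCK_HANDLER = lambda do |job, ex|
  action = manual_kill?(ex) ? :keep_lock : :release_lock
  [action, job['jid']]
end
```

Because the match is on the exact string, any change to the message would silently flip such handlers to the other branch, which is presumably why the constant is kept byte-for-byte compatible.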
Instance Method Summary
- #initialize(name = 'dead') ⇒ DeadSet (constructor)
  Optional name allows tests to operate on a namespaced ZSET; production callers always use the default 'dead' key (wire-compat with Sidekiq).
- #kill(message, opts = {}) ⇒ Object
  ZADD the raw JSON payload, trim, fire death handlers.
- #trim(max_jobs: nil, timeout: nil) ⇒ Object
  Two-axis trim: ZREMRANGEBYSCORE evicts entries older than dead_timeout_in_seconds, ZREMRANGEBYRANK 0 -(dead_max_jobs + 1) keeps the count bounded.
Constructor Details
#initialize(name = 'dead') ⇒ DeadSet
Optional name allows tests to operate on a namespaced ZSET; production
callers always use the default 'dead' key (wire-compat with Sidekiq).
    # File 'lib/wurk/dead_set.rb', line 20
    def initialize(name = 'dead')
      super
    end
Instance Method Details
#kill(message, opts = {}) ⇒ Object
ZADD the raw JSON payload, trim, fire death handlers.
notify_failure: true (default) routes the kill through the death-handler
chain; UI-initiated kills pass false. ex: is the originating exception (or a
synthesized RuntimeError carrying API_KILL_MESSAGE when callers don't have
one); death handlers receive (job, ex). trim: false skips the auto-trim.
max_jobs: / timeout: propagate to the auto-trim; see #trim for the rationale.
    # File 'lib/wurk/dead_set.rb', line 53
    def kill(message, opts = {}) # rubocop:disable Naming/PredicateMethod
      notify = opts.fetch(:notify_failure, true)
      do_trim = opts.fetch(:trim, true)
      ex = opts[:ex] || RuntimeError.new(API_KILL_MESSAGE).tap { |e| e.set_backtrace(caller) }
      now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
      Wurk.redis { |conn| conn.call('ZADD', @name, now.to_s, message) }
      trim(max_jobs: opts[:max_jobs], timeout: opts[:timeout]) if do_trim
      fire_death_handlers(message, ex) if notify
      true
    end
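The option handling in #kill can be tried in isolation. The sketch below copies only the three option-resolution lines into a hypothetical helper, `resolve_kill_opts`, which is not part of Wurk; it touches neither Redis nor the death-handler chain.

```ruby
# Stand-in for Wurk::DeadSet::API_KILL_MESSAGE so the sketch runs without Wurk loaded.
API_KILL_MESSAGE = 'Job killed by API'

# Hypothetical helper mirroring how #kill resolves its opts hash.
def resolve_kill_opts(opts = {})
  {
    # Hash#fetch applies the default only when the key is absent.
    notify: opts.fetch(:notify_failure, true),
    trim: opts.fetch(:trim, true),
    # Fall back to a synthesized RuntimeError when no exception is supplied.
    ex: opts[:ex] || RuntimeError.new(API_KILL_MESSAGE).tap { |e| e.set_backtrace(caller) }
  }
end

defaults = resolve_kill_opts
quiet = resolve_kill_opts(notify_failure: false, trim: false)
```

One consequence of using fetch rather than `||`: an explicit `notify_failure: nil` is kept as nil (falsy), so it suppresses the handlers just like false does.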
#trim(max_jobs: nil, timeout: nil) ⇒ Object
Two-axis trim: ZREMRANGEBYSCORE evicts entries older than
dead_timeout_in_seconds, ZREMRANGEBYRANK 0 -(dead_max_jobs + 1) keeps
only the newest dead_max_jobs entries. Pipelined: partial failure leaves
at most one axis applied (acceptable; trim is non-critical and runs again
on the next kill).
max_jobs: / timeout: override the global config for this call; when neither
the argument nor the config is set, the defaults are 10_000 jobs and 180 days.
This lets parallel tests run trim with isolated limits without mutating
Wurk.configuration (which is process-global and races across threads).
    # File 'lib/wurk/dead_set.rb', line 32
    def trim(max_jobs: nil, timeout: nil) # rubocop:disable Naming/PredicateMethod
      config = Wurk.configuration
      max_jobs ||= config[:dead_max_jobs] || 10_000
      timeout ||= config[:dead_timeout_in_seconds] || (180 * 24 * 60 * 60)
      cutoff = ::Process.clock_gettime(::Process::CLOCK_REALTIME) - timeout
      Wurk.redis do |conn|
        conn.pipelined do |pipe|
          pipe.call('ZREMRANGEBYSCORE', @name, '-inf', "(#{cutoff}")
          pipe.call('ZREMRANGEBYRANK', @name, 0, -(max_jobs + 1))
        end
      end
      true
    end
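To see which entries survive the two commands, the following sketch models the sorted set in memory as an array of [score, member] pairs. `trim_model`, its `now:` and `config:` parameters, and the two constants are hypothetical names introduced for illustration; the real method talks to Redis and reads Wurk.configuration.

```ruby
DEFAULT_MAX_JOBS = 10_000
DEFAULT_TIMEOUT  = 180 * 24 * 60 * 60 # 180 days in seconds

# In-memory model of #trim. entries is an array of [score, member] pairs.
def trim_model(entries, now:, config: {}, max_jobs: nil, timeout: nil)
  # Per-call arguments win over config, which wins over the built-in defaults.
  max_jobs ||= config[:dead_max_jobs] || DEFAULT_MAX_JOBS
  timeout  ||= config[:dead_timeout_in_seconds] || DEFAULT_TIMEOUT
  cutoff = now - timeout

  # ZREMRANGEBYSCORE key -inf (cutoff: the "(" makes the bound exclusive,
  # so only scores strictly below the cutoff are removed.
  by_score = entries.sort.reject { |score, _member| score < cutoff }

  # ZREMRANGEBYRANK key 0 -(max_jobs + 1): drops the lowest ranks and
  # leaves the max_jobs highest-scored (newest) entries.
  by_score.last(max_jobs)
end

entries = [[100.0, 'a'], [200.0, 'b'], [300.0, 'c'], [400.0, 'd']]
survivors = trim_model(entries, now: 450.0, max_jobs: 2, timeout: 300)
```

Here the cutoff is 150.0, so 'a' is evicted by age, and the rank trim then drops 'b' to leave the two newest entries. When the set holds no more than max_jobs entries, the rank range is empty and nothing is removed.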