Class: Wurk::DeadSet

Inherits:
JobSet show all
Defined in:
lib/wurk/dead_set.rb

Overview

Capped ZSET of jobs that exhausted retries (the "morgue"). Bounded by dead_max_jobs and dead_timeout_in_seconds config knobs — every kill trims both axes. Death handlers fire by default — including on API/UI kills, matching Sidekiq — unless notify_failure: false.

Spec: docs/target/sidekiq-free.md §19.5, §17.2, §31.8.

Constant Summary collapse

API_KILL_MESSAGE =

Synthesized as the death-handler exception for kills without a real error. Byte-for-byte Sidekiq's message — Wurk::Unique::DEATH_HANDLER matches on it to keep the lock on manual kills (Ent parity), and ecosystem handlers may pattern-match it too.

'Job killed by API'

Instance Method Summary collapse

Constructor Details

#initialize(name = 'dead') ⇒ DeadSet

Optional name allows tests to operate on a namespaced ZSET; production callers always use the default 'dead' key (wire-compat with Sidekiq).



20
21
22
# File 'lib/wurk/dead_set.rb', line 20

def initialize(name = 'dead')
  super
end

Instance Method Details

#kill(message, opts = {}) ⇒ Object

ZADD the raw JSON payload, trim, fire death handlers. notify_failure: true (default) routes the kill through the death-handler chain; UI-initiated kills pass false. ex is the originating exception (or synthesized RuntimeError when callers don't have one) — death handlers receive (job, ex). max_jobs: / timeout: propagate to the auto-trim; see #trim for the rationale.



53
54
55
56
57
58
59
60
61
62
63
# File 'lib/wurk/dead_set.rb', line 53

def kill(message, opts = {}) # rubocop:disable Naming/PredicateMethod
  notify = opts.fetch(:notify_failure, true)
  do_trim = opts.fetch(:trim, true)
  ex = opts[:ex] || RuntimeError.new(API_KILL_MESSAGE).tap { |e| e.set_backtrace(caller) }

  now = ::Process.clock_gettime(::Process::CLOCK_REALTIME)
  Wurk.redis { |conn| conn.call('ZADD', @name, now.to_s, message) }
  trim(max_jobs: opts[:max_jobs], timeout: opts[:timeout]) if do_trim
  fire_death_handlers(message, ex) if notify
  true
end

#trim(max_jobs: nil, timeout: nil) ⇒ Object

Two-axis trim: ZREMRANGEBYSCORE evicts entries older than dead_timeout_in_seconds, ZREMRANGEBYRANK 0 -dead_max_jobs keeps the count bounded. Pipelined — partial failure leaves at most one axis applied (acceptable; trim is non-critical, runs again next kill).

max_jobs: / timeout: override the global config for this call. Lets parallel tests run trim with isolated limits without mutating Wurk.configuration (which is process-global and races across threads).



32
33
34
35
36
37
38
39
40
41
42
43
44
45
# File 'lib/wurk/dead_set.rb', line 32

def trim(max_jobs: nil, timeout: nil) # rubocop:disable Naming/PredicateMethod
  config = Wurk.configuration
  max_jobs ||= config[:dead_max_jobs] || 10_000
  timeout ||= config[:dead_timeout_in_seconds] || (180 * 24 * 60 * 60)
  cutoff = ::Process.clock_gettime(::Process::CLOCK_REALTIME) - timeout

  Wurk.redis do |conn|
    conn.pipelined do |pipe|
      pipe.call('ZREMRANGEBYSCORE', @name, '-inf', "(#{cutoff}")
      pipe.call('ZREMRANGEBYRANK', @name, 0, -(max_jobs + 1))
    end
  end
  true
end