Class: Wurk::Fetcher::Reaper
- Inherits: Object
  - Object
  - Wurk::Fetcher::Reaper
- Includes: Component
- Defined in: lib/wurk/fetcher/reaper.rb
Overview
Orphan reclamation for the reliable fetcher (Pro super_fetch §3.2).
The Reliable fetcher moves each job from a public queue into a
per-process private list (queue:<public>|<host>|<pid>|<idx>) and
leaves it there until the Processor ACKs. A SIGKILLed or crashed
worker therefore strands its in-flight jobs in private lists that
nobody will ever ACK. The Reaper is the recovery half: it periodically
scans for private lists whose owning process is gone and atomically
moves their jobs back to the public queue so a live worker re-runs them.
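The private-list naming scheme above can be illustrated with a small sketch. The helper names `private_list_key` and `parse_private_list` and the `PrivateList` struct are hypothetical, not part of Wurk, and the sketch assumes that neither the public queue name nor the hostname contains a `|`.

```ruby
# Hypothetical helpers (not Wurk's API) for the
# queue:<public>|<host>|<pid>|<idx> key layout described above.
PrivateList = Struct.new(:public_queue, :host, :pid, :idx)

def private_list_key(public_queue, host, pid, idx)
  "queue:#{public_queue}|#{host}|#{pid}|#{idx}"
end

# Returns a PrivateList, or nil when the key is not a private list
# (for example a plain public queue such as "queue:default").
def parse_private_list(key)
  return nil unless key.start_with?('queue:')

  parts = key.delete_prefix('queue:').split('|', -1)
  return nil unless parts.size == 4

  public_queue, host, pid, idx = parts
  return nil unless pid.match?(/\A\d+\z/)

  # idx is kept as a string: its exact format is not specified above.
  PrivateList.new(public_queue, host, Integer(pid, 10), idx)
end
```

A reclaim pass could use such a parser to recover the public queue a stranded job should return to, along with the `host` and `pid` needed for the liveness check.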
Liveness is decided per owner:
* same host — the OS is authoritative: `Process.kill(0, pid)`. This
is instant and ignores a stale `processes` SET entry whose 60s TTL
hasn't lapsed yet, so a `kill -9`ed sibling is reclaimed the moment
the supervisor reaps it rather than 60s later. (Pid reuse by an
unrelated local process is the one blind spot — the supervisor
respawns with a fresh pid, so it does not arise in practice.)
* other host — we cannot ping the pid, so we trust the heartbeat:
the owner is alive iff some live `processes` member (one whose
`info` hash still exists) shares its `host:pid`. Cross-host reclaim
therefore waits out the 60s heartbeat TTL, exactly as the spec says.
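The per-owner rule above might be sketched as follows. This is an illustration, not Wurk's implementation: `local_pid_alive?` and `owner_alive_sketch?` are hypothetical names, the `local_host:` keyword is invented for the example, and treating `Errno::EPERM` as "alive" is an assumption the text above does not state.

```ruby
require 'set'

# Hypothetical sketch of the per-owner liveness rule; not Wurk's code.
def local_pid_alive?(pid)
  Process.kill(0, pid) # signal 0 delivers nothing, it only checks the pid
  true
rescue Errno::ESRCH
  false                # no such process
rescue Errno::EPERM
  true                 # assumption: the process exists but is owned by another user
end

# live_prefixes is a Set of "host:pid" strings taken from live heartbeat entries.
def owner_alive_sketch?(host, pid, live_prefixes, local_host:)
  return local_pid_alive?(pid) if host == local_host

  live_prefixes.include?("#{host}:#{pid}")
end
```

The same-host branch never consults the heartbeat set, which is why a stale entry cannot delay a local reclaim.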
Re-pushed jobs run through Wurk::Middleware::PoisonPill, which caps a job at RECOVERY_THRESHOLD recoveries within 72h: past the cap the job is killed into the dead set instead of re-queued, so a job that crashes its worker every time can't loop forever.
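A recovery cap of this kind can be modelled in memory as below. This is a sketch and not the PoisonPill middleware: the `RecoveryTracker` class is hypothetical, the real value of RECOVERY_THRESHOLD is not given above (3 is assumed here), and reading "past the cap" as "the recovery after the threshold-th one is killed" is an interpretation.

```ruby
# Hypothetical in-memory model of a recovery cap; not Wurk's middleware.
ASSUMED_RECOVERY_THRESHOLD = 3   # assumed value, not taken from Wurk
RECOVERY_WINDOW = 72 * 60 * 60   # 72h in seconds

class RecoveryTracker
  def initialize(threshold: ASSUMED_RECOVERY_THRESHOLD, window: RECOVERY_WINDOW)
    @threshold = threshold
    @window = window
    @recoveries = Hash.new { |h, jid| h[jid] = [] }
  end

  # Records one recovery of +jid+ at +now+ (seconds) and returns
  # :requeue while the job is within the cap, :kill once it is past it.
  def record(jid, now: Time.now.to_f)
    stamps = @recoveries[jid]
    stamps.reject! { |t| now - t > @window } # forget recoveries older than the window
    stamps << now
    stamps.size > @threshold ? :kill : :requeue
  end
end
```

With a threshold of 2, the first two recoveries inside the window return `:requeue` and the third returns `:kill`.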
The reaper runs two passes, exactly as super_fetch's sweeper does:
* a *scoped* sweep every interval ("1/min within process group"): SCANs
only the public queues this process serves, gated by a cluster `SET NX
EX` lock so one process sweeps per interval. The cheap common path.
* a *full* sweep at most once an hour ("full SCAN 1/hr"): SCANs the whole
`queue:*|*` keyspace, gated by its own hourly lock, so private lists
whose public queue no live process serves — a renamed/decommissioned
queue, or a dead host's queue no survivor consumes — are recovered too,
not stranded forever.
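The two-pass gating described above can be sketched without a Redis server. `FakeLockStore` is a hypothetical in-memory stand-in for `SET key value NX EX ttl`, and `tick` with its two lambdas is an illustrative substitute for the real sweeps, not Wurk's code. The lock keys and TTLs are the constants documented on this page.

```ruby
# Hypothetical in-memory stand-in for a Redis `SET key value NX EX ttl`
# lock, so the sketch runs without a server. Not Wurk's implementation.
class FakeLockStore
  def initialize
    @expires_at = {}
  end

  # Returns true iff the key was absent (or expired) and is now set.
  def set_nx_ex(key, ttl, now:)
    expiry = @expires_at[key]
    return false if expiry && expiry > now

    @expires_at[key] = now + ttl
    true
  end
end

# One tick: the scoped sweep if the per-interval lock is won, plus the
# full sweep if the hourly lock is won. The lambdas stand in for the sweeps
# and return the number of jobs they reclaimed.
def tick(store, now:, scoped:, full:)
  reclaimed = 0
  reclaimed += scoped.call if store.set_nx_ex('super_fetch:reaper', 60, now: now)
  reclaimed += full.call if store.set_nx_ex('super_fetch:reaper:full', 3600, now: now)
  reclaimed
end
```

Because each lock expires on its own TTL, a tick 60 seconds after the first runs only the scoped sweep, and the full sweep runs again only once the hourly lock has lapsed.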
Spec: docs/target/sidekiq-pro.md §3.2.
Constant Summary
- DEFAULT_INTERVAL = 60
  Sweep cadence in seconds; also the cluster-lock TTL so exactly one process sweeps per interval. 60s matches the heartbeat TTL, the floor below which cross-host orphans can't be detected anyway.
- FULL_INTERVAL = 3600
  Full-keyspace sweep cadence + its lock TTL: at most once per hour across the fleet, since a global SCAN is far costlier than the scoped pass.
- LOCK_KEY = 'super_fetch:reaper'
- FULL_LOCK_KEY = 'super_fetch:reaper:full'
- SCAN_COUNT = 100
- THREAD_NAME = 'wurk-reaper'
Instance Attribute Summary
- #config ⇒ Object (readonly, included from Component)
  Returns the value of attribute config.
- #interval ⇒ Object (readonly)
  Returns the value of attribute interval.
Instance Method Summary
- #default_tag(dir = Dir.pwd) ⇒ Object (included from Component)
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object (included from Component)
  Invokes lifecycle hooks for event.
- #handle_exception(ex, ctx = {}) ⇒ Object (included from Component)
- #hostname ⇒ Object (included from Component)
- #identity ⇒ Object (included from Component)
- #initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY, full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY) ⇒ Reaper (constructor)
  A new instance of Reaper.
- #leader? ⇒ Boolean (included from Component)
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object (included from Component)
  --- delegated to config -------------------------------------------.
- #mono_ms ⇒ Object (included from Component)
- #process_nonce ⇒ Object (included from Component)
- #real_ms ⇒ Object (included from Component)
  --- clocks ---------------------------------------------------------.
- #reap ⇒ Object
  One loop tick: the scoped sweep when this process wins the per-interval lock, plus the full-keyspace sweep when it also wins the hourly lock.
- #reclaim! ⇒ Object
  One unguarded sweep over every served queue.
- #reclaim_full! ⇒ Object
  One unguarded full-keyspace sweep: every queue:*|* private list, even ones whose public queue this process doesn't serve.
- #redis ⇒ Object (included from Component)
- #running? ⇒ Boolean
- #safe_thread(name, priority: nil, &block) ⇒ Object (included from Component)
  Spawns a named thread that runs block under watchdog(name).
- #start ⇒ Object
  Spawns the sweep loop.
- #stop ⇒ Object
- #tid ⇒ Object (included from Component)
  --- identity -------------------------------------------------------.
- #watchdog(last_words) ⇒ Object (included from Component)
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY, full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY) ⇒ Reaper
Returns a new instance of Reaper.
    # File 'lib/wurk/fetcher/reaper.rb', line 67
    def initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY,
                   full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY)
      @config = config
      @interval = interval
      @lock_key = lock_key
      @full_interval = full_interval
      @full_lock_key = full_lock_key
      @thread = nil
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
    end
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
#interval ⇒ Object (readonly)
Returns the value of attribute interval.
    # File 'lib/wurk/fetcher/reaper.rb', line 65
    def interval
      @interval
    end
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
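The dispatch rules above can be illustrated with a small stand-alone registry. `HookRegistry`, its `on` method, and the `errors` array (standing in for handle_exception) are hypothetical names for this sketch, not Component's API. Whether the real implementation clears a oneshot bucket when a hook re-raises is not stated above; the sketch only clears it after a dispatch that completes.

```ruby
# Hypothetical hook registry illustrating the dispatch rules above;
# not Wurk's Component implementation.
class HookRegistry
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |h, event| h[event] = [] }
    @errors = []
  end

  # Registers a hook; hooks are kept in registration order.
  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse          # LIFO, e.g. for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e                     # stand-in for handle_exception
      raise if reraise                 # fail fast only when asked to
    end
    @hooks[event].clear if oneshot     # the event can't fire twice
  end
end
```

A failing hook is recorded and the remaining hooks still run, unless `reraise: true` is passed.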
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
#logger ⇒ Object Originally defined in module Component
--- delegated to config -------------------------------------------
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#real_ms ⇒ Object Originally defined in module Component
--- clocks ---------------------------------------------------------
#reap ⇒ Object
One loop tick: the scoped sweep when this process wins the per-interval lock, plus the full-keyspace sweep when it also wins the hourly lock. Returns the total jobs reclaimed across both.
    # File 'lib/wurk/fetcher/reaper.rb', line 109
    def reap
      reclaimed = acquire_lock? ? reclaim! : 0
      reclaimed += reclaim_full! if acquire_full_lock?
      reclaimed
    end
#reclaim! ⇒ Object
One unguarded sweep over every served queue. Returns the number of jobs reclaimed (re-queued or killed). Public so boot paths and tests can drive a deterministic pass without the cluster lock.
    # File 'lib/wurk/fetcher/reaper.rb', line 118
    def reclaim!
      prefixes = live_process_prefixes
      served_queues.sum { |public_q| reclaim_queue(public_q, prefixes) }
    end
#reclaim_full! ⇒ Object
One unguarded full-keyspace sweep: every queue:*|* private list, even
ones whose public queue this process doesn't serve. Returns the number
of jobs reclaimed. Public so boot paths and tests can drive it without
the hourly lock.
    # File 'lib/wurk/fetcher/reaper.rb', line 127
    def reclaim_full!
      prefixes = live_process_prefixes
      reclaimed = 0
      each_full_private_list do |key, public_q, host, pid|
        next if owner_alive?(host, pid, prefixes)

        reclaimed += drain(key, public_q)
      end
      reclaimed
    end
#redis ⇒ Object Originally defined in module Component
#running? ⇒ Boolean
    # File 'lib/wurk/fetcher/reaper.rb', line 102
    def running?
      !@thread.nil? && @thread.alive?
    end
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread, because nothing else is
guaranteed to keep a reference to it. report_on_exception is disabled
so a thread's death isn't logged twice.
#start ⇒ Object
Spawns the sweep loop. Idempotent. The loop waits one interval before its first sweep so booting processes don't dogpile Redis and so an un-stopped launcher in a unit test never touches the keyspace.
    # File 'lib/wurk/fetcher/reaper.rb', line 83
    def start
      @mutex.synchronize do
        return @thread if @thread

        @done = false
        @thread = spawn_loop_thread
      end
      @thread
    end
#stop ⇒ Object
    # File 'lib/wurk/fetcher/reaper.rb', line 93
    def stop
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
      @thread&.join
      @thread = nil
    end
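The start/stop behaviour documented above (wait one interval before the first sweep, wake immediately on stop) can be sketched with a Mutex and a ConditionVariable. `TickLoop` is a hypothetical class written for illustration; the body of Wurk's `spawn_loop_thread` is not shown on this page, so the loop below is an assumption about its general shape, not a copy of it.

```ruby
# Hypothetical wait-then-work loop; not Wurk's spawn_loop_thread.
class TickLoop
  attr_reader :ticks

  def initialize(interval, &work)
    @interval = interval
    @work = work
    @ticks = 0
    @done = false
    @thread = nil
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
  end

  # Idempotent: a second call returns the already running thread.
  def start
    @mutex.synchronize do
      return @thread if @thread

      @done = false
      @thread = Thread.new { run }
    end
  end

  def stop
    @mutex.synchronize do
      @done = true
      @sleeper.signal # wake the sleeper instead of waiting out the interval
    end
    @thread&.join
    @thread = nil
  end

  private

  def run
    loop do
      break if wait_or_stopped? # sleep first, so a fresh loop never works at once

      @ticks += 1
      @work&.call
    end
  end

  # Waits up to one interval, tolerating spurious wakeups.
  # Returns true when stop was requested.
  def wait_or_stopped?
    deadline = monotonic + @interval
    @mutex.synchronize do
      until @done
        remaining = deadline - monotonic
        break if remaining <= 0

        @sleeper.wait(@mutex, remaining)
      end
      @done
    end
  end

  def monotonic
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```

Stopping a loop that was started with a long interval returns promptly and records zero ticks, which is the property that keeps an un-stopped launcher in a unit test away from the keyspace.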
#tid ⇒ Object Originally defined in module Component
--- identity -------------------------------------------------------
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.
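The watchdog and safe_thread descriptions above can be sketched together. The `_sketch` method names, the `REPORTED` array (standing in for error handlers), and the `{ context: ... }` shape are all hypothetical. Rescuing `Exception` instead of `StandardError` is an assumption based on the phrase "any unhandled exception".

```ruby
# Hypothetical sketch of a watchdog wrapper and a thread spawner built
# on it; not Wurk's Component implementation.
REPORTED = [] # stand-in for error_handlers / the log

def handle_exception_sketch(ex, ctx = {})
  REPORTED << [ex, ctx]
end

# Reports any exception escaping the block, then re-raises it.
def watchdog_sketch(last_words)
  yield
rescue Exception => e # assumption: catches everything, not only StandardError
  handle_exception_sketch(e, { context: last_words })
  raise
end

def safe_thread_sketch(name, &block)
  Thread.new do
    # The watchdog already reports the failure, so Ruby's own
    # report-on-death message would be a duplicate.
    Thread.current.report_on_exception = false
    Thread.current.name = name
    watchdog_sketch(name, &block)
  end
end
```

Joining a thread created this way still raises the original exception in the caller, after it has been reported exactly once.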