Class: Wurk::Fetcher::Reaper

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/fetcher/reaper.rb

Overview

Orphan reclamation for the reliable fetcher (Pro super_fetch §3.2).

The Reliable fetcher moves each job from a public queue into a per-process private list (queue:<public>|<host>|<pid>|<idx>) and leaves it there until the Processor ACKs. A SIGKILLed or crashed worker therefore strands its in-flight jobs in private lists that nobody will ever ACK. The Reaper is the recovery half: it periodically scans for private lists whose owning process is gone and atomically moves their jobs back to the public queue so a live worker re-runs them.
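To make the private-list key layout concrete, here is a small sketch that splits a key back into its owner fields. It assumes only the `queue:<public>|<host>|<pid>|<idx>` shape quoted above; `parse_private_key` and `PrivateKey` are hypothetical names, not part of Wurk. The owner fields are peeled off the right so that whatever remains is treated as the public queue name.

```ruby
# Hypothetical helper (not Wurk's API): split a private-list key of the form
# queue:<public>|<host>|<pid>|<idx> into its parts, or return nil.
PrivateKey = Struct.new(:public_queue, :host, :pid, :idx)

def parse_private_key(key)
  return nil unless key.start_with?("queue:")

  parts = key.delete_prefix("queue:").split("|", -1)
  return nil if parts.size < 4 # e.g. "queue:default", a public queue

  # Peel the owner fields off the right; whatever remains is the queue name.
  idx = parts.pop
  pid = parts.pop
  host = parts.pop
  public_queue = parts.join("|")
  return nil if public_queue.empty? || host.empty?
  return nil unless pid.match?(/\A\d+\z/) && idx.match?(/\A\d+\z/)

  PrivateKey.new(public_queue, host, Integer(pid, 10), Integer(idx, 10))
end

parsed = parse_private_key("queue:default|web-1|4242|0")
parsed.public_queue                # => "default"
parsed.pid                         # => 4242
parse_private_key("queue:default") # => nil
```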

Liveness is decided per owner:

* same host — the OS is authoritative: `Process.kill(0, pid)`. This
is instant and ignores a stale `processes` SET entry whose 60s TTL
hasn't lapsed yet, so a `kill -9`ed sibling is reclaimed the moment
the supervisor reaps it rather than 60s later. (Pid reuse by an
unrelated local process is the one blind spot: it makes the dead owner
look alive. The supervisor respawns workers under fresh pids, so in
practice this is rare.)
* other host — we cannot ping the pid, so we trust the heartbeat:
the owner is alive iff some live `processes` member (one whose
`info` hash still exists) shares its `host:pid`. Cross-host reclaim
therefore waits out the 60s heartbeat TTL, exactly as the spec says.
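The two liveness rules above can be sketched as a single predicate. This is an illustration under assumptions, not Wurk's code: the name mirrors the `owner_alive?(host, pid, prefixes)` call in `reclaim_full!`, but here the live prefixes are assumed to be a collection of `"host:pid"` strings built from the live `processes` members, and the local hostname comes from `Socket.gethostname` rather than Component#hostname. Treating `Errno::EPERM` as alive follows the usual `kill(2)` semantics: the process exists but belongs to another user.

```ruby
require "socket"

# Hypothetical sketch of the per-owner liveness rule.
#   host, pid     - owner fields parsed from a private-list key
#   live_prefixes - collection of "host:pid" strings (e.g. a Set) for
#                   processes whose heartbeat `info` hash still exists
#   local_host    - this machine's hostname
def owner_alive?(host, pid, live_prefixes, local_host: Socket.gethostname)
  if host == local_host
    # Same host: the OS is authoritative. Signal 0 only checks that the
    # process exists and may be signalled; nothing is delivered.
    begin
      Process.kill(0, pid)
      true
    rescue Errno::ESRCH
      false # no such process: the owner is gone
    rescue Errno::EPERM
      true  # exists but is owned by another user: still alive
    end
  else
    # Other host: we can't signal it, so trust the heartbeat.
    live_prefixes.include?("#{host}:#{pid}")
  end
end
```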

Re-pushed jobs run through Wurk::Middleware::PoisonPill, which caps a job at RECOVERY_THRESHOLD recoveries within 72h: past the cap the job is killed into the dead set instead of re-queued, so a job that crashes its worker every time can't loop forever.
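The cap can be pictured as a decision over the job's recent recovery timestamps. This is a minimal sketch assuming recoveries are recorded as epoch seconds and the threshold is passed in; the actual value of RECOVERY_THRESHOLD and how PoisonPill stores its counter are not stated here, so `poison_pill_verdict` and `WINDOW_SECONDS` are hypothetical.

```ruby
# Hypothetical sketch of the PoisonPill decision; storage format and
# threshold are assumptions, not Wurk's implementation.
WINDOW_SECONDS = 72 * 60 * 60

# recovered_at: epoch-second timestamps of this job's earlier recoveries.
# Returns [verdict, timestamps_to_store].
def poison_pill_verdict(recovered_at, now:, threshold:)
  # Only recoveries inside the 72h window count toward the cap.
  recent = recovered_at.select { |t| now - t < WINDOW_SECONDS }
  if recent.size >= threshold
    [:kill, recent]            # cap reached: send to the dead set
  else
    [:requeue, recent + [now]] # record this recovery and re-queue
  end
end
```

With `threshold: 2`, a job is re-queued on its first two recoveries and killed on the third, unless the earlier recoveries have aged out of the 72h window.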

The reaper runs two passes, exactly as super_fetch's sweeper does:

* a *scoped* sweep every interval ("1/min within process group"): SCANs
only the public queues this process serves, gated by a cluster `SET NX
EX` lock so one process sweeps per interval. The cheap common path.
* a *full* sweep at most once an hour ("full SCAN 1/hr"): SCANs the whole
`queue:*|*` keyspace, gated by its own hourly lock, so private lists
whose public queue no live process serves — a renamed/decommissioned
queue, or a dead host's queue no survivor consumes — are recovered too,
not stranded forever.
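The two-tier cadence falls out of two independent `SET NX EX` locks with different TTLs. The sketch below uses an in-memory stand-in with an injected clock so it is deterministic; with the redis-rb gem the acquire would be roughly `redis.set(key, token, nx: true, ex: ttl)`. The key names and TTLs are the LOCK_KEY / FULL_LOCK_KEY and DEFAULT_INTERVAL / FULL_INTERVAL constants listed in this page; `FakeLockStore` and `sweeps_won` are hypothetical. As in `#reap`, the hourly lock is attempted whether or not the scoped lock was won.

```ruby
# In-memory stand-in for Redis `SET key value NX EX ttl` (hypothetical).
class FakeLockStore
  def initialize
    @expires = {}
  end

  # Succeeds only if the key is absent or expired, then holds it for ttl.
  def set_nx_ex(key, ttl, now:)
    exp = @expires[key]
    return false if exp && exp > now

    @expires[key] = now + ttl
    true
  end
end

SCOPED = ["super_fetch:reaper", 60].freeze
FULL   = ["super_fetch:reaper:full", 3600].freeze

# One tick of one process: which sweeps does it win?
def sweeps_won(store, now)
  won = []
  won << :scoped if store.set_nx_ex(*SCOPED, now: now)
  won << :full   if store.set_nx_ex(*FULL, now: now)
  won
end

store = FakeLockStore.new
sweeps_won(store, 0)    # => [:scoped, :full]  first process in wins both
sweeps_won(store, 0)    # => []                a second process, same instant
sweeps_won(store, 60)   # => [:scoped]         next interval; hourly lock held
sweeps_won(store, 3600) # => [:scoped, :full]
```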

Spec: docs/target/sidekiq-pro.md §3.2.

Constant Summary

DEFAULT_INTERVAL = 60

Sweep cadence in seconds; also the cluster-lock TTL, so only one process sweeps per interval. 60s matches the heartbeat TTL: the floor below which cross-host orphans can't be detected anyway.

FULL_INTERVAL = 3600

Full-keyspace sweep cadence and its lock TTL: at most one full sweep per hour across the fleet, since a global SCAN is far costlier than the scoped pass.

LOCK_KEY = 'super_fetch:reaper'

FULL_LOCK_KEY = 'super_fetch:reaper:full'

SCAN_COUNT = 100

THREAD_NAME = 'wurk-reaper'

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY, full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY) ⇒ Reaper

Returns a new instance of Reaper.

# File 'lib/wurk/fetcher/reaper.rb', line 67

def initialize(config, interval: DEFAULT_INTERVAL, lock_key: LOCK_KEY,
               full_interval: FULL_INTERVAL, full_lock_key: FULL_LOCK_KEY)
  @config = config
  @interval = interval
  @lock_key = lock_key
  @full_interval = full_interval
  @full_lock_key = full_lock_key
  @thread = nil
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
end

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Component

Returns the value of attribute config.

#interval ⇒ Object (readonly)

Returns the value of attribute interval.

# File 'lib/wurk/fetcher/reaper.rb', line 65

def interval
  @interval
end

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.
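The fire_event contract (ordered or LIFO dispatch, isolated failures, optional fail-fast, oneshot clearing) can be sketched with a small registry. `LifecycleHooks`, `on`, and `fire` are hypothetical names, and the error callback stands in for handle_exception. Whether the real bucket is also cleared when a fail-fast raise interrupts dispatch is not stated; this sketch clears it in an `ensure`, which is an assumption.

```ruby
# Hypothetical hook registry illustrating the fire_event contract.
class LifecycleHooks
  def initialize(&on_error)
    @hooks = Hash.new { |h, k| h[k] = [] }
    # Stand-in for handle_exception.
    @on_error = on_error || ->(ex, ctx) { warn("#{ctx[:event]}: #{ex.message}") }
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse # LIFO, used for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @on_error.call(e, { event: event }) # report, then keep going...
      raise if reraise                    # ...unless fail-fast was requested
    end
  ensure
    # Clear after dispatch (even on a fail-fast raise, an assumption here)
    # so a oneshot event can't fire twice.
    @hooks[event].clear if oneshot
  end
end
```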

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#hostname ⇒ Object Originally defined in module Component

#identity ⇒ Object Originally defined in module Component

#leader? ⇒ Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot standby). Any Redis error is swallowed and reported as false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)
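A minimal sketch of that contract, assuming the current lock holder's identity can be read through a callable; `read_leader` and the `env:` parameter are hypothetical stand-ins (the real method reads Redis and the process environment directly).

```ruby
# Hypothetical sketch of the leader? contract; not Wurk's implementation.
def leader?(identity, read_leader:, env: ENV)
  # Opt-out hot standby: never claim leadership.
  return false if env["WURK_LEADER"] == "false" || env["SIDEKIQ_LEADER"] == "false"

  # Checked at call time; nothing is cached.
  read_leader.call == identity
rescue StandardError
  false # e.g. a Redis error: answer "not leader" instead of raising
end
```

Rescuing at the method boundary is what keeps a partition from surfacing in user code; the cost is that a caller can't distinguish "follower" from "couldn't tell".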

#logger ⇒ Object Originally defined in module Component

#mono_ms ⇒ Object Originally defined in module Component

#process_nonce ⇒ Object Originally defined in module Component

#real_ms ⇒ Object Originally defined in module Component

#reap ⇒ Object

One loop tick: the scoped sweep when this process wins the per-interval lock, plus the full-keyspace sweep when it also wins the hourly lock. Returns the total jobs reclaimed across both.

# File 'lib/wurk/fetcher/reaper.rb', line 109

def reap
  reclaimed = acquire_lock? ? reclaim! : 0
  reclaimed += reclaim_full! if acquire_full_lock?
  reclaimed
end

#reclaim! ⇒ Object

One unguarded sweep over every served queue. Returns the number of jobs reclaimed (re-queued or killed). Public so boot paths and tests can drive a deterministic pass without the cluster lock.

# File 'lib/wurk/fetcher/reaper.rb', line 118

def reclaim!
  prefixes = live_process_prefixes
  served_queues.sum { |public_q| reclaim_queue(public_q, prefixes) }
end

#reclaim_full! ⇒ Object

One unguarded full-keyspace sweep: every queue:*|* private list, even ones whose public queue this process doesn't serve. Returns the number of jobs reclaimed. Public so boot paths and tests can drive it without the hourly lock.

# File 'lib/wurk/fetcher/reaper.rb', line 127

def reclaim_full!
  prefixes = live_process_prefixes
  reclaimed = 0
  each_full_private_list do |key, public_q, host, pid|
    next if owner_alive?(host, pid, prefixes)

    reclaimed += drain(key, public_q)
  end
  reclaimed
end
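The difference between `reclaim!` and `reclaim_full!` is only which private lists are visited. The in-memory sketch below models the keyspace as a Hash of list name to Array and takes the liveness test as a lambda; all names are hypothetical. Against Redis the two passes would correspond to SCAN MATCH patterns like `queue:<public>|*` per served queue versus `queue:*|*`, and the move back would be atomic; neither detail, nor the PoisonPill kill path, is modelled here, and appending to the public list is an arbitrary choice.

```ruby
# Hypothetical in-memory model of the scoped and full sweeps.
# Yields key, public queue, host, pid for each private list.
def each_private_list(store, public_queue: nil)
  store.keys.each do |key| # keys is a copy, so draining while iterating is safe
    next unless key.start_with?("queue:")

    parts = key.delete_prefix("queue:").split("|", -1)
    next if parts.size < 4 # a public queue, not a private list

    _idx = parts.pop
    pid = parts.pop
    host = parts.pop
    queue = parts.join("|")
    next if public_queue && queue != public_queue # scoped: this queue only

    yield key, queue, host, Integer(pid, 10)
  end
end

# Move every job from a dead owner's private list back to its public queue.
def drain(store, key, public_q)
  jobs = store.delete(key) || []
  (store["queue:#{public_q}"] ||= []).concat(jobs)
  jobs.size
end

# served_queues: nil means a full-keyspace sweep.
def reclaim(store, alive, served_queues: nil)
  (served_queues || [nil]).sum do |q|
    n = 0
    each_private_list(store, public_queue: q) do |key, public_q, host, pid|
      next if alive.call(host, pid)

      n += drain(store, key, public_q)
    end
    n
  end
end

store = {
  "queue:default"          => [],
  "queue:default|h1|100|0" => ["job-a"], # dead owner
  "queue:default|h1|200|0" => ["job-b"], # live owner
  "queue:legacy|h2|300|0"  => ["job-c"], # queue nobody serves any more
}
alive = ->(_host, pid) { pid == 200 }
reclaim(store, alive, served_queues: ["default"]) # => 1 (job-a)
reclaim(store, alive)                             # => 1 (job-c)
store["queue:legacy"]                             # => ["job-c"]
```

The scoped pass never sees `queue:legacy|…`, which is exactly the stranded case the hourly full pass exists for.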

#redis ⇒ Object Originally defined in module Component

#running? ⇒ Boolean

Returns:

  • (Boolean)

# File 'lib/wurk/fetcher/reaper.rb', line 102

def running?
  !@thread.nil? && @thread.alive?
end

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread. report_on_exception is disabled on it because watchdog has already reported the exception before re-raising, so a thread that dies is not logged twice.

#start ⇒ Object

Spawns the sweep loop. Idempotent. The loop waits one interval before its first sweep so booting processes don't dogpile Redis and so an un-stopped launcher in a unit test never touches the keyspace.

# File 'lib/wurk/fetcher/reaper.rb', line 83

def start
  @mutex.synchronize do
    return @thread if @thread

    @done = false
    @thread = spawn_loop_thread
  end
  @thread
end

#stop ⇒ Object

# File 'lib/wurk/fetcher/reaper.rb', line 93

def stop
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
  @thread&.join
  @thread = nil
end
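`start` and `stop` share a Mutex and ConditionVariable so `stop` can wake the loop mid-wait instead of waiting out a full interval. Below is a self-contained sketch of that pattern; `SweepLoop` is hypothetical, the `tick` block stands in for the real loop body (presumably a call to `#reap`), and the wait-before-first-tick ordering follows the `#start` description. Re-checking `@done` under the mutex before each wait means a signal sent before the loop starts waiting is never lost.

```ruby
# Hypothetical stand-alone version of the start/stop pattern.
class SweepLoop
  def initialize(interval:, &tick)
    @interval = interval
    @tick = tick
    @thread = nil
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
  end

  def start
    @mutex.synchronize do
      return @thread if @thread # idempotent

      @done = false
      @thread = Thread.new { run_loop }
    end
  end

  def stop
    @mutex.synchronize do
      @done = true
      @sleeper.signal # wake the loop if it is mid-wait
    end
    @thread&.join
    @thread = nil
  end

  def running?
    !@thread.nil? && @thread.alive?
  end

  private

  # The condition is evaluated first, so the loop waits before its first tick.
  def run_loop
    @tick.call until wait_or_stop
  end

  # Sleeps up to one interval; returns true once stop has been requested.
  def wait_or_stop
    @mutex.synchronize do
      deadline = now + @interval
      until @done
        remaining = deadline - now
        return false if remaining <= 0

        @sleeper.wait(@mutex, remaining) # may wake early; loop re-checks
      end
      true
    end
  end

  def now
    Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end
```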

#tid ⇒ Object Originally defined in module Component

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.
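The watchdog/safe_thread pair described above can be sketched as follows. `ThreadHelpers` is a hypothetical container and the `report` block stands in for handle_exception. Disabling `report_on_exception` from inside the new thread (rather than after `Thread.new` returns) avoids a race where a thread dies before the flag is set; that placement is a choice made for this sketch.

```ruby
# Hypothetical sketch of watchdog + safe_thread; not Wurk's implementation.
class ThreadHelpers
  def initialize(&report)
    @report = report # stand-in for handle_exception
  end

  # Report any exception escaping the block, tagged with the component
  # label, then re-raise so the thread still dies with it.
  def watchdog(last_words)
    yield
  rescue Exception => e
    @report.call(e, { context: last_words })
    raise
  end

  # Named thread running the block under watchdog. Ruby's own
  # report_on_exception is off because watchdog already reported it.
  def safe_thread(name, priority: nil, &block)
    Thread.new do
      Thread.current.report_on_exception = false
      Thread.current.name = name
      Thread.current.priority = priority if priority
      watchdog(name, &block)
    end
  end
end
```

Joining the returned thread re-raises the original exception in the joiner, which is why the parent should keep the Thread it gets back.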