Class: Wurk::Launcher
- Inherits: Object
- Includes: Component
- Defined in: lib/wurk/launcher.rb
Overview
Top-level supervisor inside each worker process. Owns the Manager pool (one per Capsule), the scheduler and cron pollers, and the heartbeat thread, plus the leader lock, reaper, metrics/queue rollups, history, and health server that the constructor builds. The heartbeat wire format lives in Wurk::Heartbeat. Launcher owns lifecycle, signal dispatch, and stats rollup; Heartbeat owns the Redis writes.
Lifecycle:
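* `run(async_beat:)`: prepare capsules, freeze config, start the heartbeat thread (when `async_beat` is true), pollers, and managers.
* `quiet`: stop fetching across all managers and terminate the scheduler poller (the cron poller keeps running until `stop`).
* `stop`: graceful drain within `config[:timeout]` seconds (25 if unset).
* `heartbeat`: one-shot beat (also driven by the heartbeat thread).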
flush_stats rolls per-process Processor counters (PROCESSED / FAILURE
/ EXPIRED) into the global + per-day Redis strings every beat. Per-day
keys carry STATS_TTL so old days expire automatically.
Spec: docs/target/sidekiq-free.md §12 (Sidekiq::Launcher).
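The per-beat rollup amounts to a handful of INCRBY/EXPIRE commands sent in one pipeline. The sketch below is hypothetical: FakePipeline stands in for a Redis client's pipeline so it runs without a server, write_stats only mirrors the behaviour described here, and the global key names stat:processed, stat:failed, and stat:expired are an assumption (this page only names the per-day keys).

```ruby
require 'date'

STATS_TTL = 5 * 365 * 24 * 60 * 60 # 5 years, in seconds

# Hypothetical stand-in for a Redis pipeline: it records commands instead of
# sending them, so the sketch runs without a Redis server or client gem.
class FakePipeline
  attr_reader :commands

  def initialize
    @commands = []
  end

  def incrby(key, amount)
    @commands << [:incrby, key, amount]
  end

  def expire(key, seconds)
    @commands << [:expire, key, seconds]
  end
end

# Hypothetical write_stats: skip the beat when every counter is zero;
# otherwise bump the global key and the per-day key for each counter and
# (re)apply STATS_TTL to the per-day key so old days roll off.
def write_stats(pipe, counts, date = Date.today)
  return pipe if counts.values.all?(&:zero?)

  day = date.strftime('%Y-%m-%d')
  counts.each do |name, amount|
    pipe.incrby("stat:#{name}", amount)
    pipe.incrby("stat:#{name}:#{day}", amount)
    pipe.expire("stat:#{name}:#{day}", STATS_TTL)
  end
  pipe
end

pipe = write_stats(FakePipeline.new, { processed: 3, failed: 1, expired: 0 }, Date.new(2024, 5, 1))
pipe.commands.each { |cmd| p cmd }
```

Re-applying EXPIRE on every beat keeps a day's key alive for STATS_TTL after its last write; whether Wurk issues exactly these commands is not shown on this page.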
Constant Summary
- STATS_TTL = 5 * 365 * 24 * 60 * 60
  5 years, in seconds (157,680,000). The per-day stat:processed:YYYY-MM-DD, stat:failed:YYYY-MM-DD, and stat:expired:YYYY-MM-DD strings carry this TTL so they roll off without manual cleanup.
- BEAT_PAUSE = Heartbeat::BEAT_PAUSE
  Re-exported for test/third-party callers that read it off Launcher (Sidekiq's drop-in surface). The single source of truth is Heartbeat.
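A constant re-export in Ruby binds the same object under a second name, so there is still only one place to change the value. A minimal sketch of the pattern; the value 10 is an assumption for illustration, not necessarily Wurk's beat interval.

```ruby
# Hypothetical miniature of the re-export; 10 is an assumed value.
module Heartbeat
  BEAT_PAUSE = 10
end

class Launcher
  # Binds the same object, so Heartbeat stays the single definition site.
  BEAT_PAUSE = Heartbeat::BEAT_PAUSE
end

puts Launcher::BEAT_PAUSE
```

Because the assignment runs when the class body is evaluated, Launcher::BEAT_PAUSE would not follow a later reassignment of Heartbeat::BEAT_PAUSE; for a fixed interval that is usually acceptable.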
Instance Attribute Summary
- #config ⇒ Object (readonly) included from Component
  Returns the value of attribute config.
- #cron_poller ⇒ Object
  Returns the value of attribute cron_poller.
- #heartbeat_thread ⇒ Object (readonly)
  Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.
- #history ⇒ Object
  Returns the value of attribute history.
- #managers ⇒ Object
  Returns the value of attribute managers.
- #metrics_rollup ⇒ Object
  Returns the value of attribute metrics_rollup.
- #poller ⇒ Object
  Returns the value of attribute poller.
- #queue_rollup ⇒ Object
  Returns the value of attribute queue_rollup.
Instance Method Summary
- #default_tag(dir = Dir.pwd) ⇒ Object included from Component
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object included from Component
  Invokes lifecycle hooks for event.
- #flush_stats ⇒ Object
  Rolls in-process Processor counters into Redis.
- #handle_exception(ex, ctx = {}) ⇒ Object included from Component
- #heartbeat ⇒ Object
  One-shot beat.
- #hostname ⇒ Object included from Component
- #identity ⇒ Object included from Component
- #initialize(config, embedded: false) ⇒ Launcher (constructor)
  Returns a new instance of Launcher.
- #leader? ⇒ Boolean included from Component
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object included from Component
  Delegated to config.
- #mono_ms ⇒ Object included from Component
- #process_nonce ⇒ Object included from Component
- #quiet ⇒ Object
  Stops fetching; idempotent.
- #real_ms ⇒ Object included from Component
  Clock helper.
- #redis ⇒ Object included from Component
- #run(async_beat: true) ⇒ Object
  Boots the launcher; boot order matters.
- #safe_thread(name, priority: nil, &block) ⇒ Object included from Component
  Spawns a named thread that runs block under watchdog(name).
- #stop ⇒ Object
  Graceful shutdown.
- #stopping? ⇒ Boolean
  True once #quiet has run (including via #stop).
- #tid ⇒ Object included from Component
  Identity helper.
- #watchdog(last_words) ⇒ Object included from Component
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(config, embedded: false) ⇒ Launcher
Returns a new instance of Launcher.
    # File 'lib/wurk/launcher.rb', line 48
    def initialize(config, embedded: false)
      @config = config
      @embedded = embedded
      # Two separate flags, deliberately. @done = "quieted" (stop fetching, stay
      # alive, report quiet=true). @stopped = "shutting down" (terminate the
      # heartbeat loop). Quiet must NOT stop the heartbeat; otherwise a quieted
      # process never publishes quiet=true and expires out of the live set (#236).
      @done = false
      @stopped = false
      @managers = config.capsules.values.map { |cap| Manager.new(cap) }
      @poller = build_poller
      @cron_poller = build_cron_poller
      @metrics_rollup = build_metrics_rollup
      @queue_rollup = build_queue_rollup
      @history = build_history
      @leader = build_leader
      @reaper = build_reaper
      @started_at = nil
      @heartbeat = nil
      @heartbeat_thread = nil
      @health_server = build_health_server
    end
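A hypothetical, single-threaded miniature makes the #236 failure mode concrete: if quiet also ended the loop, the quiet=true state would never be published. MiniLauncher, beat_once, and published are illustrative names, not Wurk's API; each beat_once call stands in for one iteration of the heartbeat thread.

```ruby
# Hypothetical miniature of the two-flag design.
class MiniLauncher
  attr_reader :published

  def initialize
    @done = false     # "quieted": stop fetching, keep beating
    @stopped = false  # "shutting down": end the heartbeat loop
    @published = []   # what a real beat would write to Redis
  end

  def quiet
    @done = true
  end

  def stop
    quiet
    @stopped = true
  end

  # Returns false once the loop should exit; otherwise publishes quiet state.
  def beat_once
    return false if @stopped

    @published << { quiet: @done }
    true
  end
end

launcher = MiniLauncher.new
launcher.beat_once
launcher.quiet
launcher.beat_once # still beats, now reporting quiet: true
launcher.stop
launcher.beat_once # the loop would exit here
p launcher.published
```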
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
#cron_poller ⇒ Object
Returns the value of attribute cron_poller.
    # File 'lib/wurk/launcher.rb', line 46
    def cron_poller
      @cron_poller
    end
#heartbeat_thread ⇒ Object (readonly)
Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.
    # File 'lib/wurk/launcher.rb', line 170
    def heartbeat_thread
      @heartbeat_thread
    end
#history ⇒ Object
Returns the value of attribute history.
    # File 'lib/wurk/launcher.rb', line 46
    def history
      @history
    end
#managers ⇒ Object
Returns the value of attribute managers.
    # File 'lib/wurk/launcher.rb', line 46
    def managers
      @managers
    end
#metrics_rollup ⇒ Object
Returns the value of attribute metrics_rollup.
    # File 'lib/wurk/launcher.rb', line 46
    def metrics_rollup
      @metrics_rollup
    end
#poller ⇒ Object
Returns the value of attribute poller.
    # File 'lib/wurk/launcher.rb', line 46
    def poller
      @poller
    end
#queue_rollup ⇒ Object
Returns the value of attribute queue_rollup.
    # File 'lib/wurk/launcher.rb', line 46
    def queue_rollup
      @queue_rollup
    end
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
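The dispatch rules above (registration order or LIFO, per-hook error isolation, optional re-raise, one-shot clearing) can be sketched with a small registry. Hooks and its errors list are hypothetical stand-ins for Component's hook storage and handle_exception; only the option semantics come from this page.

```ruby
# Hypothetical hook registry mirroring fire_event's documented options.
class Hooks
  attr_reader :errors

  def initialize
    @buckets = Hash.new { |hash, event| hash[event] = [] }
    @errors = [] # stands in for handle_exception
  end

  def on(event, &block)
    @buckets[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @buckets.fetch(event, [])
    hooks = hooks.reverse if reverse # LIFO for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      raise if reraise

      @errors << e # report and continue with the next hook
    end
    @buckets.delete(event) if oneshot # the event cannot fire twice
  end
end

order = []
hooks = Hooks.new
hooks.on(:shutdown) { order << :first }
hooks.on(:shutdown) { raise 'boom' }
hooks.on(:shutdown) { order << :third }
hooks.fire_event(:shutdown, reverse: true)
hooks.fire_event(:shutdown) # no-op: the bucket was cleared
p order
```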
#flush_stats ⇒ Object
Rolls in-process Processor counters into Redis. Pipelined so a single round trip covers all writes. Skips when all counters are zero to avoid touching keys we have nothing to add to.
    # File 'lib/wurk/launcher.rb', line 154
    def flush_stats
      processed = Processor::PROCESSED.reset
      failed = Processor::FAILURE.reset
      expired = Processor::EXPIRED.reset
      return if processed.zero? && failed.zero? && expired.zero?

      write_stats(processed, failed, expired)
    rescue StandardError => e
      # Replay-safety: the counters were reset above, so a Redis blip drops
      # this batch of stats. We log and accept that: per-job at-least-once
      # semantics don't apply to *counters*, and the next beat resets again.
      handle_exception(e, { context: 'flush_stats' })
    end
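flush_stats relies on reset returning the old value and zeroing the counter in one step. A minimal sketch of such a counter, assuming a Mutex-based implementation; Counter is hypothetical, since Processor's real counter class is not shown on this page.

```ruby
# Hypothetical thread-safe counter with reset-and-return semantics.
class Counter
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(by = 1)
    @lock.synchronize { @value += by }
  end

  # Atomically returns the current value and zeroes it, so an increment that
  # races with a flush lands in this beat or the next one, never in neither.
  def reset
    @lock.synchronize do
      current = @value
      @value = 0
      current
    end
  end
end

processed = Counter.new
4.times { processed.incr }
processed.reset # returns 4
processed.reset # returns 0
```

This is also why the rescue above loses data on failure: once reset has returned, the count exists only in a local variable.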
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#heartbeat ⇒ Object
One-shot beat. Public for embedded mode (and for tests) — the
heartbeat thread calls this on BEAT_PAUSE cadence.
    # File 'lib/wurk/launcher.rb', line 146
    def heartbeat
      flush_stats
      beat
    end
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
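The three rules above (no caching, env opt-out, errors collapse to false) fit in a few lines. LeaderCheck is hypothetical, and fetch_holder stands in for reading the dear-leader lock from Redis; the real key and command are not shown on this page.

```ruby
# Hypothetical leader? check.
class LeaderCheck
  OPT_OUT_VARS = %w[WURK_LEADER SIDEKIQ_LEADER].freeze

  def initialize(identity, fetch_holder)
    @identity = identity
    @fetch_holder = fetch_holder # callable returning the current lock holder
  end

  # Evaluated on every call (nothing is cached).
  def leader?
    return false if OPT_OUT_VARS.any? { |var| ENV[var] == 'false' }

    @fetch_holder.call == @identity
  rescue StandardError
    false # a transient partition must not surface as an exception
  end
end

check = LeaderCheck.new('host-1:123', -> { 'host-1:123' })
p check.leader?
```

Rescuing StandardError keeps the sketch free of a Redis gem; a real implementation would more likely rescue the client's connection error classes.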
#logger ⇒ Object Originally defined in module Component
Delegated to config.
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#quiet ⇒ Object
Idempotent. Flips stopping? true, halts fetching across every
Manager + the poller, then fires the :quiet event in reverse
registration order so teardown hooks run LIFO.
    # File 'lib/wurk/launcher.rb', line 104
    def quiet
      return if @done

      @done = true
      @managers.each(&:quiet)
      @poller&.terminate
      # The cron poller is intentionally NOT terminated here: a USR1-quieted
      # leader still enqueues periodic jobs; it only stops fetching for itself.
      # Loops stop only on full shutdown (#stop). Spec: sidekiq-ent.md §2.6.
      fire_event(:quiet, reverse: true)
    end
#real_ms ⇒ Object Originally defined in module Component
Clock helper.
#redis ⇒ Object Originally defined in module Component
#run(async_beat: true) ⇒ Object
Boot order matters:
1. prepare! each capsule (default its fetcher, materialize lazy pools and
middleware), then freeze! the config so mutations after fork are visible
mistakes.
2. spawn the heartbeat thread (when async_beat is true) BEFORE the managers
so the dashboard sees the process the moment it can pick up jobs.
3. start the scheduler poller, the leader lock, and the cron poller (both
pollers are leader-gated for what they enqueue; safe to start before
leadership is settled since a non-leader tick just returns early), then
the metrics/queue rollups and history.
4. start the managers (which start their processors), then the reaper and
the boot-time reclaim.
5. start the health probe server LAST so the listener doesn't
accept k8s probes until the rest of the launcher is up.
    # File 'lib/wurk/launcher.rb', line 81
    def run(async_beat: true)
      @started_at = Time.now.to_f
      # Default each capsule's fetcher + materialize its lazy pools/middleware
      # before the config freezes. Every entry point (swarm child, standalone
      # CLI, embedded) runs through here, so none boots with a nil fetcher.
      @config.capsules.each_value(&:prepare!)
      @config.freeze!
      @heartbeat_thread = safe_thread('heartbeat', &method(:start_heartbeat)) if async_beat
      @poller&.start
      @leader&.start
      @cron_poller&.start
      @metrics_rollup&.start
      @queue_rollup&.start
      @history&.start
      @managers.each(&:start)
      @reaper.start
      boot_reclaim
      @health_server&.start
    end
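The ordering, and the &. calls that let optional components be absent, can be modelled with recording stand-ins. FakePart and the component names are hypothetical; that a builder returns nil for a disabled component is an assumption inferred from the &. calls, since the builders are not shown on this page.

```ruby
# Hypothetical part that records the order in which it was started.
FakePart = Struct.new(:name, :log) do
  def start
    log << name
  end
end

log = []
part = ->(name) { FakePart.new(name, log) }

poller = part.call(:poller)
cron_poller = nil # assumption: a builder returned nil for a disabled feature
managers = [part.call(:manager_default), part.call(:manager_critical)]
health_server = part.call(:health_server)

# Same shape as #run: optional parts use &. so a nil one is skipped, and the
# health server goes last so probes only pass once everything else is up.
part.call(:heartbeat).start
poller&.start
cron_poller&.start
managers.each(&:start)
health_server&.start
p log
```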
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent should retain the returned Thread so it can join or inspect it
later. report_on_exception is disabled because watchdog has already
reported the error, so a dying thread is not logged twice.
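The safe_thread / watchdog contract can be sketched as follows. MiniComponent and its reported list are hypothetical stand-ins for Component and handle_exception; only the described behaviour (named thread, report then re-raise, report_on_exception off) is taken from this page.

```ruby
# Hypothetical stand-in for Component's thread helpers.
module MiniComponent
  def reported
    @reported ||= []
  end

  # Stand-in for handle_exception: records what would be reported.
  def handle_exception(ex, ctx = {})
    reported << [ex.message, ctx]
  end

  # Report an unhandled error with the component label, then re-raise it.
  def watchdog(last_words)
    yield
  rescue StandardError => e
    handle_exception(e, { context: last_words })
    raise
  end

  def safe_thread(name, priority: nil, &block)
    Thread.new do
      current = Thread.current
      current.name = name
      current.priority = priority if priority
      # watchdog already reported the error; don't let Ruby print it again.
      current.report_on_exception = false
      watchdog(name, &block)
    end
  end
end

class Worker
  include MiniComponent
end

worker = Worker.new
thread = worker.safe_thread('heartbeat') { raise 'boom' }
begin
  thread.join # join re-raises the thread's exception in the caller
rescue RuntimeError => e
  puts "joined: #{e.message}"
end
p worker.reported
```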
#stop ⇒ Object
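Graceful shutdown. The deadline is config[:timeout] seconds from now (25 if unset) on the monotonic clock, so wall-clock skew can't extend it. Managers stop in parallel threads so a slow capsule doesn't block its siblings. The cron poller, rollups, history, and reaper keep running through the drain and are stopped afterwards; the leader lock is then released so a follower can take over without waiting out the TTL.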
    # File 'lib/wurk/launcher.rb', line 119
    def stop
      deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + (@config[:timeout] || 25)
      quiet
      stoppers = @managers.map { |m| Thread.new { m.stop(deadline) } }
      fire_event(:shutdown, reverse: true)
      stoppers.each(&:join)
      # Full shutdown stops periodic firing (it survived #quiet); do this before
      # releasing the lock so no tick races a follower's promotion.
      @cron_poller&.terminate
      @metrics_rollup&.terminate
      @queue_rollup&.terminate
      @history&.terminate
      @reaper&.stop
      # CAS-release the cluster lock now (planned shutdown) so a follower can
      # take over immediately instead of waiting out the TTL.
      @leader&.stop
      stop_heartbeat
      clear_heartbeat
      fire_event(:exit, reverse: true)
    end
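Why the managers stop in parallel against one shared deadline can be shown with stand-ins. SlowManager and stop_all are hypothetical; the real Manager#stop is not shown on this page. Run sequentially, a slow manager listed first would consume the whole budget and its siblings would find no time left.

```ruby
# Hypothetical manager: its stop needs work_seconds to drain, but gives up at
# the shared monotonic deadline.
class SlowManager
  def initialize(work_seconds)
    @work_seconds = work_seconds
    @completed = false
  end

  def completed?
    @completed
  end

  def stop(deadline)
    remaining = deadline - Process.clock_gettime(Process::CLOCK_MONOTONIC)
    @completed = remaining >= @work_seconds
    sleep([[remaining, 0].max, @work_seconds].min)
  end
end

# One monotonic deadline for everyone, then one thread per manager.
def stop_all(managers, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  managers.map { |m| Thread.new { m.stop(deadline) } }.each(&:join)
end

slow = SlowManager.new(5)
fast = SlowManager.new(0.1)
stop_all([slow, fast], 0.5)
p [slow.completed?, fast.completed?]
```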
#stopping? ⇒ Boolean
    # File 'lib/wurk/launcher.rb', line 140
    def stopping?
      @done
    end
#tid ⇒ Object Originally defined in module Component
Identity helper.
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.