Class: Wurk::Launcher

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/launcher.rb

Overview

Top-level supervisor inside each worker process. Owns the Manager pool (one per Capsule), the scheduler poller, and the heartbeat thread. The heartbeat WIRE lives in Wurk::Heartbeat — Launcher owns lifecycle, signal dispatch, and stats rollup; Heartbeat owns the Redis writes.

Lifecycle:

* `run(async_beat:)` — freeze config, start heartbeat, poller, managers.
* `quiet`            — stop fetching across all managers + poller.
* `stop`             — graceful drain inside `config[:timeout]`.
* `heartbeat`        — one-shot beat (also driven by the heartbeat thread).

flush_stats rolls per-process Processor counters (PROCESSED / FAILURE / EXPIRED) into the global + per-day Redis strings every beat. Per-day keys carry STATS_TTL so old days expire automatically.

Spec: docs/target/sidekiq-free.md §12 (Sidekiq::Launcher).

Constant Summary

STATS_TTL =

5 years, in seconds. Per-day stat:processed:YYYY-MM-DD / stat:failed:YYYY-MM-DD / stat:expired:YYYY-MM-DD strings carry this TTL so they roll off without manual cleanup.

5 * 365 * 24 * 60 * 60
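
As a quick check of the arithmetic, the expression can be expanded step by step. The constant names below are illustrative only and are not part of Wurk; a 365-day year is assumed, so leap days are ignored.

```ruby
# Illustrative names, not Wurk constants.
SECONDS_PER_DAY  = 24 * 60 * 60            # 86_400
SECONDS_PER_YEAR = 365 * SECONDS_PER_DAY   # 31_536_000 (leap days ignored)
FIVE_YEARS       = 5 * SECONDS_PER_YEAR    # 157_680_000

puts FIVE_YEARS # prints 157680000
```
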
BEAT_PAUSE =

Re-exported for test/third-party callers that read it off Launcher (Sidekiq's drop-in surface). The single source of truth is Heartbeat.

Heartbeat::BEAT_PAUSE

Constructor Details

#initialize(config, embedded: false) ⇒ Launcher

Returns a new instance of Launcher.



# File 'lib/wurk/launcher.rb', line 48

def initialize(config, embedded: false)
  @config = config
  @embedded = embedded
  # Two separate flags, deliberately. @done = "quieted" (stop fetching, stay
  # alive, report quiet=true). @stopped = "shutting down" (terminate the
  # heartbeat loop). Quiet must NOT stop the heartbeat — otherwise a quieted
  # process never publishes quiet=true and expires out of the live set (#236).
  @done = false
  @stopped = false
  @managers = config.capsules.values.map { |cap| Manager.new(cap) }
  @poller = build_poller
  @cron_poller = build_cron_poller
  @metrics_rollup = build_metrics_rollup
  @queue_rollup = build_queue_rollup
  @history = build_history
  @leader = build_leader
  @reaper = build_reaper
  @started_at = nil
  @heartbeat = nil
  @heartbeat_thread = nil
  @health_server = build_health_server
end
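
The two-flag design in the constructor comment can be illustrated with a small stand-in. This is a minimal sketch, not Wurk::Launcher itself: the class `TwoFlagSketch` and its `beats` log are hypothetical, and only the meaning of `@done` and `@stopped` is taken from the comment above.

```ruby
# Hypothetical stand-in showing why "quieted" and "shutting down"
# are tracked by separate flags.
class TwoFlagSketch
  attr_reader :beats

  def initialize
    @done = false     # quieted: stop fetching, keep beating
    @stopped = false  # shutting down: the heartbeat loop exits
    @beats = []
  end

  def quiet
    @done = true
  end

  def stop
    quiet
    @stopped = true
  end

  # One heartbeat. Records the quiet flag that would be published,
  # and returns false once the process is shutting down.
  def beat
    return false if @stopped

    @beats << { quiet: @done }
    true
  end
end

sketch = TwoFlagSketch.new
sketch.beat    # running
sketch.quiet
sketch.beat    # still beating, now reporting quiet: true
sketch.stop
sketch.beat    # no beat is recorded after shutdown
p sketch.beats
```

If a single flag served both purposes, the second `beat` would be skipped and the quieted state would never be published.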

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Component

Returns the value of attribute config.

#cron_poller ⇒ Object

Returns the value of attribute cron_poller.



# File 'lib/wurk/launcher.rb', line 46

def cron_poller
  @cron_poller
end

#heartbeat_thread ⇒ Object (readonly)

Used by tests to inspect the heartbeat thread; not part of the Sidekiq public surface.



# File 'lib/wurk/launcher.rb', line 170

def heartbeat_thread
  @heartbeat_thread
end

#history ⇒ Object

Returns the value of attribute history.



# File 'lib/wurk/launcher.rb', line 46

def history
  @history
end

#managers ⇒ Object

Returns the value of attribute managers.



# File 'lib/wurk/launcher.rb', line 46

def managers
  @managers
end

#metrics_rollup ⇒ Object

Returns the value of attribute metrics_rollup.



# File 'lib/wurk/launcher.rb', line 46

def metrics_rollup
  @metrics_rollup
end

#poller ⇒ Object

Returns the value of attribute poller.



# File 'lib/wurk/launcher.rb', line 46

def poller
  @poller
end

#queue_rollup ⇒ Object

Returns the value of attribute queue_rollup.



# File 'lib/wurk/launcher.rb', line 46

def queue_rollup
  @queue_rollup
end

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.
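
The dispatch rules described for fire_event can be sketched with a small registry. This is an illustration under stated assumptions, not Component's implementation: `HookSketch`, its `on` method, and the `errors` array (standing in for handle_exception) are hypothetical names.

```ruby
# Hypothetical hook registry illustrating the dispatch rules above.
class HookSketch
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @errors = []
  end

  # Registers a hook for an event (illustrative API).
  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event]
    ordered = reverse ? hooks.reverse : hooks.dup
    ordered.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e        # stand-in for handle_exception
      raise if reraise    # fail fast only when asked to
    end
    hooks.clear if oneshot # the event cannot fire twice
  end
end

order = []
hooks = HookSketch.new
hooks.on(:quiet) { order << :first }
hooks.on(:quiet) { raise 'boom' }
hooks.on(:quiet) { order << :third }

hooks.fire_event(:quiet, reverse: true) # LIFO; the failing hook is recorded
hooks.fire_event(:quiet)                # no-op, the bucket was cleared
p order                                 # prints [:third, :first]
```
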

#flush_stats ⇒ Object

Rolls in-process Processor counters into Redis. Pipelined so a single round trip covers all writes. Skips when all counters are zero to avoid touching keys we have nothing to add to.



# File 'lib/wurk/launcher.rb', line 154

def flush_stats
  processed = Processor::PROCESSED.reset
  failed = Processor::FAILURE.reset
  expired = Processor::EXPIRED.reset
  return if processed.zero? && failed.zero? && expired.zero?

  write_stats(processed, failed, expired)
rescue StandardError => e
  # Replay-safety: counters were reset above, so a Redis blip would
  # otherwise drop stats. We log and accept — the per-job at-least-once
  # semantics don't apply to *counters*, and the next beat resets again.
  handle_exception(e, { context: 'flush_stats' })
end
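
The body of write_stats is not shown on this page. The sketch below illustrates one way the reset-then-write pattern could look, using a recording stand-in instead of a Redis pipeline. Everything here is an assumption for illustration: `CounterSketch`, `FakePipeline`, `write_stats_sketch`, `FLUSH_TTL`, and the global key name `stat:<name>` are hypothetical; only the per-day key shape `stat:<name>:YYYY-MM-DD` and the TTL value come from the constant description above.

```ruby
# Thread-safe counter with an atomic read-and-zero, standing in for
# Processor::PROCESSED and its siblings (hypothetical class).
class CounterSketch
  def initialize
    @value = 0
    @lock = Mutex.new
  end

  def incr(by = 1)
    @lock.synchronize { @value += by }
  end

  # Returns the current value and zeroes the counter in one step.
  def reset
    @lock.synchronize do
      value = @value
      @value = 0
      value
    end
  end
end

# Records commands instead of sending them to Redis (hypothetical).
class FakePipeline
  attr_reader :commands

  def initialize
    @commands = []
  end

  def incrby(key, amount)
    @commands << [:incrby, key, amount]
  end

  def expire(key, ttl)
    @commands << [:expire, key, ttl]
  end
end

FLUSH_TTL = 5 * 365 * 24 * 60 * 60 # same value as STATS_TTL

# Queues one global and one per-day increment per counter.
def write_stats_sketch(pipe, counts, day)
  counts.each do |name, amount|
    pipe.incrby("stat:#{name}", amount)          # global key name is assumed
    pipe.incrby("stat:#{name}:#{day}", amount)   # per-day key
    pipe.expire("stat:#{name}:#{day}", FLUSH_TTL)
  end
end

processed = CounterSketch.new
3.times { processed.incr }
pipe = FakePipeline.new
write_stats_sketch(pipe, { processed: processed.reset }, '2024-01-01')
pipe.commands.each { |cmd| p cmd }
```

Because `reset` zeroes the counter before the write, a failed write loses that beat's counts, which is the trade-off the rescue comment in flush_stats accepts.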

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#heartbeat ⇒ Object

One-shot beat. Public for embedded mode (and for tests) — the heartbeat thread calls this on BEAT_PAUSE cadence.



# File 'lib/wurk/launcher.rb', line 146

def heartbeat
  flush_stats
  beat
end

#hostname ⇒ Object Originally defined in module Component

#identity ⇒ Object Originally defined in module Component

#leader? ⇒ Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)
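
The contract described for leader? (environment opt-out, check at call time, errors swallowed to false) can be sketched as follows. This is an illustration only: `leader_sketch?` and the `lock_holder` callable are hypothetical, and comparing the lock holder with the process identity is an assumption about how the lock is checked, not something this page states.

```ruby
# Hypothetical sketch of the leader? contract. `lock_holder` is a callable
# standing in for the Redis read that returns the current lock owner.
def leader_sketch?(identity, lock_holder, env: ENV)
  # Opt-out hot standby: never claim leadership when disabled via env.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  # Evaluated on every call, nothing is cached.
  lock_holder.call == identity
rescue StandardError
  false # a Redis error reads as "not leader" instead of raising
end

p leader_sketch?('host:1', -> { 'host:1' }, env: {})                 # prints true
p leader_sketch?('host:1', -> { 'host:2' }, env: {})                 # prints false
p leader_sketch?('host:1', -> { raise IOError, 'down' }, env: {})    # prints false
```
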

#logger ⇒ Object Originally defined in module Component

Delegated to config.

#mono_ms ⇒ Object Originally defined in module Component

#process_nonce ⇒ Object Originally defined in module Component

#quiet ⇒ Object

Idempotent. Flips stopping? to true, halts fetching across every Manager and the scheduler poller, then fires the :quiet event in reverse registration order so teardown hooks run LIFO.



# File 'lib/wurk/launcher.rb', line 104

def quiet
  return if @done

  @done = true
  @managers.each(&:quiet)
  @poller&.terminate
  # The cron poller is intentionally NOT terminated here: a USR1-quieted
  # leader still enqueues periodic jobs — it only stops fetching for itself.
  # Loops stop only on full shutdown (#stop). Spec: sidekiq-ent.md §2.6.
  fire_event(:quiet, reverse: true)
end
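
The idempotency guard can be illustrated in isolation. In this minimal sketch, `QuietSketch` and its `log` array are hypothetical; the symbols merely record which steps of `quiet` would have run.

```ruby
# Hypothetical stand-in: records the steps quiet would perform.
class QuietSketch
  attr_reader :log

  def initialize
    @done = false
    @log = []
  end

  def quiet
    return if @done # second and later calls are no-ops

    @done = true
    @log << :managers_quiet
    @log << :poller_terminate
    # The cron poller is deliberately left running here.
    @log << :fire_quiet
  end

  def stopping?
    @done
  end
end

q = QuietSketch.new
q.quiet
q.quiet # e.g. a repeated signal: nothing runs a second time
p q.log # prints [:managers_quiet, :poller_terminate, :fire_quiet]
```
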

#real_ms ⇒ Object Originally defined in module Component

#redis ⇒ Object Originally defined in module Component

#run(async_beat: true) ⇒ Object

Boot order matters:

1. freeze! the config so that mutations after fork surface as visible mistakes.
2. Spawn the heartbeat thread BEFORE the managers so the dashboard
   sees the process the moment it can pick up jobs.
3. Start the scheduler poller and the cron poller (both leader-gated for
   what they enqueue; safe to start before leadership is settled since
   a non-leader tick just returns early).
4. Start the managers (which start their processors).
5. Start the health probe server LAST so the listener doesn't
   accept k8s probes until the rest of the launcher is up.


# File 'lib/wurk/launcher.rb', line 81

def run(async_beat: true)
  @started_at = Time.now.to_f
  # Default each capsule's fetcher + materialize its lazy pools/middleware
  # before the config freezes. Every entry point (swarm child, standalone
  # CLI, embedded) runs through here, so none boots with a nil fetcher.
  @config.capsules.each_value(&:prepare!)
  @config.freeze!
  @heartbeat_thread = safe_thread('heartbeat', &method(:start_heartbeat)) if async_beat
  @poller&.start
  @leader&.start
  @cron_poller&.start
  @metrics_rollup&.start
  @queue_rollup&.start
  @history&.start
  @managers.each(&:start)
  @reaper.start
  boot_reclaim
  @health_server&.start
end
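
The ordering, and the use of `&.` for components that may be absent, can be sketched with recording stubs. `StubComponent` and `boot_sketch` are hypothetical, and treating a disabled component as `nil` is an assumption inferred from the safe-navigation calls in `run`.

```ruby
# Hypothetical component that records when it is started.
class StubComponent
  def initialize(name, log)
    @name = name
    @log = log
  end

  def start
    @log << @name
  end
end

# Starts components in the given order; &. skips any that are nil.
def boot_sketch(components)
  components.each { |component| component&.start }
end

log = []
boot_sketch([
  StubComponent.new(:heartbeat, log),     # first: visible to the dashboard
  StubComponent.new(:poller, log),
  nil,                                    # e.g. a component that was not built
  StubComponent.new(:managers, log),
  StubComponent.new(:health_server, log)  # last: probes only pass once all is up
])
p log # prints [:heartbeat, :poller, :managers, :health_server]
```
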

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread rather than rely on the GC to keep it. report_on_exception is disabled so a thread's death is not logged twice.

#stop ⇒ Object

Graceful shutdown. Deadline is monotonic so wall-clock skew can't extend it. Managers stop in parallel threads so a slow capsule doesn't block its siblings.



# File 'lib/wurk/launcher.rb', line 119

def stop
  deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + (@config[:timeout] || 25)
  quiet
  stoppers = @managers.map { |m| Thread.new { m.stop(deadline) } }
  fire_event(:shutdown, reverse: true)
  stoppers.each(&:join)
  # Full shutdown stops periodic firing (it survived #quiet); do this before
  # releasing the lock so no tick races a follower's promotion.
  @cron_poller&.terminate
  @metrics_rollup&.terminate
  @queue_rollup&.terminate
  @history&.terminate
  @reaper&.stop
  # CAS-release the cluster lock now (planned shutdown) so a follower can
  # take over immediately instead of waiting out the TTL.
  @leader&.stop
  stop_heartbeat
  clear_heartbeat
  fire_event(:exit, reverse: true)
end
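
The monotonic deadline and the parallel stop can be sketched on their own. `SlowManagerSketch` and `stop_all_sketch` are hypothetical; the manager simulates draining with `sleep` and never runs past the shared deadline.

```ruby
# Hypothetical manager: "drains" for `work` seconds, capped by the deadline.
class SlowManagerSketch
  attr_reader :stopped_at

  def initialize(work)
    @work = work
  end

  def stop(deadline)
    now = Process.clock_gettime(Process::CLOCK_MONOTONIC)
    remaining = [deadline - now, 0].max
    sleep([@work, remaining].min)
    @stopped_at = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  end
end

# One deadline shared by all managers; each stops in its own thread,
# so the total wait is the slowest manager, not the sum.
def stop_all_sketch(managers, timeout)
  deadline = Process.clock_gettime(Process::CLOCK_MONOTONIC) + timeout
  managers.map { |m| Thread.new { m.stop(deadline) } }.each(&:join)
  deadline
end

managers = [SlowManagerSketch.new(0.05), SlowManagerSketch.new(0.2)]
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
stop_all_sketch(managers, 0.1)
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format('stopped in about %.2fs', elapsed)
```

CLOCK_MONOTONIC is unaffected by wall-clock adjustments, which is why a deadline computed from it cannot be stretched by clock skew.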

#stopping? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/launcher.rb', line 140

def stopping?
  @done
end

#tid ⇒ Object Originally defined in module Component

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.
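
How watchdog and safe_thread fit together can be sketched as below. This is a minimal illustration based only on the descriptions on this page, not Component's source: `ThreadSketch` and its `reported` array (standing in for the error handlers and the log) are hypothetical.

```ruby
# Hypothetical host class for the two helpers.
class ThreadSketch
  attr_reader :reported

  def initialize
    @reported = []
  end

  # Stand-in for Component#handle_exception.
  def handle_exception(ex, ctx = {})
    @reported << [ex.class, ctx]
  end

  # Reports any unhandled exception, then re-raises it.
  def watchdog(last_words)
    yield
  rescue Exception => e # rubocop:disable Lint/RescueException
    handle_exception(e, { context: last_words })
    raise e
  end

  # Spawns a named thread that runs the block under watchdog.
  def safe_thread(name, &block)
    Thread.new do
      # Ruby's own death report is disabled; watchdog already reported it.
      Thread.current.report_on_exception = false
      Thread.current.name = name
      watchdog(name, &block)
    end
  end
end

sketch = ThreadSketch.new
thread = sketch.safe_thread('heartbeat') { raise ArgumentError, 'boom' }
begin
  thread.join # join re-raises the exception that killed the thread
rescue ArgumentError
  # already reported once by watchdog
end
p sketch.reported.size # prints 1
```
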