Class: Wurk::Manager
Overview
One per Capsule. Lives inside each forked child and owns the Processor pool. Replaces dead processors on the fly (replace-on-die), forwards the quiet/stop signals received by the Swarm to its processors, and ensures in-flight UnitsOfWork are bulk_requeued before threads are killed.
Lifecycle:
* `start` — spawn each processor thread.
* `quiet` — stop fetching; in-flight jobs run to completion.
* `stop(deadline)`— quiet + wait for drain; hard_shutdown on timeout.
Spec: docs/target/sidekiq-free.md §13.
Constant Summary collapse
- PAUSE_TIME =
0.1 in TTY mode so interactive shutdown feels snappy; 0.5 in production so the supervisor isn't spinning while threads drain.
$stdout.tty? ? 0.1 : 0.5
Instance Attribute Summary collapse
-
#capsule ⇒ Object
readonly
Returns the value of attribute capsule.
-
#config ⇒ Object
included
from Component
readonly
Returns the value of attribute config.
-
#workers ⇒ Object
readonly
Returns the value of attribute workers.
Instance Method Summary collapse
- #default_tag(dir = Dir.pwd) ⇒ Object included from Component
-
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object
included
from Component
Invokes lifecycle hooks for
event. - #handle_exception(ex, ctx = {}) ⇒ Object included from Component
-
#hard_shutdown ⇒ Object
Reached when the deadline expired with workers still busy.
- #hostname ⇒ Object included from Component
- #identity ⇒ Object included from Component
-
#initialize(capsule) ⇒ Manager
constructor
A new instance of Manager.
-
#leader? ⇒ Boolean
included
from Component
True iff this process currently holds the cluster
dear-leaderlock. -
#logger ⇒ Object
included
from Component
--- delegated to config -------------------------------------------.
- #mono_ms ⇒ Object included from Component
- #process_nonce ⇒ Object included from Component
-
#processor_result(processor, _reason = nil) ⇒ Object
Processor#run callback: invoked when a Processor thread exits, whether cleanly or via raised exception.
- #quiet ⇒ Object
-
#real_ms ⇒ Object
included
from Component
--- clocks ---------------------------------------------------------.
- #redis ⇒ Object included from Component
-
#safe_thread(name, priority: nil, &block) ⇒ Object
included
from Component
Spawns a named thread that runs
blockunderwatchdog(name). - #start ⇒ Object
-
#stop(deadline) ⇒ Object
Graceful shutdown: quiet first, then poll for workers to clear.
- #stopped? ⇒ Boolean
-
#tid ⇒ Object
included
from Component
--- identity -------------------------------------------------------.
-
#watchdog(last_words) ⇒ Object
included
from Component
Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(capsule) ⇒ Manager
Returns a new instance of Manager.
27 28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/wurk/manager.rb', line 27 def initialize(capsule) @config = @capsule = capsule @count = capsule.concurrency raise ArgumentError, "Concurrency of #{@count} is not supported" if @count < 1 @done = false @workers = Set.new @plock = ::Mutex.new @count.times do @workers << Processor.new(@capsule, &method(:processor_result)) end end |
Instance Attribute Details
#capsule ⇒ Object (readonly)
Returns the value of attribute capsule.
25 26 27 |
# File 'lib/wurk/manager.rb', line 25 def capsule @capsule end |
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
#workers ⇒ Object (readonly)
Returns the value of attribute workers.
25 26 27 |
# File 'lib/wurk/manager.rb', line 25 def workers @workers end |
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hard_shutdown ⇒ Object
Reached when the deadline expired with workers still busy. We must push their in-flight UoWs back to the public queues BEFORE raising Wurk::Shutdown into the threads — losing a job is worse than running it twice (Sidekiq's at-least-once contract).
94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 |
# File 'lib/wurk/manager.rb', line 94 def hard_shutdown # rubocop:disable Metrics/AbcSize cleanup = nil @plock.synchronize do cleanup = @workers.dup end if cleanup.any? jobs = cleanup.map(&:job).compact logger.warn { "Terminating #{cleanup.size} busy threads" } logger.debug { "Jobs still in progress #{jobs.inspect}" } capsule.fetcher.bulk_requeue(jobs) end cleanup.each(&:kill) # The caller typically `exit`s immediately after we return; give # threads a brief window to run their `ensure` blocks. deadline = ::Process.clock_gettime(::Process::CLOCK_MONOTONIC) + 3 wait_for(deadline) { @workers.empty? } end |
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
#logger ⇒ Object Originally defined in module Component
--- delegated to config -------------------------------------------
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#processor_result(processor, _reason = nil) ⇒ Object
Processor#run callback: invoked when a Processor thread exits, whether cleanly or via raised exception. Removes the dead processor from the pool and (unless we're already stopping) spawns a replacement so the capsule's concurrency stays constant.
79 80 81 82 83 84 85 86 87 88 |
# File 'lib/wurk/manager.rb', line 79 def processor_result(processor, _reason = nil) @plock.synchronize do @workers.delete(processor) unless @done p = Processor.new(@capsule, &method(:processor_result)) @workers << p p.start end end end |
#quiet ⇒ Object
44 45 46 47 48 49 50 |
# File 'lib/wurk/manager.rb', line 44 def quiet return if @done @done = true logger.info { "Terminating quiet threads for #{capsule.name} capsule" } @workers.each(&:terminate) end |
#real_ms ⇒ Object Originally defined in module Component
--- clocks ---------------------------------------------------------
#redis ⇒ Object Originally defined in module Component
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread; otherwise GC may not, but
report_on_exception is disabled so we don't double-log on death.
#start ⇒ Object
40 41 42 |
# File 'lib/wurk/manager.rb', line 40 def start @workers.each(&:start) end |
#stop(deadline) ⇒ Object
Graceful shutdown: quiet first, then poll for workers to clear. If the deadline elapses with workers still alive we fall through to hard_shutdown, which bulk_requeues their UoWs before killing threads.
55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/wurk/manager.rb', line 55 def stop(deadline) quiet # Lifecycle hooks (e.g. :quiet) can be async; give them a tick to settle # before we start polling. Matches Sidekiq's PAUSE_TIME behavior. sleep PAUSE_TIME return if @workers.empty? logger.info { 'Pausing to allow jobs to finish...' } wait_for(deadline) { @workers.empty? } return if @workers.empty? hard_shutdown ensure capsule.stop end |
#stopped? ⇒ Boolean
71 72 73 |
# File 'lib/wurk/manager.rb', line 71 def stopped? @done end |
#tid ⇒ Object Originally defined in module Component
--- identity -------------------------------------------------------
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.