Class: Wurk::Processor
- Inherits:
-
Object
- Object
- Wurk::Processor
- Includes:
- Component
- Defined in:
- lib/wurk/processor.rb
Overview
Inside each Manager, N Processors run in parallel. Each owns one thread,
pulls a UnitOfWork from the capsule's fetcher, parses the payload, walks
the server middleware chain, invokes perform, then ACKs (removes the
payload from the per-process private list).
Shutdown is two-stage:
* `terminate` flips a flag; the run loop exits between jobs.
* `kill` additionally raises `Wurk::Shutdown` into the thread so an
in-flight perform unwinds. The current UoW is NOT acked, so the
payload survives in the private list and is reclaimed on next boot.
Spec: docs/target/sidekiq-free.md §14.
Defined Under Namespace
Classes: Counter, SharedWorkState
Constant Summary collapse
- PROCESSED =
Counter.new
- FAILURE =
Counter.new
- EXPIRED =
Counter.new
- WORK_STATE =
SharedWorkState.new
Instance Attribute Summary collapse
-
#capsule ⇒ Object
readonly
Returns the value of attribute capsule.
-
#config ⇒ Object
included
from Component
readonly
Returns the value of attribute config.
-
#job ⇒ Object
readonly
Returns the value of attribute job.
-
#thread ⇒ Object
readonly
Returns the value of attribute thread.
Instance Method Summary collapse
- #default_tag(dir = Dir.pwd) ⇒ Object included from Component
-
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object
included
from Component
Invokes lifecycle hooks for
event. -
#handle_exception(ex, ctx = {}) ⇒ Object
Capsule doesn't define handle_exception (it's a Configuration method); override Component's delegation so error handlers fire.
- #hostname ⇒ Object included from Component
- #identity ⇒ Object included from Component
-
#initialize(capsule, &callback) ⇒ Processor
constructor
A new instance of Processor.
-
#kill(wait = false) ⇒ Object
Hard-stop: flips the flag and unwinds the in-flight job by raising Wurk::Shutdown into the worker thread.
-
#leader? ⇒ Boolean
included
from Component
True iff this process currently holds the cluster
dear-leaderlock. -
#logger ⇒ Object
included
from Component
--- delegated to config -------------------------------------------.
- #mono_ms ⇒ Object included from Component
- #process_nonce ⇒ Object included from Component
-
#process_one ⇒ Object
Single iteration: fetch one UoW, process it.
-
#real_ms ⇒ Object
included
from Component
--- clocks ---------------------------------------------------------.
- #redis ⇒ Object included from Component
-
#safe_thread(name, priority: nil, &block) ⇒ Object
included
from Component
Spawns a named thread that runs
blockunderwatchdog(name). - #start ⇒ Object
- #stopping? ⇒ Boolean
-
#terminate(wait = false) ⇒ Object
Sidekiq surface — positional boolean to match the drop-in contract.
-
#tid ⇒ Object
included
from Component
--- identity -------------------------------------------------------.
-
#watchdog(last_words) ⇒ Object
included
from Component
Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(capsule, &callback) ⇒ Processor
Returns a new instance of Processor.
28 29 30 31 32 33 34 35 36 37 38 |
# File 'lib/wurk/processor.rb', line 28 def initialize(capsule, &callback) @capsule = capsule @config = capsule @callback = callback @done = false @job = nil @thread = nil @reloader = capsule.config[:reloader] || proc { |&b| b.call } @job_logger = (capsule.config[:job_logger] || JobLogger).new(capsule.config) @retrier = JobRetry.new(capsule) end |
Instance Attribute Details
#capsule ⇒ Object (readonly)
Returns the value of attribute capsule.
26 27 28 |
# File 'lib/wurk/processor.rb', line 26 def capsule @capsule end |
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
#job ⇒ Object (readonly)
Returns the value of attribute job.
26 27 28 |
# File 'lib/wurk/processor.rb', line 26 def job @job end |
#thread ⇒ Object (readonly)
Returns the value of attribute thread.
26 27 28 |
# File 'lib/wurk/processor.rb', line 26 def thread @thread end |
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
#handle_exception(ex, ctx = {}) ⇒ Object
Capsule doesn't define handle_exception (it's a Configuration method); override Component's delegation so error handlers fire.
70 71 72 |
# File 'lib/wurk/processor.rb', line 70 def handle_exception(ex, ctx = {}) @capsule.config.handle_exception(ex, ctx) end |
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#kill(wait = false) ⇒ Object
Hard-stop: flips the flag and unwinds the in-flight job by raising Wurk::Shutdown into the worker thread. The UoW is intentionally not acked — the payload remains in the private list and is reclaimed on next boot via Reliable#bulk_requeue.
52 53 54 55 56 57 58 |
# File 'lib/wurk/processor.rb', line 52 def kill(wait = false) # rubocop:disable Style/OptionalBooleanParameter @done = true return if @thread.nil? @thread.raise ::Wurk::Shutdown @thread.value if wait end |
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
#logger ⇒ Object Originally defined in module Component
--- delegated to config -------------------------------------------
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#process_one ⇒ Object
Single iteration: fetch one UoW, process it. Public so tests can drive the loop step-by-step without spawning a thread.
76 77 78 79 80 |
# File 'lib/wurk/processor.rb', line 76 def process_one @job = fetch process(@job) if @job @job = nil end |
#real_ms ⇒ Object Originally defined in module Component
--- clocks ---------------------------------------------------------
#redis ⇒ Object Originally defined in module Component
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread; otherwise GC may not, but
report_on_exception is disabled so we don't double-log on death.
#start ⇒ Object
64 65 66 |
# File 'lib/wurk/processor.rb', line 64 def start @thread ||= safe_thread("#{@capsule.name}/processor", &method(:run)) # rubocop:disable Naming/MemoizedInstanceVariableName end |
#stopping? ⇒ Boolean
60 61 62 |
# File 'lib/wurk/processor.rb', line 60 def stopping? @done end |
#terminate(wait = false) ⇒ Object
Sidekiq surface — positional boolean to match the drop-in contract.
41 42 43 44 45 46 |
# File 'lib/wurk/processor.rb', line 41 def terminate(wait = false) # rubocop:disable Style/OptionalBooleanParameter @done = true return if @thread.nil? @thread.value if wait end |
#tid ⇒ Object Originally defined in module Component
--- identity -------------------------------------------------------
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.