Class: Wurk::Processor

Inherits:
Object
  • Object
show all
Includes:
Component
Defined in:
lib/wurk/processor.rb

Overview

Inside each Manager, N Processors run in parallel. Each owns one thread, pulls a UnitOfWork from the capsule's fetcher, parses the payload, walks the server middleware chain, invokes perform, then ACKs (removes the payload from the per-process private list).

Shutdown is two-stage:

* `terminate` flips a flag; the run loop exits between jobs.
* `kill` additionally raises `Wurk::Shutdown` into the thread so an
in-flight perform unwinds. The current UoW is NOT acked, so the
payload survives in the private list and is reclaimed on next boot.

Spec: docs/target/sidekiq-free.md §14.

Defined Under Namespace

Classes: Counter, SharedWorkState

Constant Summary collapse

PROCESSED =
Counter.new
FAILURE =
Counter.new
EXPIRED =
Counter.new
WORK_STATE =
SharedWorkState.new

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(capsule, &callback) ⇒ Processor

Returns a new instance of Processor.



28
29
30
31
32
33
34
35
36
37
38
# File 'lib/wurk/processor.rb', line 28

def initialize(capsule, &callback)
  @capsule = capsule
  @config = capsule
  @callback = callback
  @done = false
  @job = nil
  @thread = nil
  @reloader = capsule.config[:reloader] || proc { |&b| b.call }
  @job_logger = (capsule.config[:job_logger] || JobLogger).new(capsule.config)
  @retrier = JobRetry.new(capsule)
end

Instance Attribute Details

#capsuleObject (readonly)

Returns the value of attribute capsule.



26
27
28
# File 'lib/wurk/processor.rb', line 26

def capsule
  @capsule
end

#configObject (readonly) Originally defined in module Component

Returns the value of attribute config.

#jobObject (readonly)

Returns the value of attribute job.



26
27
28
# File 'lib/wurk/processor.rb', line 26

def job
  @job
end

#threadObject (readonly)

Returns the value of attribute thread.



26
27
28
# File 'lib/wurk/processor.rb', line 26

def thread
  @thread
end

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.

#handle_exception(ex, ctx = {}) ⇒ Object

Capsule doesn't define handle_exception (it's a Configuration method); override Component's delegation so error handlers fire.



70
71
72
# File 'lib/wurk/processor.rb', line 70

def handle_exception(ex, ctx = {})
  @capsule.config.handle_exception(ex, ctx)
end

#hostnameObject Originally defined in module Component

#identityObject Originally defined in module Component

#kill(wait = false) ⇒ Object

Hard-stop: flips the flag and unwinds the in-flight job by raising Wurk::Shutdown into the worker thread. The UoW is intentionally not acked — the payload remains in the private list and is reclaimed on next boot via Reliable#bulk_requeue.



52
53
54
55
56
57
58
# File 'lib/wurk/processor.rb', line 52

def kill(wait = false) # rubocop:disable Style/OptionalBooleanParameter
  @done = true
  return if @thread.nil?

  @thread.raise ::Wurk::Shutdown
  @thread.value if wait
end

#leader?Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)

#loggerObject Originally defined in module Component

--- delegated to config -------------------------------------------

#mono_msObject Originally defined in module Component

#process_nonceObject Originally defined in module Component

#process_oneObject

Single iteration: fetch one UoW, process it. Public so tests can drive the loop step-by-step without spawning a thread.



76
77
78
79
80
# File 'lib/wurk/processor.rb', line 76

def process_one
  @job = fetch
  process(@job) if @job
  @job = nil
end

#real_msObject Originally defined in module Component

--- clocks ---------------------------------------------------------

#redisObject Originally defined in module Component

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread; otherwise GC may not, but report_on_exception is disabled so we don't double-log on death.

#startObject



64
65
66
# File 'lib/wurk/processor.rb', line 64

def start
  @thread ||= safe_thread("#{@capsule.name}/processor", &method(:run)) # rubocop:disable Naming/MemoizedInstanceVariableName
end

#stopping?Boolean

Returns:

  • (Boolean)


60
61
62
# File 'lib/wurk/processor.rb', line 60

def stopping?
  @done
end

#terminate(wait = false) ⇒ Object

Sidekiq surface — positional boolean to match the drop-in contract.



41
42
43
44
45
46
# File 'lib/wurk/processor.rb', line 41

def terminate(wait = false) # rubocop:disable Style/OptionalBooleanParameter
  @done = true
  return if @thread.nil?

  @thread.value if wait
end

#tidObject Originally defined in module Component

--- identity -------------------------------------------------------

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.