Class: Wurk::Fetcher::Reliable
- Inherits:
- Fetcher (Object → Fetcher → Wurk::Fetcher::Reliable)
- Includes:
- Component
- Defined in:
- lib/wurk/fetcher/reliable.rb
Overview
Default fetcher. Each public queue is paired with a per-process
private list (queue:<name>|<host>|<pid>|<idx>); a job is moved
atomically from the public tail to the private head via LMOVE, and
stays there until the Processor explicitly ACKs (LREM). SIGKILL
between fetch and ack leaves the job in the private list, where the
next boot of this process reclaims it via bulk_requeue.
Priority handling: iterate queues_cmd in order with non-blocking
LMOVE, then fall back to a blocking BLMOVE on the first queue so an
empty poll doesn't spin Redis. BLMOVE has no multi-key form, so
blocking on a single queue is the best Redis gives us. The block
timeout defaults to TIMEOUT (2s) and can be overridden through
config.fetch_poll_interval (Pro super_fetch, §3.3).
Spec: docs/target/sidekiq-pro.md §3 (super_fetch, §3.3 poll interval), docs/target/sidekiq-free.md §15 (TIMEOUT=2).
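The fetch/ack lifecycle described in the overview can be modelled without a Redis server. The sketch below is only an illustration: ToyRedis and its method names are hypothetical, plain Ruby arrays stand in for Redis lists, and the host and pid in the private list name are invented values.

```ruby
# In-memory model of the reliable-fetch lifecycle. Arrays stand in for Redis
# lists: index 0 is the head (LEFT), the last element is the tail (RIGHT).
# ToyRedis and its method names are hypothetical, for illustration only.
class ToyRedis
  def initialize
    @lists = Hash.new { |h, k| h[k] = [] }
  end

  # LPUSH: insert at the head, as a fresh enqueue would.
  def lpush(key, value)
    @lists[key].unshift(value)
  end

  # LMOVE src dst RIGHT LEFT: pop the tail of src, push onto the head of dst.
  def lmove_right_left(src, dst)
    value = @lists[src].pop
    @lists[dst].unshift(value) if value
    value
  end

  # LREM key 1 value: remove the first occurrence; returns the removal count.
  def lrem(key, value)
    idx = @lists[key].index(value)
    return 0 unless idx

    @lists[key].delete_at(idx)
    1
  end

  def list(key)
    @lists[key].dup
  end
end

redis = ToyRedis.new
public_q  = 'queue:default'
private_q = 'queue:default|host-a|4242|0' # shape from the overview; values invented

redis.lpush(public_q, 'job-1')
redis.lpush(public_q, 'job-2')

fetched   = redis.lmove_right_left(public_q, private_q) # oldest job first
in_flight = redis.list(private_q)                       # what a crash here would leave behind
acked     = redis.lrem(private_q, fetched)              # Processor finished the job
```

Between the move and the LREM the job exists only in the private list, which is the window the overview refers to when it talks about SIGKILL between fetch and ack.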
Defined Under Namespace
Classes: UnitOfWork
Constant Summary
- TIMEOUT = 2
  Default BLMOVE block timeout in seconds; overridable via config.fetch_poll_interval.
Instance Attribute Summary
- #config ⇒ Object (readonly, included from Component)
  Returns the value of attribute config.
Class Method Summary
- .private_queue_name(public_queue, index = 0) ⇒ Object
  Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher.
Instance Method Summary
- #bulk_requeue(in_progress) ⇒ Object
  Called on shutdown for jobs the Processor couldn't finish in time.
- #default_tag(dir = Dir.pwd) ⇒ Object (included from Component)
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object (included from Component)
  Invokes lifecycle hooks for event.
- #handle_exception(ex, ctx = {}) ⇒ Object (included from Component)
- #hostname ⇒ Object (included from Component)
- #identity ⇒ Object (included from Component)
- #initialize(capsule) ⇒ Reliable (constructor)
  A new instance of Reliable.
- #leader? ⇒ Boolean (included from Component)
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object (included from Component)
  Delegated to config.
- #mono_ms ⇒ Object (included from Component)
- #process_nonce ⇒ Object (included from Component)
- #queues_cmd ⇒ Object
  Prefixed queue keys (queue:<name>) in fetch order.
- #real_ms ⇒ Object (included from Component)
  Clock helper.
- #redis ⇒ Object (included from Component)
- #retrieve_work ⇒ Object
- #safe_thread(name, priority: nil, &block) ⇒ Object (included from Component)
  Spawns a named thread that runs block under watchdog(name).
- #terminate ⇒ Object
- #tid ⇒ Object (included from Component)
  Identity helper.
- #watchdog(last_words) ⇒ Object (included from Component)
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(capsule) ⇒ Reliable
Returns a new instance of Reliable.
    # File 'lib/wurk/fetcher/reliable.rb', line 62
    def initialize(capsule)
      super()
      @config = capsule
      @done = false
    end
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Component
Returns the value of attribute config.
Class Method Details
.private_queue_name(public_queue, index = 0) ⇒ Object
Class-level so UnitOfWork can compute the private list without carrying a back-reference to its parent fetcher. Index defaults to 0 — we run one fetcher per capsule today. Multi-processor topology (one private list per processor slot) is a future Manager concern.
    # File 'lib/wurk/fetcher/reliable.rb', line 57
    def self.private_queue_name(public_queue, index = 0)
      host = ENV['DYNO'] || Socket.gethostname
      "#{public_queue}|#{host}|#{::Process.pid}|#{index}"
    end
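To see the shape of the resulting key, the naming rule can be copied into a standalone method. This is a sketch outside the Wurk namespace, not the library's own entry point, and the queue name passed in is only an example.

```ruby
require 'socket'

# Standalone copy of the naming rule shown above, so the key shape can be
# inspected without loading the library. DYNO wins over the hostname when set.
def private_queue_name(public_queue, index = 0)
  host = ENV['DYNO'] || Socket.gethostname
  "#{public_queue}|#{host}|#{Process.pid}|#{index}"
end

name = private_queue_name('queue:default')
puts name # public queue, host, pid, and index joined by "|"
```

Because the pid is part of the name, two processes on the same host never share a private list.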
Instance Method Details
#bulk_requeue(in_progress) ⇒ Object
Called on shutdown for jobs the Processor couldn't finish in time. One pipelined RPUSH per public queue pushes the jobs back onto the tail, the end that fetch pops from, so on next boot they're picked again ahead of fresh enqueues.
    # File 'lib/wurk/fetcher/reliable.rb', line 84
    def bulk_requeue(in_progress)
      return if in_progress.nil? || in_progress.empty?

      grouped = in_progress.group_by(&:queue)
      config.redis do |conn|
        conn.pipelined do |pipe|
          grouped.each do |public_q, uows|
            pipe.call('RPUSH', public_q, *uows.map(&:job))
          end
        end
      end
    end
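The grouping step can be checked in isolation by recording the commands instead of sending them. In this sketch ToyUnitOfWork and RecordingPipeline are hypothetical stand-ins (the real UnitOfWork and the Redis pipeline object are not shown on this page), and the job payloads are made up.

```ruby
# Hypothetical stand-ins: a Struct for the unit of work and a recorder that
# captures each command instead of talking to Redis.
ToyUnitOfWork = Struct.new(:queue, :job)

class RecordingPipeline
  attr_reader :commands

  def initialize
    @commands = []
  end

  def call(*args)
    @commands << args
  end
end

in_progress = [
  ToyUnitOfWork.new('queue:default', '{"jid":"a"}'),
  ToyUnitOfWork.new('queue:mailers', '{"jid":"b"}'),
  ToyUnitOfWork.new('queue:default', '{"jid":"c"}')
]

pipe = RecordingPipeline.new
# Same grouping as bulk_requeue: one RPUSH per public queue, all jobs as arguments.
in_progress.group_by(&:queue).each do |public_q, uows|
  pipe.call('RPUSH', public_q, *uows.map(&:job))
end

pipe.commands.each { |cmd| puts cmd.join(' ') }
```

Three units of work across two queues produce two RPUSH commands, so the number of round trips grows with the number of queues rather than the number of jobs.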
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
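The dispatch rules above can be sketched in a few lines. HookBus, its on method, and the reported array are hypothetical; only the keyword names mirror fire_event, and clearing the bucket after the loop (rather than in an ensure) is an assumption about ordering.

```ruby
# Minimal sketch of the dispatch rules described above. HookBus and its
# internals are hypothetical; only the keyword names mirror fire_event.
class HookBus
  attr_reader :reported

  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @reported = []
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse # LIFO for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @reported << e # stands in for handle_exception
      raise if reraise
    end
    @hooks[event].clear if oneshot # assumption: cleared only after a full pass
    nil
  end
end

bus = HookBus.new
order = []
bus.on(:shutdown) { order << :first }
bus.on(:shutdown) { raise 'boom' }
bus.on(:shutdown) { order << :third }

bus.fire_event(:shutdown, reverse: true) # runs third, the failing hook, then first
bus.fire_event(:shutdown)                # bucket already cleared, nothing runs
```

The failing hook is recorded and the remaining hook still runs, which is the behaviour the description attributes to the default reraise: false.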
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component
#hostname ⇒ Object Originally defined in module Component
#identity ⇒ Object Originally defined in module Component
#leader? ⇒ Boolean Originally defined in module Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed →
false, so a transient partition can't propagate as an exception into user
code.
Spec: docs/target/sidekiq-ent.md §6.1.
#logger ⇒ Object Originally defined in module Component
Delegated to config.
#mono_ms ⇒ Object Originally defined in module Component
#process_nonce ⇒ Object Originally defined in module Component
#queues_cmd ⇒ Object
Prefixed queue keys (queue:<name>) in fetch order. Strict mode
preserves declaration order. Random/weighted modes shuffle on each call:
@queues is pre-expanded by weight in Capsule#queues=, so a uniform
shuffle yields weighted fairness, and .uniq trims duplicates. Paused
queues are filtered after the shuffle so the membership test runs on
the smallest possible set.
    # File 'lib/wurk/fetcher/reliable.rb', line 103
    def queues_cmd
      names = config.mode == :strict ? config.queues : config.queues.shuffle.uniq
      paused = paused_names
      names = names.reject { |q| paused.include?(q) } unless paused.empty?
      names.map { |q| "#{Keys::QUEUE_PREFIX}#{q}" }
    end
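Why a uniform shuffle of a weight-expanded array gives weighted ordering is easiest to see by counting. The sketch below assumes a queue with weight 3 appears three times in the expanded array; fetch_order is a hypothetical helper, QUEUE_PREFIX is a local stand-in for Keys::QUEUE_PREFIX, and :weighted is an assumed mode name (any non-strict mode takes the shuffle branch).

```ruby
# Sketch of the ordering rule, assuming queues arrive pre-expanded by weight.
# fetch_order is hypothetical; QUEUE_PREFIX stands in for Keys::QUEUE_PREFIX.
QUEUE_PREFIX = 'queue:'

def fetch_order(expanded, mode:, paused: [], random: Random.new)
  names = mode == :strict ? expanded : expanded.shuffle(random: random).uniq
  names = names.reject { |q| paused.include?(q) } unless paused.empty?
  names.map { |q| "#{QUEUE_PREFIX}#{q}" }
end

# Weights 3 / 2 / 1, already expanded.
expanded = %w[critical critical critical default default low]

strict = fetch_order(%w[critical default low], mode: :strict)

# Count how often each queue lands first across many shuffles. The first
# element after .uniq is the first element of the shuffle, so its chance of
# being a given queue is proportional to that queue's weight.
rng = Random.new(42)
firsts = Hash.new(0)
6_000.times do
  firsts[fetch_order(expanded, mode: :weighted, random: rng).first] += 1
end

without_low = fetch_order(expanded, mode: :weighted, paused: ['low'], random: rng)
```

Every queue still appears exactly once in each result, so a low-weight queue is never starved within a single poll; the weight only shifts how often it is tried first.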
#real_ms ⇒ Object Originally defined in module Component
Clock helper.
#redis ⇒ Object Originally defined in module Component
#retrieve_work ⇒ Object
    # File 'lib/wurk/fetcher/reliable.rb', line 68
    def retrieve_work
      return nil if @done

      queues = queues_cmd
      return nil if queues.empty?

      queues.each do |public_q|
        uow = lmove(public_q)
        return uow if uow
      end
      blmove(queues.first)
    end
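The lmove and blmove helpers are not shown on this page. As a rough sketch of the two-phase poll, the code below issues the standard Redis LMOVE and BLMOVE command forms against a fake connection. FakeConn, poll, and private_for are hypothetical names, and the fake BLMOVE returns immediately instead of blocking for the timeout.

```ruby
# Hypothetical sketch of the two-phase poll. FakeConn mimics a client exposing
# call(*command) and logs each command name; it never actually blocks.
class FakeConn
  attr_reader :log

  def initialize(lists)
    @lists = lists
    @log = []
  end

  def call(cmd, src, dst, *_rest)
    @log << cmd
    job = @lists.fetch(src, []).pop           # RIGHT: take from the tail
    (@lists[dst] ||= []).unshift(job) if job  # LEFT: push onto the head
    job
  end
end

def poll(conn, queues, private_for, timeout: 2)
  # Phase 1: non-blocking pass over every queue, in priority order.
  queues.each do |public_q|
    job = conn.call('LMOVE', public_q, private_for.call(public_q), 'RIGHT', 'LEFT')
    return [public_q, job] if job
  end
  # Phase 2: everything was empty, so wait on the first queue only.
  first = queues.first
  job = conn.call('BLMOVE', first, private_for.call(first), 'RIGHT', 'LEFT', timeout)
  job && [first, job]
end

private_for = ->(q) { "#{q}|host-a|4242|0" } # invented host and pid
conn = FakeConn.new({ 'queue:default' => ['job-9'] })
queues = %w[queue:critical queue:default]

hit  = poll(conn, queues, private_for) # found on the second queue, no BLMOVE issued
miss = poll(conn, queues, private_for) # both empty, falls through to BLMOVE
```

A job that arrives on a lower-priority queue while the fetcher is blocked on the first queue is only seen on the next pass, which is the cost of BLMOVE having no multi-key form.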
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread; nothing else is guaranteed to
keep a reference to it. report_on_exception is disabled on the thread
so a death is not logged twice.
#terminate ⇒ Object
    # File 'lib/wurk/fetcher/reliable.rb', line 110
    def terminate
      @done = true
    end
#tid ⇒ Object Originally defined in module Component
Identity helper.
#watchdog(last_words) ⇒ Object Originally defined in module Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.
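The report-then-re-raise behaviour can be sketched as follows. ToyComponent is a hypothetical stand-in for a class including Component; the context key name and the choice to rescue Exception rather than StandardError are assumptions, not taken from the source.

```ruby
# Hypothetical stand-in for Component#watchdog: report, then re-raise.
class ToyComponent
  attr_reader :reports

  def initialize
    @reports = []
  end

  # Stand-in for the real handler, which would route to error_handlers / the log.
  def handle_exception(ex, ctx = {})
    @reports << [ex.message, ctx]
  end

  def watchdog(last_words)
    yield
  rescue Exception => e # assumption: catch everything at the thread boundary
    handle_exception(e, { context: last_words }) # key name is an assumption
    raise
  end
end

component = ToyComponent.new
outcome =
  begin
    component.watchdog('fetcher') { raise ArgumentError, 'bad payload' }
  rescue ArgumentError => e
    "re-raised: #{e.message}"
  end
```

The caller still sees the original exception, so the wrapper adds reporting without changing control flow.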