Module: Wurk::Worker::ClassMethods
Defined in: lib/wurk/worker.rb
Overview
Class-level DSL mixed into every job class by Wurk::Worker. These are the public enqueue and configuration entry points.
Instance Method Summary
- #build_client(pool = get_sidekiq_options['pool'], client_class: nil) ⇒ Object
  client_class swaps the enqueue client (e.g. TransactionAwareClient via Wurk.transactional_push!).
- #clear ⇒ Object
- #client_push(item) ⇒ Object
- #delay ⇒ Object (also: #delay_for, #delay_until)
- #drain ⇒ Object
  Run and remove every fake job for this class, including ones it enqueues mid-drain.
- #execute_job(worker, args) ⇒ Object
- #get_sidekiq_options ⇒ Object
  Sidekiq's public API name, kept unchanged for wire compatibility.
- #inherited(subclass) ⇒ Object
- #jobs ⇒ Object
  Fake jobs enqueued for this class, across every queue.
- #perform_async ⇒ String?
  Enqueue the job to run as soon as a worker is free.
- #perform_bulk(items) ⇒ Array<String>
  Enqueue many jobs in one round-trip via the Lua bulk path.
- #perform_in(interval) ⇒ String? (also: #perform_at)
  Schedule the job for later.
- #perform_inline ⇒ Object (also: #perform_sync)
  Run #perform synchronously in the current thread (no Redis).
- #perform_one ⇒ Object
  Run and remove the first fake job for this class; raises EmptyQueueError if there is none.
- #process_job(job_hash) ⇒ Object
  Execute a normalized job hash through the inline server-middleware chain (empty by default; see Wurk::Testing.server_middleware).
- #queue ⇒ Object
  Sidekiq::Testing class-level helper (spec §24.3). Only meaningful in :fake / :inline mode; the in-memory store is empty otherwise.
- #queue_as(queue) ⇒ Object
- #set(opts) ⇒ Wurk::Worker::Setter
  Return a per-call option carrier so a single enqueue can override class-level options (queue, scheduling, pool, …).
- #sidekiq_options(opts = {}) ⇒ Hash
  Set per-class job options (merged over any inherited options).
- #sidekiq_options_hash ⇒ Object
- #sidekiq_retries_exhausted(&block) ⇒ Object
- #sidekiq_retry_in(&block) ⇒ Object
Instance Method Details
#build_client(pool = get_sidekiq_options['pool'], client_class: nil) ⇒ Object
client_class swaps the enqueue client (e.g. TransactionAwareClient via
Wurk.transactional_push!). Resolution order: per-call set(client_class:),
then the class option, then the live process default, then Wurk::Client.
The default_job_options fallback keeps a global transactional_push!
order-independent: a class whose options memoized before the opt-in (its
inherited copy is a stale dup) still routes through the new client.
    # File 'lib/wurk/worker.rb', line 194
    def build_client(pool = get_sidekiq_options['pool'], client_class: nil)
      klass = client_class || get_sidekiq_options['client_class'] ||
              Wurk.default_job_options['client_class'] || Wurk::Client
      klass.new(pool: pool)
    end
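The resolution order described above can be illustrated without Wurk itself. The following is a minimal sketch, not Wurk's implementation: DefaultClient, TransactionalClient, and resolve_client_class are hypothetical names, and plain hashes stand in for the class options and the process-wide defaults.

```ruby
# Hypothetical stand-ins for Wurk::Client and a transaction-aware client.
DefaultClient = Struct.new(:pool, keyword_init: true)
TransactionalClient = Struct.new(:pool, keyword_init: true)

# Mirrors the documented order: per-call override, then the class option,
# then the live process default, then the built-in client.
def resolve_client_class(per_call, class_options, process_defaults)
  per_call ||
    class_options['client_class'] ||
    process_defaults['client_class'] ||
    DefaultClient
end

process_defaults = {}
stale_class_options = { 'queue' => 'default' } # copied before any opt-in

before = resolve_client_class(nil, stale_class_options, process_defaults)

# A later global opt-in only touches the process defaults (assumed here
# to be what a transactional opt-in would do).
process_defaults['client_class'] = TransactionalClient
after = resolve_client_class(nil, stale_class_options, process_defaults)

# A per-call override still wins over everything else.
forced = resolve_client_class(DefaultClient, stale_class_options, process_defaults)

puts before, after, forced
```

Because the process default is read at call time rather than copied into the class options, the stale copy in this sketch still picks up the opt-in, which is the order-independence the documentation describes.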
#clear ⇒ Object
    # File 'lib/wurk/worker.rb', line 213
    def clear
      ::Wurk::Queues.clear_class(to_s)
    end
#client_push(item) ⇒ Object
    # File 'lib/wurk/worker.rb', line 178
    def client_push(item)
      raise ArgumentError, "Job arguments to #{name || self} must have string keys" if symbol_keyed?(item)

      # `pool` is a transient enqueue-time attribute: a per-call `set(pool:)`
      # overrides the class-level option, then it's deleted so it never reaches
      # the wire (normalize_item strips any class-level pool re-merged below).
      pool = item.delete('pool') || get_sidekiq_options['pool']
      build_client(pool, client_class: item.delete('client_class')).push(item)
    end
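The handling of transient keys in the listing above can be sketched in isolation. This is an illustration under assumptions, not Wurk's code: split_transient_options is a hypothetical helper, the string-key check here only inspects top-level keys (what symbol_keyed? actually inspects is not shown in this page), and the sketch copies the item instead of mutating it.

```ruby
# Hypothetical helper: separate enqueue-time keys from the payload so
# they never end up in the serialized job.
def split_transient_options(item, class_options)
  # Assumption: reject any top-level Symbol key.
  raise ArgumentError, 'job item must have string keys' if item.keys.any?(Symbol)

  payload = item.dup
  # A per-call pool wins; otherwise fall back to the class-level option.
  pool = payload.delete('pool') || class_options['pool']
  client_class = payload.delete('client_class')
  [payload, pool, client_class]
end

class_options = { 'pool' => :class_pool }

payload, pool, = split_transient_options(
  { 'class' => 'HardJob', 'args' => [1], 'pool' => :call_pool },
  class_options
)

p payload.keys
p pool
```

After the split, the payload carries only the keys meant for the wire, while the pool and client class are available for choosing how to push it.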
#delay ⇒ Object
Also known as: delay_for, delay_until
    # File 'lib/wurk/worker.rb', line 254
    def delay(*)
      raise ArgumentError, "#{name || self}.delay is removed in Sidekiq 7+. Use #{name || 'klass'}.perform_async."
    end
#drain ⇒ Object
Run & remove every fake job for this class — including ones it enqueues mid-drain. Returns the count processed.
    # File 'lib/wurk/worker.rb', line 219
    def drain
      count = 0
      while (job = ::Wurk::Queues.shift_class(to_s))
        process_job(job)
        count += 1
      end
      count
    end
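The reason drain also runs jobs enqueued mid-drain is the shape of its loop: it shifts one job at a time until the store is empty. A minimal sketch of that behavior, using a hypothetical in-memory store (FAKE_JOBS) and a hypothetical ChainJob class in place of Wurk::Queues and a real worker:

```ruby
# Hypothetical in-memory store standing in for Wurk::Queues.
FAKE_JOBS = Hash.new { |hash, klass| hash[klass] = [] }

class ChainJob
  def self.perform_async(*args)
    FAKE_JOBS[to_s] << { 'args' => args }
  end

  # Same loop shape as drain: shift until the store is empty, so jobs
  # pushed while draining are picked up by a later iteration.
  def self.drain
    count = 0
    while (job = FAKE_JOBS[to_s].shift)
      new.perform(*job['args'])
      count += 1
    end
    count
  end

  # A job with depth > 0 enqueues one follow-up job.
  def perform(depth)
    self.class.perform_async(depth - 1) if depth.positive?
  end
end

ChainJob.perform_async(2)
puts ChainJob.drain # prints 3: the original job plus two follow-ups
```

Only one job was enqueued up front, yet the count is 3, because each follow-up lands in the store before the loop checks it again.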
#execute_job(worker, args) ⇒ Object
    # File 'lib/wurk/worker.rb', line 250
    def execute_job(worker, args)
      worker.perform(*args)
    end
#get_sidekiq_options ⇒ Object
Sidekiq's public API name — wire-compat sacred. Must stay get_sidekiq_options.
    # File 'lib/wurk/worker.rb', line 103
    def get_sidekiq_options # rubocop:disable Naming/AccessorMethodName
      @sidekiq_options_hash ||= # rubocop:disable Naming/MemoizedInstanceVariableName
    end
#inherited(subclass) ⇒ Object
    # File 'lib/wurk/worker.rb', line 260
    def inherited(subclass)
      super
      subclass.instance_variable_set(:@sidekiq_options_hash, .dup)
      inherit_block(subclass, :@sidekiq_retry_in_block)
      inherit_block(subclass, :@sidekiq_retries_exhausted_block)
    end
#jobs ⇒ Object
Fake jobs enqueued for this class, across every queue.
    # File 'lib/wurk/worker.rb', line 209
    def jobs
      ::Wurk::Queues.jobs_by_class[to_s] || []
    end
#perform_async ⇒ String?
Enqueue the job to run as soon as a worker is free. Arguments are
forwarded to #perform and must be JSON-serializable
(string/number/bool/nil/array/hash).
    # File 'lib/wurk/worker.rb', line 131
    def perform_async(*)
      Wurk::Worker::Setter.new(self, {}).perform_async(*)
    end
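One way to see what "JSON-serializable" means for job arguments is to check whether they survive a JSON round trip unchanged. The helper below (json_safe?) is hypothetical and only illustrates the constraint; this page does not show how, or whether, Wurk validates arguments.

```ruby
require 'json'

# Hypothetical helper: true when the arguments come back identical
# after being serialized to JSON and parsed again.
def json_safe?(args)
  JSON.parse(JSON.generate(args)) == args
end

puts json_safe?([1, 'a', nil, true, { 'k' => [1.5] }]) # plain JSON types
puts json_safe?([:sym])                                # symbol becomes a string
puts json_safe?([{ key: 1 }])                          # symbol key becomes a string
```

Symbols and symbol-keyed hashes fail the check because JSON has no symbol type, so the worker would receive strings instead of what was passed in.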
#perform_bulk(items) ⇒ Array<String>
Enqueue many jobs in one round-trip via the Lua bulk path.
    # File 'lib/wurk/worker.rb', line 163
    def perform_bulk(items, **)
      Wurk::Worker::Setter.new(self, {}).perform_bulk(items, **)
    end
#perform_in(interval) ⇒ String?
Also known as: perform_at
Schedule the job for later. perform_at is an alias taking an absolute
time; perform_in takes a relative interval.
    # File 'lib/wurk/worker.rb', line 152
    def perform_in(interval, *)
      Wurk::Worker::Setter.new(self, {}).perform_in(interval, *)
    end
#perform_inline ⇒ Object
Also known as: perform_sync
Run #perform synchronously in the current thread (no Redis). Useful in
tests and for inline execution.
    # File 'lib/wurk/worker.rb', line 139
    def perform_inline(*)
      new.perform(*)
    end
#perform_one ⇒ Object
Run & remove the first fake job for this class; EmptyQueueError if none.
    # File 'lib/wurk/worker.rb', line 229
    def perform_one
      job = ::Wurk::Queues.shift_class(to_s)
      raise ::Wurk::Testing::EmptyQueueError, "no #{self} jobs were found" if job.nil?

      process_job(job)
    end
#process_job(job_hash) ⇒ Object
Execute a normalized job hash through the inline server-middleware chain
(empty by default — see Wurk::Testing.server_middleware).
Returns the value of the server-middleware invoke (i.e. the worker's
perform return), matching Sidekiq::Testing — so perform_one yields
the job result.
    # File 'lib/wurk/worker.rb', line 241
    def process_job(job_hash)
      instance = new
      instance.jid = job_hash['jid']
      instance.bid = job_hash['bid'] if instance.respond_to?(:bid=)
      ::Wurk::Testing.server_middleware.invoke(instance, job_hash, job_hash['queue'] || queue) do
        execute_job(instance, job_hash['args'])
      end
    end
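The point that invoke returns the worker's perform value can be shown with a small stand-in chain. Everything here is hypothetical (TinyChain, TraceMiddleware, AddJob); it assumes Sidekiq-style middleware that responds to call(worker, job, queue) and yields to continue, and it is not the chain Wurk ships.

```ruby
# Hypothetical minimal chain.
class TinyChain
  def initialize
    @entries = []
  end

  def add(middleware)
    @entries << middleware
    self
  end

  # Wraps the block in every middleware, outermost first, and returns
  # whatever the innermost block returns (provided each middleware
  # passes the value of its yield along).
  def invoke(worker, job, queue, &block)
    runner = @entries.reverse.inject(block) do |inner, middleware|
      -> { middleware.call(worker, job, queue, &inner) }
    end
    runner.call
  end
end

class TraceMiddleware
  def initialize(log)
    @log = log
  end

  def call(_worker, job, queue)
    @log << "start #{job['jid']} on #{queue}"
    result = yield
    @log << "done #{job['jid']}"
    result
  end
end

class AddJob
  attr_accessor :jid

  def perform(left, right)
    left + right
  end
end

log = []
chain = TinyChain.new.add(TraceMiddleware.new(log))
job = { 'jid' => 'abc123', 'queue' => 'default', 'args' => [2, 3] }

worker = AddJob.new
worker.jid = job['jid']
result = chain.invoke(worker, job, job['queue']) { worker.perform(*job['args']) }

puts result # prints 5
puts log
```

With an empty chain, invoke simply calls the block, which matches the "empty by default" behavior described above.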
#queue ⇒ Object
Sidekiq::Testing class-level helper (spec §24.3). Only meaningful in :fake / :inline mode; the in-memory store is empty otherwise.
    # File 'lib/wurk/worker.rb', line 204
    def queue
      get_sidekiq_options['queue']
    end
#queue_as(queue) ⇒ Object
    # File 'lib/wurk/worker.rb', line 111
    def queue_as(queue)
      sidekiq_options('queue' => queue.to_s)
    end
#set(opts) ⇒ Wurk::Worker::Setter
Return a per-call option carrier so a single enqueue can override class-level options (queue, scheduling, pool, …).
    # File 'lib/wurk/worker.rb', line 174
    def set(opts)
      Wurk::Worker::Setter.new(self, opts)
    end
#sidekiq_options(opts = {}) ⇒ Hash
Set per-class job options (merged over any inherited options).
    # File 'lib/wurk/worker.rb', line 97
    def sidekiq_options(opts = {})
      merged = get_sidekiq_options.merge(opts.transform_keys(&:to_s))
      @sidekiq_options_hash = merged
    end
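The "merged over any inherited options" behavior can be sketched with a small stand-in DSL. TinyOptions, job_options, and set_job_options are hypothetical names chosen so the sketch runs without Wurk; the sketch assumes options start out empty and that subclasses receive a copy at inheritance time.

```ruby
# Hypothetical stand-in for the option DSL: string-keyed options,
# merged over what the class already holds, copied to subclasses.
module TinyOptions
  def job_options
    @job_options ||= {}
  end

  def set_job_options(opts = {})
    # Keys are normalized to strings; values are left as given.
    @job_options = job_options.merge(opts.transform_keys(&:to_s))
  end

  def inherited(subclass)
    super
    # The subclass gets its own copy, taken at inheritance time.
    subclass.instance_variable_set(:@job_options, job_options.dup)
  end
end

class BaseJob
  extend TinyOptions
  set_job_options queue: 'low', retry: 5
end

class UrgentJob < BaseJob
  set_job_options queue: 'critical'
end

p BaseJob.job_options
p UrgentJob.job_options
```

In this sketch UrgentJob keeps the inherited retry value and overrides only the queue, while BaseJob is unaffected. Because the copy is taken once, a later change to BaseJob would not reach UrgentJob.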
#sidekiq_options_hash ⇒ Object
    # File 'lib/wurk/worker.rb', line 107
    def sidekiq_options_hash
    end
#sidekiq_retries_exhausted(&block) ⇒ Object
    # File 'lib/wurk/worker.rb', line 119
    def sidekiq_retries_exhausted(&block)
      self.sidekiq_retries_exhausted_block = block
    end
#sidekiq_retry_in(&block) ⇒ Object
    # File 'lib/wurk/worker.rb', line 115
    def sidekiq_retry_in(&block)
      self.sidekiq_retry_in_block = block
    end