Module: Wurk::Worker::ClassMethods

Defined in:
lib/wurk/worker.rb

Overview

Class-level DSL mixed into every job class by Wurk::Worker. These are the public enqueue and configuration entry points.

Instance Method Details

#build_client(pool = get_sidekiq_options['pool'], client_class: nil) ⇒ Object

client_class swaps the enqueue client (e.g. TransactionAwareClient via Wurk.transactional_push!). Resolution order: the per-call set(client_class:), then the class option, then the live process default (Wurk.default_job_options), then Wurk::Client. The default_job_options fallback keeps a global transactional_push! order-independent: a class whose options were memoized before the opt-in (its inherited copy is a stale dup) still routes through the new client.



# File 'lib/wurk/worker.rb', line 194

def build_client(pool = get_sidekiq_options['pool'], client_class: nil)
  klass = client_class || get_sidekiq_options['client_class'] ||
          Wurk.default_job_options['client_class'] || Wurk::Client
  klass.new(pool: pool)
end
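
The resolution order described above can be sketched in isolation. This is a minimal illustration, not the library's code: DefaultClient, TransactionalClient, PerCallClient and resolve_client_class are hypothetical stand-ins, and only the lookup chain mirrors build_client.

```ruby
# Hypothetical stand-ins; only the lookup order mirrors build_client.
DefaultClient = Class.new
TransactionalClient = Class.new
PerCallClient = Class.new

# Resolve the client class: per-call override, then the class option,
# then the process-wide default, then the built-in fallback.
def resolve_client_class(per_call, class_options, process_defaults)
  per_call ||
    class_options['client_class'] ||
    process_defaults['client_class'] ||
    DefaultClient
end

# Options memoized before the opt-in carry no client_class of their own...
stale_class_options = { 'queue' => 'default' }
# ...but the live process default is still consulted at enqueue time.
process_defaults = { 'client_class' => TransactionalClient }

resolve_client_class(nil, stale_class_options, process_defaults)           # TransactionalClient
resolve_client_class(PerCallClient, stale_class_options, process_defaults) # PerCallClient
resolve_client_class(nil, stale_class_options, {})                         # DefaultClient
```

Because the process default is read on every call rather than copied into the class, the order in which classes load and the opt-in runs does not matter.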

#clear ⇒ Object



# File 'lib/wurk/worker.rb', line 213

def clear
  ::Wurk::Queues.clear_class(to_s)
end

#client_push(item) ⇒ Object

Raises:

  • (ArgumentError)


# File 'lib/wurk/worker.rb', line 178

def client_push(item)
  raise ArgumentError, "Job arguments to #{name || self} must have string keys" if symbol_keyed?(item)

  # `pool` is a transient enqueue-time attribute: a per-call `set(pool:)`
  # overrides the class-level option, then it's deleted so it never reaches
  # the wire (normalize_item strips any class-level pool re-merged below).
  pool = item.delete('pool') || get_sidekiq_options['pool']
  build_client(pool, client_class: item.delete('client_class')).push(item)
end
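
The two steps in client_push (reject symbol keys, then strip transient attributes) can be sketched as follows. The source does not show symbol_keyed?, so symbol_keyed_args? below is a guess at one plausible check, and split_transient is a hypothetical helper that only mirrors the delete-then-fallback pattern.

```ruby
# Hypothetical check (the real symbol_keyed? is not shown in this page):
# true if any Hash nested in the value has a Symbol key.
def symbol_keyed_args?(value)
  case value
  when Hash
    value.any? { |key, inner| key.is_a?(Symbol) || symbol_keyed_args?(inner) }
  when Array
    value.any? { |inner| symbol_keyed_args?(inner) }
  else
    false
  end
end

# Pull the transient enqueue-time attributes out of a copy of the item so
# only the payload is left to serialize. A per-call pool wins over the
# class-level one.
def split_transient(item, class_pool)
  payload = item.dup
  pool = payload.delete('pool') || class_pool
  client_class = payload.delete('client_class')
  [payload, pool, client_class]
end

symbol_keyed_args?([{ id: 1 }])      # true: would raise ArgumentError upstream
symbol_keyed_args?([{ 'id' => 1 }])  # false
split_transient({ 'class' => 'EmailJob', 'args' => [1], 'pool' => 'bulk' }, 'default')
```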

#delay ⇒ Object Also known as: delay_for, delay_until

Raises:

  • (ArgumentError)


# File 'lib/wurk/worker.rb', line 254

def delay(*)
  raise ArgumentError, "#{name || self}.delay is removed in Sidekiq 7+. Use #{name || 'klass'}.perform_async."
end

#drain ⇒ Object

Run and remove every fake job for this class, including ones it enqueues mid-drain. Returns the count processed.



# File 'lib/wurk/worker.rb', line 219

def drain
  count = 0
  while (job = ::Wurk::Queues.shift_class(to_s))
    process_job(job)
    count += 1
  end
  count
end
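
Why the shift-in-a-loop shape picks up jobs enqueued mid-drain can be seen with a plain array. FAKE_JOBS and drain_fake_jobs are hypothetical stand-ins for the in-memory store and for drain; jobs are modeled as lambdas purely for illustration.

```ruby
# Hypothetical in-memory store standing in for the fake-mode job queue.
FAKE_JOBS = []

# Shift one job at a time so that jobs pushed while a job runs are seen
# by the same loop. Returns how many jobs ran.
def drain_fake_jobs
  count = 0
  while (job = FAKE_JOBS.shift)
    job.call
    count += 1
  end
  count
end

# The parent job enqueues a child while it is running.
FAKE_JOBS << -> { FAKE_JOBS << -> { :child_done } }
drain_fake_jobs # runs the parent, then the child it enqueued
```

Iterating over a snapshot of the queue instead (for example with each) would miss the child job, which is why the loop re-reads the store on every pass.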

#execute_job(worker, args) ⇒ Object



# File 'lib/wurk/worker.rb', line 250

def execute_job(worker, args)
  worker.perform(*args)
end

#get_sidekiq_options ⇒ Object

Sidekiq's public API name, kept for compatibility with Sidekiq. Must stay get_sidekiq_options.



# File 'lib/wurk/worker.rb', line 103

def get_sidekiq_options # rubocop:disable Naming/AccessorMethodName
  @sidekiq_options_hash ||= inherited_sidekiq_options # rubocop:disable Naming/MemoizedInstanceVariableName
end

#inherited(subclass) ⇒ Object



# File 'lib/wurk/worker.rb', line 260

def inherited(subclass)
  super
  subclass.instance_variable_set(:@sidekiq_options_hash, get_sidekiq_options.dup)
  inherit_block(subclass, :@sidekiq_retry_in_block)
  inherit_block(subclass, :@sidekiq_retries_exhausted_block)
end
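
The effect of copying options in the inherited hook can be sketched with a stripped-down base class. BaseJob, MailerJob, options and configure are hypothetical names; only the dup-on-inherit idea is taken from the method above.

```ruby
# Hypothetical minimal base class; only the dup-on-inherit idea mirrors
# the inherited hook documented here.
class BaseJob
  def self.options
    @options ||= { 'queue' => 'default', 'retry' => true }
  end

  def self.configure(opts)
    @options = options.merge(opts.transform_keys(&:to_s))
  end

  # Give each subclass its own copy of the parent's options, taken at the
  # moment the subclass is defined.
  def self.inherited(subclass)
    super
    subclass.instance_variable_set(:@options, options.dup)
  end
end

class MailerJob < BaseJob
  configure queue: 'mailers'
end

MailerJob.options['queue'] # 'mailers'; BaseJob keeps 'default'
BaseJob.configure(retry: 5)
MailerJob.options['retry'] # still true: the subclass holds a snapshot
```

The last two lines show the snapshot behavior: a parent change made after the subclass exists does not reach the subclass copy.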

#jobs ⇒ Object

Fake jobs enqueued for this class, across every queue.



# File 'lib/wurk/worker.rb', line 209

def jobs
  ::Wurk::Queues.jobs_by_class[to_s] || []
end

#perform_async ⇒ String?

Enqueue the job to run as soon as a worker is free. Arguments are forwarded to #perform and must be JSON-serializable (string/number/bool/nil/array/hash).

Examples:

EmailJob.perform_async(user.id, "welcome")

Returns:

  • (String, nil)

    the job id (jid), or nil if a client middleware halted the push



# File 'lib/wurk/worker.rb', line 131

def perform_async(*)
  Wurk::Worker::Setter.new(self, {}).perform_async(*)
end
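
One way to check the JSON-serializable requirement before enqueuing is a round trip through the standard json library. json_safe? is a hypothetical helper for illustration; whether Wurk validates arguments this way is an assumption, not something this page states.

```ruby
require 'json'

# True when the arguments come back unchanged from a JSON round trip,
# i.e. nothing would be lost or coerced on the way to #perform.
def json_safe?(args)
  JSON.parse(JSON.generate(args)) == args
end

json_safe?([42, 'welcome', nil, true, { 'locale' => 'en' }]) # true
json_safe?([:welcome])                                       # false: Symbol comes back as String
json_safe?([{ locale: 'en' }])                               # false: Symbol keys come back as Strings
```

Passing record ids rather than objects, as in the EmailJob example above, keeps arguments inside the safe set.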

#perform_bulk(items) ⇒ Array<String>

Enqueue many jobs in one round-trip via the Lua bulk path.

Examples:

ImportJob.perform_bulk([[1], [2], [3]])

Parameters:

  • items (Array<Array>)

    one args array per job

Returns:

  • (Array<String>)

    the job ids, in order



# File 'lib/wurk/worker.rb', line 163

def perform_bulk(items, **)
  Wurk::Worker::Setter.new(self, {}).perform_bulk(items, **)
end

#perform_in(interval) ⇒ String? Also known as: perform_at

Schedule the job for later. perform_at is an alias taking an absolute time; perform_in takes a relative interval.

Examples:

ReminderJob.perform_in(1.hour, lead.id)
ReminderJob.perform_at(Time.now + 3600, lead.id)

Parameters:

  • interval (Numeric, Time)

    seconds-from-now, or an absolute Time

Returns:

  • (String, nil)

    the job id (jid)



# File 'lib/wurk/worker.rb', line 152

def perform_in(interval, *)
  Wurk::Worker::Setter.new(self, {}).perform_in(interval, *)
end
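
Since one parameter accepts both a relative interval and an absolute time, the two have to be told apart somewhere. The sketch below shows one common heuristic; the cutoff constant and resolve_run_at are assumptions for illustration, and the page does not say how Wurk::Worker::Setter actually resolves the value.

```ruby
# Assumed cutoff: values below this are treated as "seconds from now",
# values at or above it as an absolute epoch timestamp.
EPOCH_CUTOFF = 1_000_000_000

# Turn a relative interval (Numeric) or an absolute time (Time or epoch
# seconds) into an epoch-seconds Float.
def resolve_run_at(interval, now: Time.now)
  value = interval.to_f
  value < EPOCH_CUTOFF ? now.to_f + value : value
end

resolve_run_at(3600)               # one hour from now
resolve_run_at(Time.now + 3600)    # the same instant, given as an absolute time
```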

#perform_inline ⇒ Object Also known as: perform_sync

Run #perform synchronously in the current thread (no Redis). Useful in tests and for inline execution.

Returns:

  • (Object)

    the return value of #perform



# File 'lib/wurk/worker.rb', line 139

def perform_inline(*)
  new.perform(*)
end

#perform_one ⇒ Object

Run and remove the first fake job for this class; raises EmptyQueueError if there is none.



# File 'lib/wurk/worker.rb', line 229

def perform_one
  job = ::Wurk::Queues.shift_class(to_s)
  raise ::Wurk::Testing::EmptyQueueError, "no #{self} jobs were found" if job.nil?

  process_job(job)
end

#process_job(job_hash) ⇒ Object

Execute a normalized job hash through the inline server-middleware chain (empty by default — see Wurk::Testing.server_middleware). Returns the value of the server-middleware invoke (i.e. the worker's perform return), matching Sidekiq::Testing — so perform_one yields the job result.



# File 'lib/wurk/worker.rb', line 241

def process_job(job_hash)
  instance = new
  instance.jid = job_hash['jid']
  instance.bid = job_hash['bid'] if instance.respond_to?(:bid=)
  ::Wurk::Testing.server_middleware.invoke(instance, job_hash, job_hash['queue'] || queue) do
    execute_job(instance, job_hash['args'])
  end
end
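
How an invoke call can wrap the job and still hand back its return value is easiest to see with a tiny chain. TinyChain and Tagger are hypothetical; the real Wurk::Testing.server_middleware may be built differently, and the call(worker, job, queue) shape with a yield is assumed from the Sidekiq middleware convention.

```ruby
# Hypothetical middleware chain: each entry responds to
# call(worker, job, queue) and yields to continue down the chain.
class TinyChain
  def initialize(entries = [])
    @entries = entries
  end

  # Wrap the block in every entry, outermost first, and run the result.
  # The block's value is returned as long as each entry returns what its
  # yield returned.
  def invoke(worker, job, queue, &block)
    runner = @entries.reverse.reduce(block) do |inner, entry|
      -> { entry.call(worker, job, queue, &inner) }
    end
    runner.call
  end
end

# Example entry: marks the job hash, then continues the chain.
class Tagger
  def call(_worker, job, _queue)
    job['seen'] = true
    yield
  end
end

job = { 'jid' => 'abc123', 'args' => [1, 2] }
TinyChain.new([Tagger.new]).invoke(nil, job, 'default') { job['args'].sum }
```

With an empty chain, invoke reduces to calling the block directly, which matches the "empty by default" note above.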

#queue ⇒ Object

Returns the queue name from this class's options. First of the Sidekiq::Testing class-level helpers (spec §24.3), which are only meaningful in :fake / :inline mode; the in-memory store is empty otherwise.



# File 'lib/wurk/worker.rb', line 204

def queue
  get_sidekiq_options['queue']
end

#queue_as(queue) ⇒ Object



# File 'lib/wurk/worker.rb', line 111

def queue_as(queue)
  sidekiq_options('queue' => queue.to_s)
end

#set(opts) ⇒ Wurk::Worker::Setter

Return a per-call option carrier so a single enqueue can override class-level options (queue, scheduling, pool, …).

Examples:

ReportJob.set(queue: "low").perform_async(.id)

Parameters:

  • opts (Hash)

    per-call overrides

Returns:

  • (Wurk::Worker::Setter)

    the per-call option carrier


# File 'lib/wurk/worker.rb', line 174

def set(opts)
  Wurk::Worker::Setter.new(self, opts)
end

#sidekiq_options(opts = {}) ⇒ Hash

Set per-class job options (merged over any inherited options).

Examples:

sidekiq_options queue: "mailers", retry: 3, unique_for: 10.minutes

Parameters:

  • opts (Hash) (defaults to: {})

    any of queue:, retry:, dead:, backtrace:, expires_in:, tags:, pool:, unique_for:, … (see the migration guide's sidekiq_options table for the full set)

Returns:

  • (Hash)

    the merged, string-keyed options hash



# File 'lib/wurk/worker.rb', line 97

def sidekiq_options(opts = {})
  merged = get_sidekiq_options.merge(opts.transform_keys(&:to_s))
  @sidekiq_options_hash = merged
end
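
The key normalization matters more than it looks. A short sketch, using a hypothetical merge_job_options helper that repeats the merge above on plain hashes:

```ruby
# Merge per-class overrides over inherited options, converting keys to
# strings first so :queue and 'queue' cannot coexist in the result.
def merge_job_options(inherited, overrides)
  inherited.merge(overrides.transform_keys(&:to_s))
end

inherited = { 'queue' => 'default', 'retry' => true }

# Symbol-keyed overrides replace the string-keyed inherited values.
merge_job_options(inherited, queue: 'mailers', retry: 3)

# Without the conversion the hash would carry both 'queue' and :queue,
# and a lookup by 'queue' would still return the inherited value.
inherited.merge(queue: 'mailers')
```

Note that merge returns a new hash, so the inherited hash is left unmodified.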

#sidekiq_options_hash ⇒ Object



# File 'lib/wurk/worker.rb', line 107

def sidekiq_options_hash
  get_sidekiq_options
end

#sidekiq_retries_exhausted(&block) ⇒ Object



# File 'lib/wurk/worker.rb', line 119

def sidekiq_retries_exhausted(&block)
  self.sidekiq_retries_exhausted_block = block
end

#sidekiq_retry_in(&block) ⇒ Object



# File 'lib/wurk/worker.rb', line 115

def sidekiq_retry_in(&block)
  self.sidekiq_retry_in_block = block
end