Class: Wurk::Client

Inherits:
Object
  • Object
show all
Includes:
JobUtil
Defined in:
lib/wurk/client.rb,
lib/wurk/client/buffered.rb

Overview

Enqueue interface. Pipelined LPUSH / ZADD writes against the canonical Sidekiq Redis schema — never change keys, JSON shape, or score format here: wire-compat is sacred. Most apps enqueue through the Worker DSL (MyJob.perform_async), which routes here; reach for Client directly only to push a raw job hash or to drive the bulk/scheduled path explicitly.

Spec: docs/target/sidekiq-free.md §7.

Examples:

Push a raw job hash

Wurk::Client.new.push("class" => "MyJob", "args" => [1, 2], "queue" => "default")

Bulk enqueue in one round-trip

Wurk::Client.new.push_bulk("class" => "MyJob", "args" => [[1], [2], [3]])

Defined Under Namespace

Modules: Buffered

Constant Summary collapse

DEFAULT_BATCH_SIZE =

Sidekiq mirrors these exactly. Tests against the upstream parity suite depend on the magic numbers, not just behavior.

1_000
SCHEDULED_BATCH_SIZE =
100
SPREAD_INTERVAL_FLOOR =
5

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(pool: nil, config: nil, chain: nil) ⇒ Client

Returns a new instance of Client.



31
32
33
34
35
# File 'lib/wurk/client.rb', line 31

def initialize(pool: nil, config: nil, chain: nil)
  @config     = config || Wurk.configuration
  @redis_pool = pool
  @chain      = chain || @config.client_middleware
end

Instance Attribute Details

#redis_poolObject

Returns the value of attribute redis_pool.



29
30
31
# File 'lib/wurk/client.rb', line 29

def redis_pool
  @redis_pool
end

Class Method Details

.enqueue(klass) ⇒ Object



99
# File 'lib/wurk/client.rb', line 99

def enqueue(klass, *) = klass.perform_async(*)

.enqueue_in(interval, klass) ⇒ Object



109
110
111
# File 'lib/wurk/client.rb', line 109

def enqueue_in(interval, klass, *)
  klass.perform_in(interval, *)
end

.enqueue_to(queue, klass) ⇒ Object



101
102
103
# File 'lib/wurk/client.rb', line 101

def enqueue_to(queue, klass, *)
  klass.set(queue: queue.to_s).perform_async(*)
end

.enqueue_to_in(queue, interval, klass) ⇒ Object



105
106
107
# File 'lib/wurk/client.rb', line 105

def enqueue_to_in(queue, interval, klass, *)
  klass.set(queue: queue.to_s).perform_in(interval, *)
end

.push(item) ⇒ Object



97
# File 'lib/wurk/client.rb', line 97

def push(item)        = new.push(item)

.push_bulk(items) ⇒ Object



98
# File 'lib/wurk/client.rb', line 98

def push_bulk(items)  = new.push_bulk(items)

.reliable_push!Object

Activate reliable_push! mode globally. Idempotent — call from the top level of an initializer (NOT inside Wurk.configure_*). Spec: docs/target/sidekiq-pro.md §5.



325
326
327
328
# File 'lib/wurk/client/buffered.rb', line 325

def reliable_push! # rubocop:disable Naming/PredicateMethod
  Buffered.install!
  true
end

.reliable_push?Boolean

Returns:

  • (Boolean)


330
331
332
# File 'lib/wurk/client/buffered.rb', line 330

def reliable_push?
  Buffered.installed?
end

.reliable_push_bufferObject



334
335
336
# File 'lib/wurk/client/buffered.rb', line 334

def reliable_push_buffer
  Buffered.buffer_cap
end

.reliable_push_buffer=(value) ⇒ Object



338
339
340
# File 'lib/wurk/client/buffered.rb', line 338

def reliable_push_buffer=(value)
  Buffered.buffer_cap = value
end

.reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) ⇒ Object

Start an opt-in background drainer thread. Implicitly enables reliable_push! so callers don't have to chain the two. Idempotent; calling again replaces the thread with one at the new interval. Spec for reliable_push (sidekiq-pro.md §5) only requires drain on next push — this is a Wurk extension for issue #19's "Background drain thread flushes on reconnect" so producer-stopped-mid-outage buffers don't sit idle until next push.



357
358
359
360
361
# File 'lib/wurk/client/buffered.rb', line 357

def reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL)
  Buffered.install!
  Buffered.start_drainer!(interval: interval)
  true
end

.reliable_push_drainer_running?Boolean

Returns:

  • (Boolean)


367
368
369
# File 'lib/wurk/client/buffered.rb', line 367

def reliable_push_drainer_running?
  Buffered.drainer_running?
end

.reliable_push_drainer_stop!Object



363
364
365
# File 'lib/wurk/client/buffered.rb', line 363

def reliable_push_drainer_stop!
  Buffered.stop_drainer!
end

.reliable_push_overflowObject



342
343
344
# File 'lib/wurk/client/buffered.rb', line 342

def reliable_push_overflow
  Buffered.overflow_mode
end

.reliable_push_overflow=(mode) ⇒ Object



346
347
348
# File 'lib/wurk/client/buffered.rb', line 346

def reliable_push_overflow=(mode)
  Buffered.overflow_mode = mode
end

.via(pool) ⇒ Object

Thread-local pool override. Re-entrant calls are rejected — Sidekiq raises here too, because nested via would silently shadow. The begin/ensure guards the slot so a raise on entry doesn't clear the outer caller's pool.

Raises:

  • (ArgumentError)


117
118
119
120
121
122
123
124
125
126
127
# File 'lib/wurk/client.rb', line 117

def via(pool)
  raise ArgumentError, 'pool is required' if pool.nil?
  raise 'Wurk::Client.via is not re-entrant' if Thread.current[:wurk_via_pool]

  Thread.current[:wurk_via_pool] = pool
  begin
    yield
  ensure
    Thread.current[:wurk_via_pool] = nil
  end
end

Instance Method Details

#cancel!(jid) ⇒ Object

Marks an IterableJob as cancelled. Returns the Unix epoch timestamp written. Field name + epoch-second value mirror Sidekiq::IterableJob#cancel! exactly. TTL = CANCELLATION_PERIOD so other workers observe the flag well after the dashboard click that issued the cancel.

Raises:

  • (ArgumentError)


76
77
78
79
80
81
82
83
84
85
# File 'lib/wurk/client.rb', line 76

def cancel!(jid)
  raise ArgumentError, 'jid must be a non-empty String' if jid.nil? || jid.to_s.empty?

  ts = ::Process.clock_gettime(::Process::CLOCK_REALTIME).to_i
  pool.with do |conn|
    conn.call('HSET', "it-#{jid}", 'cancelled', ts)
    conn.call('EXPIRE', "it-#{jid}", Wurk::IterableJob::CANCELLATION_PERIOD)
  end
  ts
end

#flush_batched(payloads) ⇒ Object

Flush batched payloads (each carrying a bid) to Redis in one pipeline. Public entry point for Wurk::Batch's autoflush buffer — see #push_batched for the per-job BATCH_PUSH semantics it reuses.



90
91
92
93
94
# File 'lib/wurk/client.rb', line 90

def flush_batched(payloads)
  return if payloads.empty?

  pool.with { |conn| push_batched_pipelined(conn, payloads, now_in_millis) }
end

#middleware {|copy| ... } ⇒ Object

Returns the chain (or a duplicate when a block is given, matching Sidekiq).

Yields:

  • (copy)


38
39
40
41
42
43
44
# File 'lib/wurk/client.rb', line 38

def middleware
  return @chain unless block_given?

  copy = @chain.dup
  yield copy
  copy
end

#normalize_item(item) ⇒ Object Originally defined in module JobUtil

Validate → merge class/default options → stringify → assign jid & created_at → strip transient keys. Returns the canonical payload.

#now_in_millisObject Originally defined in module JobUtil

#push(item) ⇒ String?

Returns jid; nil when client middleware halts the push.

Parameters:

  • item (Hash)

    job payload; must carry class and args, may carry at, queue, jid, etc.

Returns:

  • (String, nil)

    jid; nil when client middleware halts the push.



48
49
50
51
52
53
54
55
56
57
# File 'lib/wurk/client.rb', line 48

def push(item)
  normed  = normalize_item(item)
  payload = invoke_chain(normed)
  return nil unless payload

  verify_json(payload)
  raw_push([payload])
  emit_enqueued([payload])
  payload['jid']
end

#push_bulk(items) ⇒ Array<String, nil>

Returns jids in submission order; nil entries mark middleware-halted jobs.

Parameters:

  • items (Hash)

    keys: class, args (Array), at?, spread_interval?, batch_size?, jid?

Returns:

  • (Array<String, nil>)

    jids in submission order; nil entries mark middleware-halted jobs.



61
62
63
64
65
66
67
68
69
70
# File 'lib/wurk/client.rb', line 61

def push_bulk(items)
  args = items['args'] || items[:args]
  validate_bulk_shape!(items, args)
  return [] if args.empty?

  at_values = expand_at(items, args.size)
  batch_sz  = items['batch_size'] || items[:batch_size] || (at_values ? SCHEDULED_BATCH_SIZE : DEFAULT_BATCH_SIZE)
  base      = bulk_base(items)
  flush_bulk(args, at_values, base, batch_sz)
end

#validate(item) ⇒ Object Originally defined in module JobUtil

Raises:

  • (ArgumentError)

    if the payload is structurally invalid.

#verify_json(item) ⇒ Object Originally defined in module JobUtil

Walk args; report the first non-JSON-native value according to the configured strict mode. Hash keys must be Strings.