Class: Wurk::Client
- Inherits:
-
Object
- Object
- Wurk::Client
- Includes:
- JobUtil
- Defined in:
- lib/wurk/client.rb,
lib/wurk/client/buffered.rb
Overview
Enqueue interface. Pipelined LPUSH / ZADD writes against the canonical
Sidekiq Redis schema — never change keys, JSON shape, or score format here:
wire-compat is sacred. Most apps enqueue through the Worker DSL
(MyJob.perform_async), which routes here; reach for Client directly only to
push a raw job hash or to drive the bulk/scheduled path explicitly.
Spec: docs/target/sidekiq-free.md §7.
Defined Under Namespace
Modules: Buffered
Constant Summary collapse
- DEFAULT_BATCH_SIZE =
Sidekiq mirrors these exactly. Tests against the upstream parity suite depend on the magic numbers, not just behavior.
1_000- SCHEDULED_BATCH_SIZE =
100- SPREAD_INTERVAL_FLOOR =
5
Instance Attribute Summary collapse
-
#redis_pool ⇒ Object
Returns the value of attribute redis_pool.
Class Method Summary collapse
- .enqueue(klass) ⇒ Object
- .enqueue_in(interval, klass) ⇒ Object
- .enqueue_to(queue, klass) ⇒ Object
- .enqueue_to_in(queue, interval, klass) ⇒ Object
- .push(item) ⇒ Object
- .push_bulk(items) ⇒ Object
-
.reliable_push! ⇒ Object
Activate reliable_push! mode globally.
- .reliable_push? ⇒ Boolean
- .reliable_push_buffer ⇒ Object
- .reliable_push_buffer=(value) ⇒ Object
-
.reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) ⇒ Object
Start an opt-in background drainer thread.
- .reliable_push_drainer_running? ⇒ Boolean
- .reliable_push_drainer_stop! ⇒ Object
- .reliable_push_overflow ⇒ Object
- .reliable_push_overflow=(mode) ⇒ Object
-
.via(pool) ⇒ Object
Thread-local pool override.
Instance Method Summary collapse
-
#cancel!(jid) ⇒ Object
Marks an IterableJob as cancelled.
-
#flush_batched(payloads) ⇒ Object
Flush batched payloads (each carrying a
bid) to Redis in one pipeline. -
#initialize(pool: nil, config: nil, chain: nil) ⇒ Client
constructor
A new instance of Client.
-
#middleware {|copy| ... } ⇒ Object
Returns the chain (or a duplicate when a block is given, matching Sidekiq).
-
#normalize_item(item) ⇒ Object
included
from JobUtil
Validate → merge class/default options → stringify → assign jid & created_at → strip transient keys.
- #now_in_millis ⇒ Object included from JobUtil
-
#push(item) ⇒ String?
Jid; nil when client middleware halts the push.
-
#push_bulk(items) ⇒ Array<String, nil>
Jids in submission order; nil entries mark middleware-halted jobs.
- #validate(item) ⇒ Object included from JobUtil
-
#verify_json(item) ⇒ Object
included
from JobUtil
Walk args; report the first non-JSON-native value according to the configured strict mode.
Constructor Details
#initialize(pool: nil, config: nil, chain: nil) ⇒ Client
Returns a new instance of Client.
31 32 33 34 35 |
# File 'lib/wurk/client.rb', line 31 def initialize(pool: nil, config: nil, chain: nil) @config = config || Wurk.configuration @redis_pool = pool @chain = chain || @config.client_middleware end |
Instance Attribute Details
#redis_pool ⇒ Object
Returns the value of attribute redis_pool.
29 30 31 |
# File 'lib/wurk/client.rb', line 29 def redis_pool @redis_pool end |
Class Method Details
.enqueue(klass) ⇒ Object
99 |
# File 'lib/wurk/client.rb', line 99 def enqueue(klass, *) = klass.perform_async(*) |
.enqueue_in(interval, klass) ⇒ Object
109 110 111 |
# File 'lib/wurk/client.rb', line 109 def enqueue_in(interval, klass, *) klass.perform_in(interval, *) end |
.enqueue_to(queue, klass) ⇒ Object
101 102 103 |
# File 'lib/wurk/client.rb', line 101 def enqueue_to(queue, klass, *) klass.set(queue: queue.to_s).perform_async(*) end |
.enqueue_to_in(queue, interval, klass) ⇒ Object
105 106 107 |
# File 'lib/wurk/client.rb', line 105 def enqueue_to_in(queue, interval, klass, *) klass.set(queue: queue.to_s).perform_in(interval, *) end |
.push(item) ⇒ Object
97 |
# File 'lib/wurk/client.rb', line 97 def push(item) = new.push(item) |
.push_bulk(items) ⇒ Object
98 |
# File 'lib/wurk/client.rb', line 98 def push_bulk(items) = new.push_bulk(items) |
.reliable_push! ⇒ Object
Activate reliable_push! mode globally. Idempotent — call from the top level of an initializer (NOT inside Wurk.configure_*). Spec: docs/target/sidekiq-pro.md §5.
325 326 327 328 |
# File 'lib/wurk/client/buffered.rb', line 325 def reliable_push! # rubocop:disable Naming/PredicateMethod Buffered.install! true end |
.reliable_push? ⇒ Boolean
330 331 332 |
# File 'lib/wurk/client/buffered.rb', line 330 def reliable_push? Buffered.installed? end |
.reliable_push_buffer ⇒ Object
334 335 336 |
# File 'lib/wurk/client/buffered.rb', line 334 def reliable_push_buffer Buffered.buffer_cap end |
.reliable_push_buffer=(value) ⇒ Object
338 339 340 |
# File 'lib/wurk/client/buffered.rb', line 338 def reliable_push_buffer=(value) Buffered.buffer_cap = value end |
.reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) ⇒ Object
Start an opt-in background drainer thread. Implicitly enables reliable_push! so callers don't have to chain the two. Idempotent; calling again replaces the thread with one at the new interval. Spec for reliable_push (sidekiq-pro.md §5) only requires drain on next push — this is a Wurk extension for issue #19's "Background drain thread flushes on reconnect" so producer-stopped-mid-outage buffers don't sit idle until next push.
357 358 359 360 361 |
# File 'lib/wurk/client/buffered.rb', line 357 def reliable_push_drainer(interval: Buffered::Drainer::DEFAULT_INTERVAL) Buffered.install! Buffered.start_drainer!(interval: interval) true end |
.reliable_push_drainer_running? ⇒ Boolean
367 368 369 |
# File 'lib/wurk/client/buffered.rb', line 367 def reliable_push_drainer_running? Buffered.drainer_running? end |
.reliable_push_drainer_stop! ⇒ Object
363 364 365 |
# File 'lib/wurk/client/buffered.rb', line 363 def reliable_push_drainer_stop! Buffered.stop_drainer! end |
.reliable_push_overflow ⇒ Object
342 343 344 |
# File 'lib/wurk/client/buffered.rb', line 342 def reliable_push_overflow Buffered.overflow_mode end |
.reliable_push_overflow=(mode) ⇒ Object
346 347 348 |
# File 'lib/wurk/client/buffered.rb', line 346 def reliable_push_overflow=(mode) Buffered.overflow_mode = mode end |
.via(pool) ⇒ Object
Thread-local pool override. Re-entrant calls are rejected — Sidekiq
raises here too, because nested via would silently shadow. The
begin/ensure guards the slot so a raise on entry doesn't clear the
outer caller's pool.
117 118 119 120 121 122 123 124 125 126 127 |
# File 'lib/wurk/client.rb', line 117 def via(pool) raise ArgumentError, 'pool is required' if pool.nil? raise 'Wurk::Client.via is not re-entrant' if Thread.current[:wurk_via_pool] Thread.current[:wurk_via_pool] = pool begin yield ensure Thread.current[:wurk_via_pool] = nil end end |
Instance Method Details
#cancel!(jid) ⇒ Object
Marks an IterableJob as cancelled. Returns the Unix epoch timestamp written. Field name + epoch-second value mirror Sidekiq::IterableJob#cancel! exactly. TTL = CANCELLATION_PERIOD so other workers observe the flag well after the dashboard click that issued the cancel.
76 77 78 79 80 81 82 83 84 85 |
# File 'lib/wurk/client.rb', line 76 def cancel!(jid) raise ArgumentError, 'jid must be a non-empty String' if jid.nil? || jid.to_s.empty? ts = ::Process.clock_gettime(::Process::CLOCK_REALTIME).to_i pool.with do |conn| conn.call('HSET', "it-#{jid}", 'cancelled', ts) conn.call('EXPIRE', "it-#{jid}", Wurk::IterableJob::CANCELLATION_PERIOD) end ts end |
#flush_batched(payloads) ⇒ Object
Flush batched payloads (each carrying a bid) to Redis in one pipeline.
Public entry point for Wurk::Batch's autoflush buffer — see #push_batched
for the per-job BATCH_PUSH semantics it reuses.
90 91 92 93 94 |
# File 'lib/wurk/client.rb', line 90 def flush_batched(payloads) return if payloads.empty? pool.with { |conn| push_batched_pipelined(conn, payloads, now_in_millis) } end |
#middleware {|copy| ... } ⇒ Object
Returns the chain (or a duplicate when a block is given, matching Sidekiq).
38 39 40 41 42 43 44 |
# File 'lib/wurk/client.rb', line 38 def middleware return @chain unless block_given? copy = @chain.dup yield copy copy end |
#normalize_item(item) ⇒ Object Originally defined in module JobUtil
Validate → merge class/default options → stringify → assign jid & created_at → strip transient keys. Returns the canonical payload.
#now_in_millis ⇒ Object Originally defined in module JobUtil
#push(item) ⇒ String?
Returns jid; nil when client middleware halts the push.
48 49 50 51 52 53 54 55 56 57 |
# File 'lib/wurk/client.rb', line 48 def push(item) normed = normalize_item(item) payload = invoke_chain(normed) return nil unless payload verify_json(payload) raw_push([payload]) emit_enqueued([payload]) payload['jid'] end |
#push_bulk(items) ⇒ Array<String, nil>
Returns jids in submission order; nil entries mark middleware-halted jobs.
61 62 63 64 65 66 67 68 69 70 |
# File 'lib/wurk/client.rb', line 61 def push_bulk(items) args = items['args'] || items[:args] validate_bulk_shape!(items, args) return [] if args.empty? at_values = (items, args.size) batch_sz = items['batch_size'] || items[:batch_size] || (at_values ? SCHEDULED_BATCH_SIZE : DEFAULT_BATCH_SIZE) base = bulk_base(items) flush_bulk(args, at_values, base, batch_sz) end |
#validate(item) ⇒ Object Originally defined in module JobUtil
#verify_json(item) ⇒ Object Originally defined in module JobUtil
Walk args; report the first non-JSON-native value according to the configured strict mode. Hash keys must be Strings.