Class: Wurk::Batch
- Inherits:
-
Object
- Object
- Wurk::Batch
- Defined in:
- lib/wurk/batch.rb,
lib/wurk/batch_set.rb,
lib/wurk/batch/empty.rb,
lib/wurk/batch/buffer.rb,
lib/wurk/batch/status.rb,
lib/wurk/batch/callbacks.rb,
lib/wurk/batch/callback_job.rb,
lib/wurk/batch/death_handler.rb,
lib/wurk/batch/client_middleware.rb,
lib/wurk/batch/server_middleware.rb
Overview
Sidekiq Pro Batches. Group jobs, attach success/complete/death callbacks, track progress. Spec: docs/target/sidekiq-pro.md §2.
Lifecycle:
1. `Batch.new` allocates a fresh BID; the batch is `mutable?` until the
first `#jobs` block flushes — that flush HSETs the core hash, ZADDs
to `batches`, and writes tag indexes.
2. `Batch.new(bid)` reopens an existing batch (legal from inside a job
or callback only). `mutable?` is false because reopening implies the
first flush already happened.
3. `#jobs { ... }` collects `Job.perform_async` calls via the client
middleware (Thread.current[:wurk_current_batch] is the signal),
atomically registering each via BATCH_PUSH.
4. Workers ack on success → BATCH_ACK_SUCCESS → pending--.
Death handler acks on permanent failure → BATCH_ACK_COMPLETE.
5. When live jids hits zero → fire `:complete`. When pending also
hits zero with zero deaths → fire `:success`.
Nested batches: a job opening its OWN batch (batch.jobs { ... })
increments live counters on the existing batch. A callback opening its
PARENT batch links via parent_bid and adds child BID to b-
Defined Under Namespace
Modules: Callbacks Classes: Buffer, CallbackJob, ClientMiddleware, DeadSet, DeathHandler, Empty, ServerMiddleware, Status
Constant Summary collapse
- DEFAULT_EXPIRY_SECONDS =
30 * 24 * 60 * 60
- POST_SUCCESS_EXPIRY_SECONDS =
24 * 60 * 60
- CALLBACK_NOTIFY_TTL =
30 * 24 * 60 * 60
- BID_BYTES =
Bid is URL-safe base64 of 10 random bytes — matches Sidekiq Pro's BID generator. Length matters: third-party gems that key off bid prefix (sharded batches in Pro 8) inspect the first character.
10- VALID_EVENTS =
%i[success complete death].freeze
- KEY_SUFFIXES =
The 'live' set tracks jobs that have not yet reached a terminal state. When it's empty, every job has either succeeded or died →
:completeis allowed to fire. %w[jids failed died notify cbsucc kids pkids tags].freeze
- THREAD_KEY =
:wurk_current_batch- BUFFER_KEY =
Set on the current thread (to a Buffer) only inside an autoflush
#jobsblock. Client#raw_push reads it: when present, batched pushes accumulate here instead of round-tripping per job. :wurk_batch_buffer
Instance Attribute Summary collapse
-
#autoflush ⇒ Object
Returns the value of attribute autoflush.
-
#bid ⇒ Object
readonly
Returns the value of attribute bid.
-
#callback_class ⇒ Object
Returns the value of attribute callback_class.
-
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
-
#description ⇒ Object
Returns the value of attribute description.
-
#linger ⇒ Object
Returns the value of attribute linger.
-
#parent_bid ⇒ Object
readonly
Returns the value of attribute parent_bid.
Class Method Summary collapse
Instance Method Summary collapse
- #expires_in(duration) ⇒ Object
- #include?(jid) ⇒ Boolean
-
#initialize(bid = nil) ⇒ Batch
constructor
A new instance of Batch.
-
#invalidate_all ⇒ Object
Mark batch invalid.
-
#jobs(&block) ⇒ Object
Atomic enqueue block.
- #mutable? ⇒ Boolean
-
#on(event, callback, options = {}) ⇒ Object
Register a callback.
- #parent ⇒ Object
-
#remove_jobs(*jids) ⇒ Object
Remove jobs from the batch.
- #status ⇒ Object
- #tags ⇒ Object
-
#tags=(value) ⇒ Object
Hash assignment writes strings — Sidekiq's UI / third-party gems expect String tags.
- #valid? ⇒ Boolean
Constructor Details
#initialize(bid = nil) ⇒ Batch
Returns a new instance of Batch.
71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 |
# File 'lib/wurk/batch.rb', line 71 def initialize(bid = nil) @bid = bid || SecureRandom.urlsafe_base64(BID_BYTES) @existing = !bid.nil? @description = nil @callback_queue = 'default' @callback_class = nil @tags = [] @autoflush = nil @linger = nil @parent_bid = nil @callbacks = [] @expires_in = DEFAULT_EXPIRY_SECONDS @mutable = !@existing @flushed_once = @existing load_existing! if @existing end |
Instance Attribute Details
#autoflush ⇒ Object
Returns the value of attribute autoflush.
64 65 66 |
# File 'lib/wurk/batch.rb', line 64 def autoflush @autoflush end |
#bid ⇒ Object (readonly)
Returns the value of attribute bid.
63 64 65 |
# File 'lib/wurk/batch.rb', line 63 def bid @bid end |
#callback_class ⇒ Object
Returns the value of attribute callback_class.
64 65 66 |
# File 'lib/wurk/batch.rb', line 64 def callback_class @callback_class end |
#callback_queue ⇒ Object
Returns the value of attribute callback_queue.
64 65 66 |
# File 'lib/wurk/batch.rb', line 64 def callback_queue @callback_queue end |
#description ⇒ Object
Returns the value of attribute description.
64 65 66 |
# File 'lib/wurk/batch.rb', line 64 def description @description end |
#linger ⇒ Object
Returns the value of attribute linger.
63 64 65 |
# File 'lib/wurk/batch.rb', line 63 def linger @linger end |
#parent_bid ⇒ Object (readonly)
Returns the value of attribute parent_bid.
63 64 65 |
# File 'lib/wurk/batch.rb', line 63 def parent_bid @parent_bid end |
Class Method Details
.keys_for(bid) ⇒ Object
66 67 68 69 |
# File 'lib/wurk/batch.rb', line 66 def self.keys_for(bid) base = "b-#{bid}" [base, *KEY_SUFFIXES.map { |s| "#{base}-#{s}" }] end |
Instance Method Details
#expires_in(duration) ⇒ Object
156 157 158 159 |
# File 'lib/wurk/batch.rb', line 156 def expires_in(duration) @expires_in = duration.to_i self end |
#include?(jid) ⇒ Boolean
121 122 123 |
# File 'lib/wurk/batch.rb', line 121 def include?(jid) Wurk.redis { |conn| conn.call('SISMEMBER', "b-#{@bid}-jids", jid) }.to_i.positive? end |
#invalidate_all ⇒ Object
Mark batch invalid. Pending jobs still exist in their queues; the
server middleware short-circuits them when it observes the flag.
Cascades to descendant batches via b-
143 144 145 146 |
# File 'lib/wurk/batch.rb', line 143 def invalidate_all cascade_invalidate(@bid) nil end |
#jobs(&block) ⇒ Object
Atomic enqueue block. Inside the block, Job.perform_async finds
this batch via Thread.current and stamps bid onto the
payload — the client middleware then uses BATCH_PUSH to register and
push atomically. Empty blocks synthesise a Batch::Empty no-op so
callbacks still fire.
180 181 182 183 184 185 186 187 188 189 190 191 |
# File 'lib/wurk/batch.rb', line 180 def jobs(&block) raise ArgumentError, 'jobs requires a block' unless block ensure_first_flush! pre_count = job_count collect_jobs(&block) # By the time we check, the buffer (if any) has flushed, so `total` # reflects everything the block pushed — a flat count is reliable. enqueue_empty_marker if job_count == pre_count @mutable = false self end |
#mutable? ⇒ Boolean
117 118 119 |
# File 'lib/wurk/batch.rb', line 117 def mutable? @mutable end |
#on(event, callback, options = {}) ⇒ Object
Register a callback. Multiple callbacks of the same event are allowed.
The callback target may be a Class, "Foo#bar" string spec, or anything
responding to name. options must be JSON-serializable.
164 165 166 167 168 169 170 171 172 173 |
# File 'lib/wurk/batch.rb', line 164 def on(event, callback, = {}) sym = event.to_sym raise ArgumentError, "invalid event #{event.inspect}" unless VALID_EVENTS.include?(sym) raise ArgumentError, 'callback options must be a Hash' unless .is_a?(Hash) entry = [sym.to_s, callback_target(callback), ] @callbacks << entry persist_callback!(entry) if @flushed_once self end |
#parent ⇒ Object
111 112 113 114 115 |
# File 'lib/wurk/batch.rb', line 111 def parent return nil if @parent_bid.nil? || @parent_bid.empty? Batch.new(@parent_bid) end |
#remove_jobs(*jids) ⇒ Object
Remove jobs from the batch. Decrements pending/total by exactly the count of jids actually removed (idempotent for repeated calls).
127 128 129 130 131 132 133 134 135 136 137 138 |
# File 'lib/wurk/batch.rb', line 127 def remove_jobs(*jids) return 0 if jids.empty? Wurk.redis do |conn| removed = conn.call('SREM', "b-#{@bid}-jids", *jids).to_i if removed.positive? conn.call('HINCRBY', "b-#{@bid}", 'pending', -removed) conn.call('HINCRBY', "b-#{@bid}", 'total', -removed) end removed end end |
#tags ⇒ Object
94 95 96 |
# File 'lib/wurk/batch.rb', line 94 def @tags.dup end |
#tags=(value) ⇒ Object
Hash assignment writes strings — Sidekiq's UI / third-party gems expect String tags. Array-coercion lets callers pass a String or Set.
90 91 92 |
# File 'lib/wurk/batch.rb', line 90 def (value) @tags = Array(value).map(&:to_s) end |