Class: Wurk::Batch

Inherits:
Object
Defined in:
lib/wurk/batch.rb,
lib/wurk/batch_set.rb,
lib/wurk/batch/empty.rb,
lib/wurk/batch/buffer.rb,
lib/wurk/batch/status.rb,
lib/wurk/batch/callbacks.rb,
lib/wurk/batch/callback_job.rb,
lib/wurk/batch/death_handler.rb,
lib/wurk/batch/client_middleware.rb,
lib/wurk/batch/server_middleware.rb

Overview

Sidekiq Pro Batches. Group jobs, attach success/complete/death callbacks, track progress. Spec: docs/target/sidekiq-pro.md §2.

Lifecycle:

1. `Batch.new` allocates a fresh BID; the batch is `mutable?` until the
 first `#jobs` block flushes — that flush HSETs the core hash, ZADDs
 to `batches`, and writes tag indexes.
2. `Batch.new(bid)` reopens an existing batch (legal from inside a job
 or callback only). `mutable?` is false because reopening implies the
 first flush already happened.
3. `#jobs { ... }` collects `Job.perform_async` calls via the client
 middleware (Thread.current[:wurk_current_batch] is the signal),
 atomically registering each via BATCH_PUSH.
4. Workers ack on success → BATCH_ACK_SUCCESS → pending--.
 Death handler acks on permanent failure → BATCH_ACK_COMPLETE.
5. When live jids hits zero → fire `:complete`. When pending also
 hits zero with zero deaths → fire `:success`.
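
The completion rules in steps 4 and 5 can be modelled in plain Ruby without Redis. This is a minimal sketch under stated assumptions: `BatchModel` and its methods are hypothetical names, not part of Wurk, and the real bookkeeping happens atomically in Redis via BATCH_ACK_SUCCESS and BATCH_ACK_COMPLETE. The `:death` event is deliberately left out.

```ruby
require 'set'

# Hypothetical in-memory model of the batch counters; not part of Wurk.
class BatchModel
  attr_reader :fired

  def initialize(jids)
    @live    = Set.new(jids) # jobs that have not reached a terminal state
    @pending = jids.size     # jobs that have not yet succeeded
    @deaths  = 0
    @fired   = []
  end

  # A worker finished the job successfully: pending goes down.
  def ack_success(jid)
    return unless @live.delete?(jid)

    @pending -= 1
    check_callbacks
  end

  # The job failed permanently: it leaves the live set but stays pending.
  def ack_death(jid)
    return unless @live.delete?(jid)

    @deaths += 1
    check_callbacks
  end

  private

  def check_callbacks
    return unless @live.empty?

    @fired << :complete unless @fired.include?(:complete)
    return unless @pending.zero? && @deaths.zero?

    @fired << :success unless @fired.include?(:success)
  end
end

ok = BatchModel.new(%w[a b])
ok.ack_success('a')
ok.ack_success('b')

dead = BatchModel.new(%w[a b])
dead.ack_success('a')
dead.ack_death('b')

p ok.fired   # both callbacks fire
p dead.fired # only :complete fires
```

In this model a batch with one dead job still reaches `:complete`, but never `:success`, because its pending count stays above zero.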

Nested batches: a job opening its OWN batch (batch.jobs { ... }) increments live counters on the existing batch. A callback opening its PARENT batch links via parent_bid and adds the child BID to b-<bid>-kids.

Examples:

Define a batch with a success callback

batch = Sidekiq::Batch.new
batch.description = "Nightly import"
batch.on(:success, ImportCallback, "user_id" => user.id)
batch.jobs do
  rows.each { |r| ImportRowJob.perform_async(r.id) }
end
batch.bid   # => the batch id, persisted in Redis

Defined Under Namespace

Modules: Callbacks

Classes: Buffer, CallbackJob, ClientMiddleware, DeadSet, DeathHandler, Empty, ServerMiddleware, Status

Constant Summary

DEFAULT_EXPIRY_SECONDS =
30 * 24 * 60 * 60
POST_SUCCESS_EXPIRY_SECONDS =
24 * 60 * 60
CALLBACK_NOTIFY_TTL =
30 * 24 * 60 * 60
BID_BYTES =

Bid is URL-safe base64 of 10 random bytes — matches Sidekiq Pro's BID generator. Length matters: third-party gems that key off bid prefix (sharded batches in Pro 8) inspect the first character.

10
VALID_EVENTS =
%i[success complete death].freeze
KEY_SUFFIXES =

The 'live' set tracks jobs that have not yet reached a terminal state. When it's empty, every job has either succeeded or died → :complete is allowed to fire.

%w[jids failed died notify cbsucc kids pkids tags].freeze
THREAD_KEY =
:wurk_current_batch
BUFFER_KEY =

Set on the current thread (to a Buffer) only inside an autoflush #jobs block. Client#raw_push reads it: when present, batched pushes accumulate here instead of round-tripping per job.

:wurk_batch_buffer
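
As a quick check of the BID_BYTES note above, the snippet below generates an id the same way the constructor does. It only uses Ruby's standard `SecureRandom`; the constant is redefined locally so the snippet runs without Wurk loaded.

```ruby
require 'securerandom'

BID_BYTES = 10 # mirrors the constant documented above

bid = SecureRandom.urlsafe_base64(BID_BYTES)

puts bid        # random on every run
puts bid.length # 10 bytes encode to 14 base64 characters without padding
```

The URL-safe alphabet is limited to letters, digits, `-` and `_`, so a BID can be embedded in Redis key names and URLs without escaping.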

Constructor Details

#initialize(bid = nil) ⇒ Batch

Returns a new instance of Batch.



# File 'lib/wurk/batch.rb', line 71

def initialize(bid = nil)
  @bid              = bid || SecureRandom.urlsafe_base64(BID_BYTES)
  @existing         = !bid.nil?
  @description      = nil
  @callback_queue   = 'default'
  @callback_class   = nil
  @tags             = []
  @autoflush        = nil
  @linger           = nil
  @parent_bid       = nil
  @callbacks        = []
  @expires_in       = DEFAULT_EXPIRY_SECONDS
  @mutable          = !@existing
  @flushed_once     = @existing
  load_existing! if @existing
end

Instance Attribute Details

#autoflush ⇒ Object

Returns the value of attribute autoflush.



# File 'lib/wurk/batch.rb', line 64

def autoflush
  @autoflush
end

#bid ⇒ Object (readonly)

Returns the value of attribute bid.



# File 'lib/wurk/batch.rb', line 63

def bid
  @bid
end

#callback_class ⇒ Object

Returns the value of attribute callback_class.



# File 'lib/wurk/batch.rb', line 64

def callback_class
  @callback_class
end

#callback_queue ⇒ Object

Returns the value of attribute callback_queue.



# File 'lib/wurk/batch.rb', line 64

def callback_queue
  @callback_queue
end

#description ⇒ Object

Returns the value of attribute description.



# File 'lib/wurk/batch.rb', line 64

def description
  @description
end

#linger ⇒ Object

Returns the value of attribute linger.



# File 'lib/wurk/batch.rb', line 63

def linger
  @linger
end

#parent_bid ⇒ Object (readonly)

Returns the value of attribute parent_bid.



# File 'lib/wurk/batch.rb', line 63

def parent_bid
  @parent_bid
end

Class Method Details

.keys_for(bid) ⇒ Object



# File 'lib/wurk/batch.rb', line 66

def self.keys_for(bid)
  base = "b-#{bid}"
  [base, *KEY_SUFFIXES.map { |s| "#{base}-#{s}" }]
end
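
To see what `.keys_for` returns, the method body and `KEY_SUFFIXES` can be copied into a standalone script. The sample BID `abc123` is made up for illustration.

```ruby
# Copied from the constant summary and the method source above so the
# snippet runs without Wurk loaded.
KEY_SUFFIXES = %w[jids failed died notify cbsucc kids pkids tags].freeze

def keys_for(bid)
  base = "b-#{bid}"
  [base, *KEY_SUFFIXES.map { |s| "#{base}-#{s}" }]
end

keys = keys_for('abc123') # 'abc123' is a made-up BID
puts keys
```

The result is the core hash key followed by one key per suffix, nine keys in total.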

Instance Method Details

#expires_in(duration) ⇒ Object



# File 'lib/wurk/batch.rb', line 156

def expires_in(duration)
  @expires_in = duration.to_i
  self
end

#include?(jid) ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch.rb', line 121

def include?(jid)
  Wurk.redis { |conn| conn.call('SISMEMBER', "b-#{@bid}-jids", jid) }.to_i.positive?
end

#invalidate_all ⇒ Object

Mark batch invalid. Pending jobs still exist in their queues; the server middleware short-circuits them when it observes the flag. Cascades to descendant batches via b-<bid>-kids.



# File 'lib/wurk/batch.rb', line 143

def invalidate_all
  cascade_invalidate(@bid)
  nil
end

#jobs(&block) ⇒ Object

Atomic enqueue block. Inside the block, Job.perform_async finds this batch via Thread.current and stamps bid onto the payload — the client middleware then uses BATCH_PUSH to register and push atomically. Empty blocks synthesise a Batch::Empty no-op so callbacks still fire.

Raises:

  • (ArgumentError)


# File 'lib/wurk/batch.rb', line 180

def jobs(&block)
  raise ArgumentError, 'jobs requires a block' unless block

  ensure_first_flush!
  pre_count = job_count
  collect_jobs(&block)
  # By the time we check, the buffer (if any) has flushed, so `total`
  # reflects everything the block pushed — a flat count is reliable.
  enqueue_empty_marker if job_count == pre_count
  @mutable = false
  self
end
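
The thread-local hand-off described above can be sketched in isolation. `with_current_batch` and `stamp` are hypothetical stand-ins for the private `collect_jobs` helper and the client middleware, whose actual bodies are not shown on this page. Only the `:wurk_current_batch` key comes from the source.

```ruby
THREAD_KEY = :wurk_current_batch

# Hypothetical stand-in for collect_jobs: publish the batch id on the
# current thread for the duration of the block, then restore the old value.
def with_current_batch(bid)
  previous = Thread.current[THREAD_KEY]
  Thread.current[THREAD_KEY] = bid
  yield
ensure
  Thread.current[THREAD_KEY] = previous
end

# Hypothetical stand-in for the client middleware: stamp the bid onto the
# job payload only when a batch is open on this thread.
def stamp(payload)
  bid = Thread.current[THREAD_KEY]
  bid ? payload.merge('bid' => bid) : payload
end

inside  = with_current_batch('BID-1') { stamp({ 'class' => 'ImportRowJob' }) }
outside = stamp({ 'class' => 'ImportRowJob' })

p inside  # payload carries 'bid'
p outside # payload is unchanged
```

Restoring the previous value in `ensure` keeps the signal from leaking into later pushes on the same thread, even when the block raises.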

#mutable? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch.rb', line 117

def mutable?
  @mutable
end

#on(event, callback, options = {}) ⇒ Object

Register a callback. Multiple callbacks of the same event are allowed. The callback target may be a Class, "Foo#bar" string spec, or anything responding to name. options must be JSON-serializable.

Raises:

  • (ArgumentError)


# File 'lib/wurk/batch.rb', line 164

def on(event, callback, options = {})
  sym = event.to_sym
  raise ArgumentError, "invalid event #{event.inspect}" unless VALID_EVENTS.include?(sym)
  raise ArgumentError, 'callback options must be a Hash' unless options.is_a?(Hash)

  entry = [sym.to_s, callback_target(callback), options]
  @callbacks << entry
  persist_callback!(entry) if @flushed_once
  self
end
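
The two `ArgumentError` paths can be exercised without Redis by lifting the guards into a helper. `validate_callback` is a hypothetical name used only for this sketch; the guard lines themselves are copied from the source above.

```ruby
VALID_EVENTS = %i[success complete death].freeze

# Hypothetical, trimmed-down copy of the validation at the top of #on.
def validate_callback(event, options = {})
  sym = event.to_sym
  raise ArgumentError, "invalid event #{event.inspect}" unless VALID_EVENTS.include?(sym)
  raise ArgumentError, 'callback options must be a Hash' unless options.is_a?(Hash)

  sym.to_s
end

accepted = validate_callback('success', { 'user_id' => 1 })

bad_event = begin
  validate_callback(:finished)
rescue ArgumentError => e
  e.message
end

bad_options = begin
  validate_callback(:complete, %w[not a hash])
rescue ArgumentError => e
  e.message
end

puts accepted
puts bad_event
puts bad_options
```

Event names are accepted as either Strings or Symbols, since the guard converts with `to_sym` before checking.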

#parent ⇒ Object



# File 'lib/wurk/batch.rb', line 111

def parent
  return nil if @parent_bid.nil? || @parent_bid.empty?

  Batch.new(@parent_bid)
end

#remove_jobs(*jids) ⇒ Object

Remove jobs from the batch. Decrements pending/total by exactly the count of jids actually removed (idempotent for repeated calls).



# File 'lib/wurk/batch.rb', line 127

def remove_jobs(*jids)
  return 0 if jids.empty?

  Wurk.redis do |conn|
    removed = conn.call('SREM', "b-#{@bid}-jids", *jids).to_i
    if removed.positive?
      conn.call('HINCRBY', "b-#{@bid}", 'pending', -removed)
      conn.call('HINCRBY', "b-#{@bid}", 'total', -removed)
    end
    removed
  end
end
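
The idempotency claim follows from SREM returning the number of members it actually removed. A minimal in-memory sketch, assuming a Ruby `Set` in place of the `b-<bid>-jids` Redis set and a Hash in place of the core hash (both are stand-ins, not Wurk code):

```ruby
require 'set'

# Hypothetical in-memory stand-ins for the Redis set and hash.
jids   = Set.new(%w[j1 j2 j3])
counts = { 'pending' => 3, 'total' => 3 }

remove = lambda do |*targets|
  # Like SREM, count only the members that were present and removed.
  removed = targets.count { |jid| jids.delete?(jid) }
  if removed.positive?
    counts['pending'] -= removed
    counts['total']   -= removed
  end
  removed
end

first  = remove.call('j1', 'j2', 'missing') # two of the three were present
second = remove.call('j1', 'j2')            # nothing left to remove

p first
p second
p counts
```

The repeated call removes nothing, so the counters are decremented only once per jid.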

#status ⇒ Object



# File 'lib/wurk/batch.rb', line 152

def status
  Status.new(@bid)
end

#tags ⇒ Object



# File 'lib/wurk/batch.rb', line 94

def tags
  @tags.dup
end

#tags=(value) ⇒ Object

Hash assignment writes strings — Sidekiq's UI / third-party gems expect String tags. Array-coercion lets callers pass a String or Set.



# File 'lib/wurk/batch.rb', line 90

def tags=(value)
  @tags = Array(value).map(&:to_s)
end
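
The coercion in the setter relies on `Kernel#Array`, which wraps a single value, converts anything responding to `to_a`, and maps `nil` to an empty array. The sketch below pulls the setter body into a lambda (the name `normalize` is only for illustration):

```ruby
require 'set'

# Same expression as the setter body above.
normalize = ->(value) { Array(value).map(&:to_s) }

from_string = normalize.call('nightly')
from_set    = normalize.call(Set[:import, :csv])
from_nil    = normalize.call(nil)

p from_string
p from_set
p from_nil
```

Symbols come out as Strings, matching the note that consumers expect String tags.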

#valid? ⇒ Boolean

Returns:

  • (Boolean)


# File 'lib/wurk/batch.rb', line 148

def valid?
  Wurk.redis { |conn| conn.call('HGET', "b-#{@bid}", 'invalidated') } != '1'
end