Class: Wurk::JobSet
Overview
ZSET-of-jobs view. Reverse-paged iteration so callers see newest-first (highest score, i.e. furthest-out retry/schedule). Mutations use ZREM-by-value when the exact bytes are known and a (score, jid) scan otherwise.
Spec: docs/target/sidekiq-free.md §19.5.
Direct Known Subclasses
Instance Method Summary
-
#delete_by_jid(score, jid) ⇒ Object
(also: #delete)
Scan the score bracket for a jid match, ZREM the exact bytes once found.
-
#delete_by_value(name, value) ⇒ Object
ZREM by exact bytes.
-
#each ⇒ Object
Newest-first paged ZRANGE.
-
#fetch(score, jid = nil) ⇒ Object
Score-range lookup via ZRANGEBYSCORE.
-
#find_job(jid) ⇒ Object
ZSCAN-based search by jid substring.
-
#kill_all(notify_failure: true, ex: nil) ⇒ Object
Moves every job in this set to the dead set.
-
#pop_each ⇒ Object
ZPOPMIN loop.
-
#remove_job(entry) ⇒ Object
Removes the exact (score, jid)-matching member.
-
#retry_all ⇒ Object
Re-enqueues every job in this set via the client.
-
#schedule(timestamp, message) ⇒ Object
ZADD with NX so re-scheduling the same payload doesn't reset its score.
Constructor Details
This class inherits a constructor from Wurk::SortedSet
Instance Method Details
#delete_by_jid(score, jid) ⇒ Object Also known as: delete
Scan the score bracket for a jid match, ZREM the exact bytes once found.
Returns true on success. Aliased as delete for Sidekiq wire-compat.
Per-row JSON rescue so a single malformed entry can't shadow a valid
match at the same score.
# File 'lib/wurk/job_set.rb', line 181

def delete_by_jid(score, jid) # rubocop:disable Naming/PredicateMethod
  Wurk.redis do |conn|
    rows = conn.call('ZRANGEBYSCORE', @name, score.to_f, score.to_f)
    rows.each do |raw|
      parsed = begin
        Wurk.load_json(raw)
      rescue ::JSON::ParserError
        nil
      end
      next unless parsed && parsed['jid'] == jid

      return conn.call('ZREM', @name, raw).to_i.positive?
    end
  end
  false
end
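The per-row rescue can be illustrated without Redis. The following is a minimal sketch, assuming the rows at one score arrive as an array of raw strings. `find_raw_by_jid` is a hypothetical helper name, and `JSON.parse` stands in for `Wurk.load_json`, whose exact behavior is not shown on this page.

```ruby
require 'json'

# Hypothetical helper (not part of Wurk): returns the raw member whose JSON
# payload carries the given jid, or nil when no row matches.
# A row that fails to parse is skipped instead of aborting the whole scan.
def find_raw_by_jid(rows, jid)
  rows.find do |raw|
    parsed = begin
      JSON.parse(raw)
    rescue JSON::ParserError
      nil
    end
    parsed.is_a?(Hash) && parsed['jid'] == jid
  end
end

# One malformed row and one valid row sharing the same score.
rows = ['{not json', JSON.generate('jid' => 'abc123', 'class' => 'HardJob')]
match = find_raw_by_jid(rows, 'abc123')
puts match
```

The bytes returned here are what would be passed to ZREM, so the removal targets the stored member exactly.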
#delete_by_value(name, value) ⇒ Object
ZREM by exact bytes. Returns true when ≥1 element was removed. Method
name is Sidekiq wire-compat — delete_by_value? would break the alias.
# File 'lib/wurk/job_set.rb', line 172

def delete_by_value(name, value) # rubocop:disable Naming/PredicateMethod
  removed = Wurk.redis { |conn| conn.call('ZREM', name, value) }
  removed.to_i.positive?
end
#each ⇒ Object
Newest-first paged ZRANGE. Yields a SortedEntry per row.
# File 'lib/wurk/job_set.rb', line 75

def each
  return enum_for(:each) unless block_given?

  page = 0
  added = 0
  loop do
    start = page * PAGE_SIZE
    stop = start + PAGE_SIZE - 1
    slice = Wurk.redis { |conn| conn.call('ZRANGE', @name, start, stop, 'REV', 'WITHSCORES') }
    slice.each do |value, score|
      yield SortedEntry.new(self, score, value)
      added += 1
    end
    break if slice.size < PAGE_SIZE

    page += 1
  end
  added
end
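The page arithmetic and the short-page stop condition can be traced with an in-memory stand-in. This is only a sketch: `DEMO_PAGE_SIZE`, `rev_slice`, and `each_newest_first` are hypothetical names, the real `PAGE_SIZE` value is not shown on this page, and the array of `[value, score]` pairs merely imitates what ZRANGE with REV and WITHSCORES would return.

```ruby
DEMO_PAGE_SIZE = 3 # assumed value for illustration only

# Hypothetical stand-in for ZRANGE key start stop REV WITHSCORES.
# members is an array of [value, score] pairs.
def rev_slice(members, start, stop)
  sorted = members.sort_by { |_value, score| -score }
  sorted[start..stop] || []
end

# Walks the set highest score first, one page at a time.
def each_newest_first(members)
  page = 0
  loop do
    start = page * DEMO_PAGE_SIZE
    slice = rev_slice(members, start, start + DEMO_PAGE_SIZE - 1)
    slice.each { |value, score| yield value, score }
    break if slice.size < DEMO_PAGE_SIZE # a short page means the end was reached

    page += 1
  end
end

members = [['a', 10.0], ['b', 40.0], ['c', 20.0], ['d', 30.0]]
order = []
each_newest_first(members) { |value, _score| order << value }
puts order.inspect
```

When the set size is an exact multiple of the page size, the loop issues one extra request that returns an empty page before stopping.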
#fetch(score, jid = nil) ⇒ Object
Score-range lookup via ZRANGEBYSCORE, which costs O(log N + M) for M returned members. score accepts Time, Numeric, or a Range of either.
Returns the matching entries (possibly multiple at the same exact
score). When jid is set, narrows to the single (score, jid) pair.
# File 'lib/wurk/job_set.rb', line 145

def fetch(score, jid = nil)
  results = Wurk.redis { |conn| conn.call('ZRANGEBYSCORE', @name, *range_args(score), 'WITHSCORES') }
  entries = results.map { |value, sc| SortedEntry.new(self, sc, value) }
  return entries unless jid

  entries.select { |e| e.jid == jid }
end
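The `range_args` helper is not shown on this page. As a rough illustration of how a Time, Numeric, or Range argument could be turned into the min and max bounds that ZRANGEBYSCORE expects, here is a sketch under stated assumptions: `score_bounds` is a hypothetical name, open-ended ranges are mapped to Redis's `-inf` and `+inf` bounds, and exclusive range ends are not handled.

```ruby
# Hypothetical normalizer (an assumption, not Wurk's actual range_args).
# Returns [min, max] suitable as the two score arguments of ZRANGEBYSCORE.
def score_bounds(score)
  case score
  when Range
    min = score.begin ? score.begin.to_f : '-inf'
    max = score.end ? score.end.to_f : '+inf'
    [min, max]
  else
    # Time#to_f and Numeric#to_f both yield a float score.
    [score.to_f, score.to_f]
  end
end

puts score_bounds(5).inspect
puts score_bounds(Time.at(100)..Time.at(200)).inspect
```

A single score collapses to an identical min and max, which is why several entries can come back when jobs share the same exact score.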
#find_job(jid) ⇒ Object
ZSCAN-based search: the jid is used as the scan pattern, and each candidate is then checked for an exact jid match. Returns the first matching entry or nil. O(n) on the ZSET, so callers iterating many jids should switch to per-jid hashes or the score-based fetch.
# File 'lib/wurk/job_set.rb', line 156

def find_job(jid)
  scan(jid) do |value, score|
    entry = SortedEntry.new(self, score, value)
    return entry if entry.jid == jid
  end
  nil
end
#kill_all(notify_failure: true, ex: nil) ⇒ Object
Moves every job in this set to the dead set. Death handlers fire per
entry by default, matching Sidekiq's each(&:kill) behavior; pass
notify_failure: false to suppress them. Returns the count of jobs moved.
# File 'lib/wurk/job_set.rb', line 128

def kill_all(notify_failure: true, ex: nil)
  count = 0
  dead = DeadSet.new
  until size.zero?
    each do |entry|
      entry.send(:remove_job) do |message|
        dead.kill(Wurk.dump_json(message), notify_failure: notify_failure, ex: ex)
      end
      count += 1
    end
  end
  count
end
#pop_each ⇒ Object
ZPOPMIN loop. Each iteration pops the single oldest member (lowest score, e.g. earliest scheduled-at) and yields the raw JSON + score. Used by the scheduled-poller: it enqueues each popped job through the client. Stops when the set is empty.
# File 'lib/wurk/job_set.rb', line 99

def pop_each
  loop do
    result = Wurk.redis { |conn| conn.call('ZPOPMIN', @name, 1) }
    break if result.nil? || result.empty?

    # Newer redis-client returns nested `[[value, score]]` even with COUNT 1;
    # older `[value, score]`. Normalize both.
    value, score = result.first.is_a?(Array) ? result.first : result
    yield value, score.to_f
  end
end
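The reply-shape normalization in `pop_each` can be checked in isolation. The sketch below extracts that one line into a hypothetical helper, `normalize_pop`, and feeds it hand-written replies in both shapes; no Redis connection is involved, and the sample payloads are made up for illustration.

```ruby
# Hypothetical helper mirroring the normalization step in pop_each.
# Accepts either [[value, score]] or [value, score]; returns [value, Float],
# or nil when the reply is nil or empty (the set has been drained).
def normalize_pop(result)
  return nil if result.nil? || result.empty?

  value, score = result.first.is_a?(Array) ? result.first : result
  [value, score.to_f]
end

nested = [['{"jid":"a1"}', '1700000000.5']] # nested reply shape
flat   = ['{"jid":"b2"}', 1_700_000_001]    # flat reply shape

puts normalize_pop(nested).inspect
puts normalize_pop(flat).inspect
```

Calling `to_f` on the score covers both string and numeric replies, so callers always receive a Float.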
#remove_job(entry) ⇒ Object
Removes the member for the given entry. Tries ZREM on the entry's exact value bytes first and falls back to the (score, jid) scan when that removes nothing. Backs SortedEntry#delete.
# File 'lib/wurk/job_set.rb', line 166

def remove_job(entry)
  delete_by_value(@name, entry.value) || delete_by_jid(entry.score, entry.jid)
end
#retry_all ⇒ Object
Re-enqueues every job in this set via the client. Lossy on errors mid-iteration; callers expecting transactional behavior should batch the work themselves.
# File 'lib/wurk/job_set.rb', line 114

def retry_all
  count = 0
  until size.zero?
    each do |entry|
      entry.retry
      count += 1
    end
  end
  count
end