Class: Wurk::JobSet

Inherits:
SortedSet show all
Defined in:
lib/wurk/job_set.rb

Overview

ZSET-of-jobs view. Reverse-paged iteration so callers see newest-first (highest score, i.e. furthest-out retry/schedule). Mutations use ZREM-by-value when the exact bytes are known and a (score, jid) scan otherwise.

Spec: docs/target/sidekiq-free.md §19.5.

Direct Known Subclasses

DeadSet, RetrySet, ScheduledSet

Instance Method Summary collapse

Constructor Details

This class inherits a constructor from Wurk::SortedSet

Instance Method Details

#delete_by_jid(score, jid) ⇒ Object Also known as: delete

Scan the score bracket for a jid match, ZREM the exact bytes once found. Returns true on success. Aliased as delete for Sidekiq wire-compat. Per-row JSON rescue so a single malformed entry can't shadow a valid match at the same score.



181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# File 'lib/wurk/job_set.rb', line 181

def delete_by_jid(score, jid) # rubocop:disable Naming/PredicateMethod
  Wurk.redis do |conn|
    rows = conn.call('ZRANGEBYSCORE', @name, score.to_f, score.to_f)
    rows.each do |raw|
      parsed = begin
        Wurk.load_json(raw)
      rescue ::JSON::ParserError
        nil
      end
      next unless parsed && parsed['jid'] == jid

      return conn.call('ZREM', @name, raw).to_i.positive?
    end
  end
  false
end

#delete_by_value(name, value) ⇒ Object

ZREM by exact bytes. Returns true when ≥1 element was removed. Method name is Sidekiq wire-compat — delete_by_value? would break the alias.



172
173
174
175
# File 'lib/wurk/job_set.rb', line 172

def delete_by_value(name, value) # rubocop:disable Naming/PredicateMethod
  removed = Wurk.redis { |conn| conn.call('ZREM', name, value) }
  removed.to_i.positive?
end

#eachObject

Newest-first paged ZRANGE. Yields a SortedEntry per row.



75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# File 'lib/wurk/job_set.rb', line 75

def each
  return enum_for(:each) unless block_given?

  page  = 0
  added = 0
  loop do
    start = page * PAGE_SIZE
    stop  = start + PAGE_SIZE - 1
    slice = Wurk.redis { |conn| conn.call('ZRANGE', @name, start, stop, 'REV', 'WITHSCORES') }
    slice.each do |value, score|
      yield SortedEntry.new(self, score, value)
      added += 1
    end
    break if slice.size < PAGE_SIZE

    page += 1
  end
  added
end

#fetch(score, jid = nil) ⇒ Object

O(score) lookup. score accepts Time, Numeric, or a Range of either. Returns the matching entries (possibly multiple at the same exact score). When jid is set, narrows to the single (score, jid) pair.



145
146
147
148
149
150
151
# File 'lib/wurk/job_set.rb', line 145

def fetch(score, jid = nil)
  results = Wurk.redis { |conn| conn.call('ZRANGEBYSCORE', @name, *range_args(score), 'WITHSCORES') }
  entries = results.map { |value, sc| SortedEntry.new(self, sc, value) }
  return entries unless jid

  entries.select { |e| e.jid == jid }
end

#find_job(jid) ⇒ Object

ZSCAN-based search by jid substring. Returns the first matching entry or nil. O(n) on the ZSET — callers iterating many jids should switch to per-jid hashes or the score-based fetch.



156
157
158
159
160
161
162
# File 'lib/wurk/job_set.rb', line 156

def find_job(jid)
  scan(jid) do |value, score|
    entry = SortedEntry.new(self, score, value)
    return entry if entry.jid == jid
  end
  nil
end

#kill_all(notify_failure: true, ex: nil) ⇒ Object

Moves every job in this set to the dead set. Death handlers fire per entry by default — each(&:kill) equivalence with Sidekiq; pass notify_failure: false to suppress. Returns the count of jobs moved.



128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/wurk/job_set.rb', line 128

def kill_all(notify_failure: true, ex: nil)
  count = 0
  dead = DeadSet.new
  until size.zero?
    each do |entry|
      entry.send(:remove_job) do |message|
        dead.kill(Wurk.dump_json(message), notify_failure: notify_failure, ex: ex)
      end
      count += 1
    end
  end
  count
end

#pop_eachObject

ZPOPMIN loop. Each iteration pops the single oldest member (lowest score, e.g. earliest scheduled-at) and yields the raw JSON + score. Used by the scheduled-poller: it enqueues each popped job through the client. Stops when the set is empty.



99
100
101
102
103
104
105
106
107
108
109
# File 'lib/wurk/job_set.rb', line 99

def pop_each
  loop do
    result = Wurk.redis { |conn| conn.call('ZPOPMIN', @name, 1) }
    break if result.nil? || result.empty?

    # Newer redis-client returns nested `[[value, score]]` even with COUNT 1;
    # older `[value, score]`. Normalize both.
    value, score = result.first.is_a?(Array) ? result.first : result
    yield value, score.to_f
  end
end

#remove_job(entry) ⇒ Object

Removes the exact (score, jid)-matching member. Backs SortedEntry#delete when no cached value bytes are present.



166
167
168
# File 'lib/wurk/job_set.rb', line 166

def remove_job(entry)
  delete_by_value(@name, entry.value) || delete_by_jid(entry.score, entry.jid)
end

#retry_allObject

Re-enqueues every job in this set via the client. Lossy on errors mid-iteration; callers expecting transactional behavior should batch the work themselves.



114
115
116
117
118
119
120
121
122
123
# File 'lib/wurk/job_set.rb', line 114

def retry_all
  count = 0
  until size.zero?
    each do |entry|
      entry.retry
      count += 1
    end
  end
  count
end

#schedule(timestamp, message) ⇒ Object

ZADD with NX so re-scheduling the same payload doesn't reset its score. Mirrors Sidekiq::JobSet#schedule exactly.



70
71
72
# File 'lib/wurk/job_set.rb', line 70

def schedule(timestamp, message)
  Wurk.redis { |conn| conn.call('ZADD', @name, timestamp.to_f.to_s, Wurk.dump_json(message)) }
end