Class: Wurk::WorkSet
- Inherits: Object
  - Object
    - Wurk::WorkSet
- Includes: Enumerable
- Defined in: lib/wurk/work_set.rb
Overview
Live snapshot of currently-executing jobs across the cluster. Reads the
<identity>:work HASH of each registered process; each field maps a thread
id to a JSON payload. The data can lag reality by up to one heartbeat (10s),
because each heartbeat UNLINKs and rewrites the hash atomically.
Wire-compat is sacred: every Redis call matches Sidekiq OSS exactly. Spec: docs/target/sidekiq-free.md §19.7.
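To make the hash layout concrete, here is a minimal sketch of turning one HGETALL reply into (process_id, thread_id, payload) rows. The identity string, thread ids, and payload keys ("queue", "run_at") are illustrative assumptions, not taken from the Wurk source, and no Redis connection is involved.

```ruby
require 'json'

# Hypothetical stand-in for the reply of HGETALL <identity>:work.
# Field names are thread ids; values are JSON payloads. The identity and
# the payload keys below are assumptions made for this sketch.
identity = 'host-1:4242:abc123'
work_hash = {
  'tid-a' => JSON.generate('queue' => 'default', 'run_at' => 1_700_000_020),
  'tid-b' => JSON.generate('queue' => 'mailers', 'run_at' => 1_700_000_005)
}

# Flatten the hash into [process_id, thread_id, payload] rows.
rows = work_hash.map do |thread_id, json|
  [identity, thread_id, JSON.parse(json)]
end

rows.each do |process_id, thread_id, payload|
  puts "#{process_id} #{thread_id} #{payload['queue']}"
end
```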
Instance Method Summary
- #each ⇒ Object
  Pipelined <identity>:work HGETALL per known process.
- #find_work(jid) ⇒ Object (also: #find_work_by_jid)
  O(n) scan for a JID across all in-flight jobs.
- #initialize(processes_key: Keys::PROCESSES) ⇒ WorkSet (constructor)
  Optional processes_key: allows tests to operate on a namespaced SET; production callers always use Keys::PROCESSES (wire-compat).
- #size ⇒ Object
  Sum of busy HASH field across every known identity.
Constructor Details
#initialize(processes_key: Keys::PROCESSES) ⇒ WorkSet
Optional processes_key: allows tests to operate on a namespaced
SET; production callers always use Keys::PROCESSES (wire-compat).
    # File 'lib/wurk/work_set.rb', line 16
    def initialize(processes_key: Keys::PROCESSES)
      @processes_key = processes_key
    end
Instance Method Details
#each ⇒ Object
Pipelined <identity>:work HGETALL per known process. Yields
(process_id, thread_id, Work). Result sorted by run_at so the
oldest in-flight job appears first — dashboards rely on this order.
    # File 'lib/wurk/work_set.rb', line 23
    def each
      return enum_for(:each) unless block_given?

      collect_rows.sort_by { |(_, _, work)| work.run_at }.each { |row| yield(*row) }
    end
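The iteration contract (an Enumerator when no block is given, rows ordered by run_at) can be tried out without Redis. The sketch below uses hypothetical stand-ins, SketchWork and SketchWorkSet, which take their rows directly; they are not the real Wurk classes, and the row values are made up.

```ruby
# Hypothetical stand-ins for illustration only; the real WorkSet reads
# its rows from Redis rather than taking them as a constructor argument.
SketchWork = Struct.new(:jid, :run_at)

class SketchWorkSet
  include Enumerable

  def initialize(rows)
    @rows = rows
  end

  # Same shape as the documented #each: return an Enumerator when no block
  # is given, otherwise yield rows sorted so the oldest run_at comes first.
  def each
    return enum_for(:each) unless block_given?

    @rows.sort_by { |(_, _, work)| work.run_at }.each { |row| yield(*row) }
  end
end

set = SketchWorkSet.new([
  ['proc-1', 'tid-a', SketchWork.new('j2', 200)],
  ['proc-2', 'tid-b', SketchWork.new('j1', 100)]
])

# Without a block, #each returns an Enumerator that can be chained.
oldest_first = set.each.map { |_process_id, _thread_id, work| work.jid }
puts oldest_first.inspect
```

Because the rows are sorted on every call, a caller that only needs the oldest job still pays for the full fetch and sort.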
#find_work(jid) ⇒ Object Also known as: find_work_by_jid
O(n) scan for a JID across all in-flight jobs. Returns nil when no
match. Slow — not for app logic. Aliased as find_work_by_jid for
Sidekiq wire-compat.
    # File 'lib/wurk/work_set.rb', line 46
    def find_work(jid)
      each do |_process_id, _thread_id, work|
        return work if work.job.jid == jid
      end
      nil
    end
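The scan's behavior, first match wins and nil on a miss, can be illustrated with plain structs. SketchJob, SketchEntry, and find_work_sketch are hypothetical names for this example; only the work.job.jid access path mirrors the method above.

```ruby
# Hypothetical stand-ins that mimic the work.job.jid access path.
SketchJob = Struct.new(:jid)
SketchEntry = Struct.new(:job)

rows = [
  ['proc-1', 'tid-a', SketchEntry.new(SketchJob.new('j1'))],
  ['proc-1', 'tid-b', SketchEntry.new(SketchJob.new('j2'))],
  ['proc-2', 'tid-c', SketchEntry.new(SketchJob.new('j3'))]
]

# Linear scan: visits rows in order and returns the first matching entry,
# or nil when no in-flight job carries the requested JID.
def find_work_sketch(rows, jid)
  rows.each do |_process_id, _thread_id, work|
    return work if work.job.jid == jid
  end
  nil
end

found = find_work_sketch(rows, 'j2')
missing = find_work_sketch(rows, 'no-such-jid')
puts found.job.jid
puts missing.inspect
```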
#size ⇒ Object
Sum of the busy HASH field across every known identity. Lagged by up to
one heartbeat. Uses pipelined HGET: one command per process, so the pipeline
grows with process count, but each HGET is O(1) on the Redis side.
    # File 'lib/wurk/work_set.rb', line 32
    def size
      Wurk.redis do |conn|
        procs = conn.call('SMEMBERS', @processes_key)
        next 0 if procs.empty?

        conn.pipelined do |pipe|
          procs.each { |key| pipe.call('HGET', key, 'busy') }
        end.sum(&:to_i)
      end
    end
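The final sum(&:to_i) step also absorbs missing values. HGET returns nil when the key or field does not exist, and nil.to_i is 0 in Ruby, so such a process contributes nothing to the total. A small sketch with made-up replies:

```ruby
# Hypothetical pipelined HGET replies, one per process identity.
# Redis returns counts as strings; nil stands for a missing key or field.
replies = ['3', '0', nil, '12']

# String#to_i parses the count and NilClass#to_i returns 0, so nil
# replies are counted as zero busy threads.
total = replies.sum(&:to_i)
puts total
```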