Module: Wurk::IterableJob
Defined in:
- lib/wurk/iterable_job.rb
- lib/wurk/iterable_job/csv_enumerator.rb
- lib/wurk/iterable_job/active_record_enumerator.rb
Overview
Iterable jobs split long-running work into small, idempotent chunks.
Override #build_enumerator (yielding [item, new_cursor] pairs) and
#each_iteration(item, *args); the framework drives the loop, persists
the cursor, and resumes after interruption.
Defining #perform on the including class is refused at method_added
— IterableJob owns the run loop. User code overrides #each_iteration.
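A minimal sketch of how such a guard can be built on Ruby's method_added hook. IterableSketch, the error class, and the message are assumptions made for illustration; only the MethodAddedGuard name appears in this module, and its real wiring is not shown here.

```ruby
# Hypothetical stand-in for Wurk::IterableJob; only the guard idea is shown.
module IterableSketch
  # Extended onto the including class so method_added fires for its methods.
  module MethodAddedGuard
    def method_added(name)
      # Assumption: the real guard may raise a different error class.
      raise ArgumentError, "#{self} must not define #perform; override #each_iteration" if name == :perform

      super
    end
  end

  def self.included(base)
    base.extend(MethodAddedGuard)
  end

  # The module owns the run loop and delegates to user code.
  def perform(*args)
    each_iteration(*args)
  end
end

class GoodJob
  include IterableSketch

  def each_iteration(item)
    "processed #{item}"
  end
end

# Defining #perform on an including class trips the guard.
guard_error =
  begin
    Class.new do
      include IterableSketch
      def perform; end
    end
    nil
  rescue ArgumentError => e
    e
  end
```

The hook runs on the including class only, so the module's own #perform definition is not affected.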
State lives in the it-<jid> HASH (sidekiq-free.md §1.5):
ex : execution count (int)
c : cursor (JSON string)
rt : runtime accumulated (float seconds)
cancelled : timestamp (int) if cancelled
Spec: docs/target/sidekiq-free.md §6.4.
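The state fields above can be sketched with a plain Hash standing in for the it-<jid> HASH. This is an illustration under stated assumptions, not the module's code: run_sketch and interrupt_at are hypothetical, the cursor is checkpointed on every item rather than on the 5-second timer, and the checkpoint is taken before the item is processed, so the in-flight item is retried on resume.

```ruby
require "json"

# In-memory stand-in for the it-<jid> HASH; the real store is Redis.
def run_sketch(state, items, interrupt_at: nil)
  cursor = state["c"] ? JSON.parse(state["c"]) : nil
  state["ex"] = state.fetch("ex", 0) + 1            # execution count
  started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
  processed = []

  items.each_with_index.drop(cursor || 0).each do |item, new_cursor|
    state["c"] = JSON.generate(new_cursor)          # cursor stored as a JSON string
    break if new_cursor == interrupt_at             # simulated interruption

    processed << item
  end

  elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
  state["rt"] = state.fetch("rt", 0.0) + elapsed    # accumulated runtime, float seconds
  processed
end

state  = {}
first  = run_sketch(state, %w[a b c d], interrupt_at: 2) # stops before "c"
second = run_sketch(state, %w[a b c d])                  # resumes at "c"
```

Because the resumed run starts at the item that was in flight, an item can be seen twice in a real interruption, which is why each chunk needs to be idempotent.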
Defined Under Namespace
Modules: MethodAddedGuard
Classes: ActiveRecordEnumerator, CsvEnumerator
Constant Summary
- Interrupted = Wurk::Job::Interrupted
  Alias to the canonical Wurk::Job::Interrupted. The exception lives on Wurk::Job so non-iterable code paths (manual interrupted? checks) can raise the same class; the interrupt-handler middleware rescues by the Wurk::Job::Interrupted name.
- STATE_TTL = 30 * 86_400
  Default expiry for an iteration state HASH while the job is running or awaiting resume. Refreshed on every checkpoint.
- STATE_FLUSH_INTERVAL = 5
  Cursor flush + cancellation poll cadence. Both share the timer so a long-running iteration that hits the 5-second mark checkpoints and checks for cross-process cancellation in the same tick.
- CANCELLATION_PERIOD = 3 * 86_400
  Shorter TTL applied once the state is marked cancelled. The HASH outlives cancel! long enough for live workers to observe the flag but is reaped well before the 30-day default would expire.
Instance Attribute Summary
- #current_object ⇒ Object (readonly)
  Iteration state accessors.
Instance Method Summary
- #active_record_batches_enumerator(relation, cursor:) ⇒ Object
- #active_record_records_enumerator(relation, cursor:) ⇒ Object
- #active_record_relations_enumerator(relation, cursor:) ⇒ Object
- #arguments ⇒ Object
- #around_iteration ⇒ Object
- #array_enumerator(array, cursor:) ⇒ Object
  Enumerator builders (§6.4). Helpers user code calls from #build_enumerator to get a resumable enumerator of [item, cursor] pairs.
- #build_enumerator(cursor:) ⇒ Object
  User override: must return an Enumerator yielding [item, new_cursor] pairs.
- #cancel! ⇒ Object
  Mark this iteration cancelled.
- #cancelled? ⇒ Boolean
  True once cancel! has been called locally or, for cross-process cancellation, once the cancelled field appears in the it-<jid> HASH.
- #csv_batches_enumerator(csv, cursor:) ⇒ Object
- #csv_enumerator(csv, cursor:) ⇒ Object
- #cursor ⇒ Object
- #each_iteration ⇒ Object
- #iteration_key ⇒ Object
  Redis HASH key holding iteration state for this job.
- #on_cancel ⇒ Object
- #on_complete ⇒ Object
- #on_resume ⇒ Object
- #on_start ⇒ Object
  Lifecycle hooks (no-op defaults; users override as needed).
- #on_stop ⇒ Object
- #perform(*args) ⇒ Object
  Foundation run loop.
Instance Attribute Details
#current_object ⇒ Object (readonly)
Iteration state accessors.
    # File 'lib/wurk/iterable_job.rb', line 122
    def current_object
      @current_object
    end
Instance Method Details
#active_record_batches_enumerator(relation, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 100
    def active_record_batches_enumerator(relation, cursor:, **)
      ActiveRecordEnumerator.new(relation, cursor: cursor, **).batches
    end
#active_record_records_enumerator(relation, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 96
    def active_record_records_enumerator(relation, cursor:, **)
      ActiveRecordEnumerator.new(relation, cursor: cursor, **).records
    end
#active_record_relations_enumerator(relation, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 104
    def active_record_relations_enumerator(relation, cursor:, **)
      ActiveRecordEnumerator.new(relation, cursor: cursor, **).relations
    end
#arguments ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 124
    def arguments
      @arguments ||= []
    end
#around_iteration ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 116
    def around_iteration
      yield
    end
#array_enumerator(array, cursor:) ⇒ Object
Enumerator builders (§6.4).
Helpers user code calls from #build_enumerator to get a resumable
enumerator of [item, cursor] pairs. Cursor parity with Sidekiq:
array/CSV use the integer index; ActiveRecord uses the record's
primary key.
    # File 'lib/wurk/iterable_job.rb', line 81
    def array_enumerator(array, cursor:)
      raise ArgumentError, 'array must be an Array' unless array.is_a?(::Array)

      x = array.each_with_index.drop(cursor || 0)
      x.to_enum { x.size }
    end
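To see what the [item, cursor] pairs look like, the method body listed above can be lifted into a standalone function and called with and without a cursor. The free-standing definition is only for illustration; in the module it is an instance method.

```ruby
# Copy of the listed method body, lifted out of the module for illustration.
def array_enumerator(array, cursor:)
  raise ArgumentError, 'array must be an Array' unless array.is_a?(::Array)

  x = array.each_with_index.drop(cursor || 0)
  x.to_enum { x.size }
end

fresh   = array_enumerator(%w[a b c], cursor: nil)
resumed = array_enumerator(%w[a b c], cursor: 1)

fresh.to_a    # [["a", 0], ["b", 1], ["c", 2]]
resumed.to_a  # [["b", 1], ["c", 2]]
resumed.size  # 2
```

Each pair carries the item's own index as its cursor, so resuming with cursor 1 starts again at the item with index 1.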
#build_enumerator(cursor:) ⇒ Object
User override: must return an Enumerator yielding [item, new_cursor]
pairs. The cursor must round-trip through JSON.
    # File 'lib/wurk/iterable_job.rb', line 66
    def build_enumerator(*, cursor:)
      _ = cursor
      raise NotImplementedError, "#{self.class} must override #build_enumerator"
    end
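A quick way to check the JSON round-trip requirement for a candidate cursor, using only Ruby's standard json library. json_round_trips? is a hypothetical helper written for this example and is not part of Wurk.

```ruby
require "json"

# Hypothetical helper: true when a cursor survives JSON encoding unchanged.
def json_round_trips?(cursor)
  JSON.parse(JSON.generate(cursor)) == cursor
end

json_round_trips?(42)                 # true: integer index
json_round_trips?([7, "2024-01-01"])  # true: array of primitives
json_round_trips?({ "id" => 7 })      # true: string-keyed hash
json_round_trips?({ id: 7 })          # false: symbol keys come back as strings
json_round_trips?(:last_id)           # false: symbols come back as strings
```

Integers, strings, and arrays or string-keyed hashes of them are safe choices; symbols and symbol-keyed hashes change shape when the cursor is reloaded.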
#cancel! ⇒ Object
Mark this iteration cancelled. Sets the in-process flag immediately
(so the next cancelled? check inside the run loop trips) and, when
a jid is bound, writes the timestamp to the it-<jid> HASH so other
processes observe it on their next 5-second poll.
Returns the integer epoch-seconds timestamp written.
    # File 'lib/wurk/iterable_job.rb', line 138
    def cancel!
      ts_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
      @cancelled_at ||= ts_ms
      ts = ts_ms / 1000
      persist_cancellation(ts)
      ts
    end
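The timestamp handling can be tried in isolation. In this sketch CancelSketch is a hypothetical class, and the private persist_cancellation call is replaced by a write to a plain Hash, which is an assumption about what that helper does.

```ruby
# Hypothetical stand-in; the Redis write is replaced by a plain Hash.
class CancelSketch
  attr_reader :store

  def initialize
    @store = {}
  end

  def cancel!
    ts_ms = Process.clock_gettime(Process::CLOCK_REALTIME, :millisecond)
    @cancelled_at ||= ts_ms     # in-process flag keeps the first timestamp (ms)
    ts = ts_ms / 1000           # integer division: milliseconds to epoch seconds
    @store["cancelled"] = ts    # stand-in for the it-<jid> HASH field
    ts
  end

  def cancelled?
    !@cancelled_at.nil?
  end
end

job = CancelSketch.new
before = job.cancelled?   # false until cancel! is called
ts = job.cancel!          # integer epoch seconds
```

The in-process flag is kept in milliseconds while the persisted value is in whole seconds, which matches the ts * 1000 conversion in #cancelled?.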
#cancelled? ⇒ Boolean
True once cancel! has been called locally or, for cross-process
cancellation, once the cancelled field appears in the it-<jid>
HASH. The remote check is rate-limited to once per STATE_FLUSH_INTERVAL
to keep the hot loop cheap.
    # File 'lib/wurk/iterable_job.rb', line 150
    def cancelled?
      return true if @cancelled_at

      ts = poll_remote_cancellation
      return false unless ts

      @cancelled_at = ts * 1000
      true
    end
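The private poll_remote_cancellation helper is not shown on this page. The following is one way a rate-limited poll could look, with every name (RemoteCancelPoller, fetch, clock, lookups) invented for illustration and the first-call behavior assumed.

```ruby
# Hypothetical poller; not the module's implementation.
class RemoteCancelPoller
  INTERVAL = 5 # seconds, mirroring STATE_FLUSH_INTERVAL

  attr_reader :lookups

  # fetch: callable returning the remote cancelled timestamp or nil
  # clock: callable returning monotonic seconds (injectable for testing)
  def initialize(fetch:, clock: -> { Process.clock_gettime(Process::CLOCK_MONOTONIC) })
    @fetch = fetch
    @clock = clock
    @last_poll = nil
    @lookups = 0
  end

  def poll
    now = @clock.call
    # Skip the store read while the interval has not elapsed.
    return nil if @last_poll && now - @last_poll < INTERVAL

    @last_poll = now
    @lookups += 1
    @fetch.call
  end
end

now = 0.0
remote = { "cancelled" => nil }
poller = RemoteCancelPoller.new(fetch: -> { remote["cancelled"] }, clock: -> { now })

poller.poll                         # first call reads the store
remote["cancelled"] = 1_700_000_000 # another process cancels
now = 1.0
early = poller.poll                 # inside the interval: no read, returns nil
now = 5.0
late = poller.poll                  # interval elapsed: the flag is observed
```

With this shape, a cancellation written by another process is noticed at most one interval late, and the hot loop pays for a store read only once per interval.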
#csv_batches_enumerator(csv, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 92
    def csv_batches_enumerator(csv, cursor:, **)
      CsvEnumerator.new(csv).batches(cursor: cursor, **)
    end
#csv_enumerator(csv, cursor:) ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 88
    def csv_enumerator(csv, cursor:)
      CsvEnumerator.new(csv).rows(cursor: cursor)
    end
#cursor ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 128
    def cursor
      @cursor
    end
#each_iteration ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 71
    def each_iteration(*)
      raise NotImplementedError, "#{self.class} must override #each_iteration"
    end
#iteration_key ⇒ Object
Redis HASH key holding iteration state for this job. Wire-compatible
with Sidekiq's it-<jid> schema (sidekiq-free.md §1.5).
    # File 'lib/wurk/iterable_job.rb', line 162
    def iteration_key
      "it-#{jid}"
    end
#on_cancel ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 113
    def on_cancel; end
#on_complete ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 114
    def on_complete; end
#on_resume ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 111
    def on_resume; end
#on_start ⇒ Object
Lifecycle hooks (no-op defaults; users override as needed).
    # File 'lib/wurk/iterable_job.rb', line 110
    def on_start; end
#on_stop ⇒ Object
    # File 'lib/wurk/iterable_job.rb', line 112
    def on_stop; end
#perform(*args) ⇒ Object
Foundation run loop. Loads any persisted state, drives the enumerator,
checkpoints every STATE_FLUSH_INTERVAL, and on interruption persists
the final cursor before re-raising so the interrupt-handler middleware
can re-push the job at the head of the queue.
    # File 'lib/wurk/iterable_job.rb', line 170
    def perform(*args)
      reset_run_state(args)
      load_state
      fire_lifecycle_start
      @executions += 1

      run_iterations(args)

      finalize_complete
    rescue Interrupted
      finalize_interrupted
      raise
    end
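The persist-then-re-raise shape of the run loop can be sketched without the framework. Everything here is a stand-in: Interrupted is a local class rather than Wurk::Job::Interrupted, state is a plain Hash, interrupt_at simulates a shutdown signal, and clearing the cursor on completion is an assumption about what finalize_complete does.

```ruby
# Local stand-in for Wurk::Job::Interrupted.
class Interrupted < StandardError; end

# Hypothetical run loop; state is a plain Hash instead of the it-<jid> HASH.
class RunLoopSketch
  attr_reader :seen

  def initialize(state)
    @state = state
    @seen = []
  end

  def perform(items, interrupt_at: nil)
    @cursor = @state["c"]                        # load persisted state
    @state["ex"] = @state.fetch("ex", 0) + 1
    items.each_with_index.drop(@cursor || 0).each do |item, cursor|
      @cursor = cursor
      raise Interrupted if cursor == interrupt_at # simulated interruption

      @seen << item
    end
    @state.delete("c")                           # assumed: nothing left to resume
  rescue Interrupted
    @state["c"] = @cursor                        # persist the final cursor first
    raise                                        # then let the caller re-enqueue
  end
end

state = {}
begin
  RunLoopSketch.new(state).perform(%w[a b c], interrupt_at: 1)
rescue Interrupted
  # An interrupt-handler middleware would re-push the job here.
end
cursor_after_interrupt = state["c"]

resumed = RunLoopSketch.new(state)
resumed.perform(%w[a b c])
```

Re-raising after the cursor is saved keeps the two concerns separate: the job records where it stopped, and the layer that rescues the exception decides how to schedule the retry.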