Class: Wurk::Cron::Poller
- Inherits:
- Object (hierarchy: Object, Wurk::Cron::Poller)
- Includes:
- Wurk::Component
- Defined in:
- lib/wurk/cron.rb
Overview
Once-per-minute tick driver. Only the cluster leader iterates the LoopSet
and enqueues; non-leaders return early. A missed-tick warning is emitted
when the wall clock has drifted more than MISSED_TICK_THRESHOLD seconds
past the expected fire time.
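The missed-tick check described above can be sketched as follows. This is a minimal illustration and not the code in lib/wurk/cron.rb: the helper name `missed_tick_warning`, the message text, and the 90-second threshold are all assumptions, since the actual value of MISSED_TICK_THRESHOLD is not shown on this page.

```ruby
# Hypothetical threshold; the real MISSED_TICK_THRESHOLD value is not shown here.
MISSED_TICK_THRESHOLD_SKETCH = 90

# Returns a warning string when `now` is more than `threshold` seconds past
# `expected_fire` (both epoch seconds); returns nil when the tick is on time.
def missed_tick_warning(expected_fire, now, threshold: MISSED_TICK_THRESHOLD_SKETCH)
  drift = now - expected_fire
  return nil unless drift > threshold

  "missed tick: fired #{drift}s late (threshold #{threshold}s)"
end
```

A fire that is 50 seconds late stays silent under this assumed threshold, while one that is 100 seconds late produces a warning string.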
Instance Attribute Summary
- #config ⇒ Object (readonly) included from Wurk::Component
  Returns the value of attribute config.
Instance Method Summary
- #default_tag(dir = Dir.pwd) ⇒ Object included from Wurk::Component
- #enqueue_if_due(loop_obj) ⇒ Object
- #fire(loop_obj) ⇒ Object
  Fire one loop right now, bypassing both the leader gate and the schedule due-check, recording history + advancing the fire marks exactly like a real tick.
- #fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object included from Wurk::Component
  Invokes lifecycle hooks for event.
- #handle_exception(ex, ctx = {}) ⇒ Object included from Wurk::Component
- #hostname ⇒ Object included from Wurk::Component
- #identity ⇒ Object included from Wurk::Component
- #initialize(config) ⇒ Poller constructor
  A new instance of Poller.
- #leader? ⇒ Boolean included from Wurk::Component
  True iff this process currently holds the cluster dear-leader lock.
- #logger ⇒ Object included from Wurk::Component
  Delegated to config.
- #mono_ms ⇒ Object included from Wurk::Component
- #process_nonce ⇒ Object included from Wurk::Component
- #real_ms ⇒ Object included from Wurk::Component
- #redis ⇒ Object included from Wurk::Component
- #safe_thread(name, priority: nil, &block) ⇒ Object included from Wurk::Component
  Spawns a named thread that runs block under watchdog(name).
- #start ⇒ Object
- #terminate ⇒ Object
- #tick ⇒ Object
  Leader-gated by the single cluster lock (Component#leader? reads dear-leader): non-leaders return early and never iterate the LoopSet.
- #tid ⇒ Object included from Wurk::Component
- #watchdog(last_words) ⇒ Object included from Wurk::Component
  Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised.
Constructor Details
#initialize(config) ⇒ Poller
Returns a new instance of Poller.
    # File 'lib/wurk/cron.rb', line 489

    def initialize(config)
      @config = config
      @done = false
      @mutex = ::Mutex.new
      @sleeper = ::ConditionVariable.new
      @client = Client.new(config: config)
      @thread = nil
      # Operators never need to touch this; integration tests shrink it so a
      # due loop fires within the test window instead of waiting a full minute.
      @tick_interval = config[:cron_tick_interval] || DEFAULT_TICK_SECONDS
    end
Instance Attribute Details
#config ⇒ Object (readonly) Originally defined in module Wurk::Component
Returns the value of attribute config.
Instance Method Details
#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Wurk::Component
#enqueue_if_due(loop_obj) ⇒ Object
    # File 'lib/wurk/cron.rb', line 533

    def enqueue_if_due(loop_obj)
      return if loop_obj.paused?

      now = ::Time.now.to_i
      prev_fire, next_fire = read_fire_marks(loop_obj.lid)
      next_fire ||= loop_obj.next_fire_at(prev_fire || (now - @tick_interval))
      return if next_fire.nil? || next_fire > now

      warn_missed_tick(loop_obj, next_fire, now)
      jid = enqueue!(loop_obj)
      future = loop_obj.next_fire_after(next_fire, now)
      record_fire(loop_obj, jid, now, future)
      jid
    end
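The due-check at the top of #enqueue_if_due can be exercised in isolation with a stand-in loop object. The sketch below is illustrative only: `FakeLoop`, its fixed-period schedule, and the helper `due_fire_time` are hypothetical, and it assumes `next_fire_at(from)` returns the first fire time strictly after `from`, which this page does not confirm.

```ruby
# Hypothetical stand-in for a cron loop that fires every `period` seconds.
FakeLoop = Struct.new(:period, :paused) do
  def paused?
    paused
  end

  # Assumed semantics: first period-aligned fire time strictly after `from`.
  def next_fire_at(from)
    ((from / period) + 1) * period
  end
end

# Mirrors the guard clauses of enqueue_if_due: returns the fire time that is
# due at `now`, or nil when the loop is paused or nothing is due yet.
def due_fire_time(loop_obj, prev_fire, next_fire, now, tick_interval)
  return nil if loop_obj.paused?

  # With no stored marks, look back one tick interval, as the original does.
  next_fire ||= loop_obj.next_fire_at(prev_fire || (now - tick_interval))
  return nil if next_fire.nil? || next_fire > now

  next_fire
end
```

With no stored marks, looking back one tick interval means a loop whose fire time falls inside the most recent tick window is still picked up on a fresh start.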
#fire(loop_obj) ⇒ Object
Fire one loop right now, bypassing both the leader gate and the schedule due-check, recording history + advancing the fire marks exactly like a real tick. Powers Cron.fire! (deterministic specs / manual "run now"); the scheduled, leader-gated path stays #tick.
    # File 'lib/wurk/cron.rb', line 552

    def fire(loop_obj)
      now = ::Time.now.to_i
      jid = enqueue!(loop_obj)
      record_fire(loop_obj, jid, now, loop_obj.next_fire_at(now))
      jid
    end
#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Wurk::Component
Invokes lifecycle hooks for event. Hooks run in registration order
(or LIFO when reverse: true, used for teardown). A raise in one hook
is reported via handle_exception and does NOT stop the next hook unless
reraise: true (used in tests / fail-fast boot). oneshot: true
clears the bucket after dispatch so the event can't fire twice.
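The dispatch rules above (registration order, LIFO with reverse: true, error isolation unless reraise: true, and one-shot clearing) can be sketched with a small registry. `HookBucket`, its `on` method, and the `errors` list are hypothetical names used only for illustration; the list stands in for handle_exception, and none of this is Wurk::Component's actual implementation.

```ruby
# Hypothetical hook registry illustrating the documented dispatch rules.
class HookBucket
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |hash, key| hash[key] = [] }
    @errors = [] # stand-in for handle_exception
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse # LIFO order, e.g. for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e            # report, then continue with the next hook
      raise e if reraise      # fail fast only when asked to
    end
    @hooks[event].clear if oneshot # the event cannot fire twice
  end
end
```

In this sketch a failing hook in the middle of the list is recorded and the remaining hooks still run, and a second fire_event for the same event is a no-op once the bucket has been cleared.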
#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Wurk::Component
#hostname ⇒ Object Originally defined in module Wurk::Component
#identity ⇒ Object Originally defined in module Wurk::Component
#leader? ⇒ Boolean Originally defined in module Wurk::Component
True iff this process currently holds the cluster dear-leader lock.
Per spec, the check is performed at call time (Wurk does not cache);
callers must not poll faster than the 60s follower cadence. Returns
false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false)
is set on the process (opt-out hot-standby). Any Redis error is swallowed
and the method returns false, so a transient partition can't propagate as
an exception into user code.
Spec: docs/target/sidekiq-ent.md §6.1.
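The contract described above (environment opt-out, uncached check at call time, errors swallowed to false) might look roughly like the sketch below. The name `leader_sketch?`, the injected `env` hash, and the block standing in for the Redis read are assumptions; how Wurk actually compares the lock holder with the process identity is not shown on this page.

```ruby
# Hypothetical sketch of the leader? contract. The block stands in for the
# Redis read that returns the identity currently holding the lock.
def leader_sketch?(identity, env: ENV, &lock_holder)
  # Opt-out hot-standby: never report leadership when explicitly disabled.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  # Checked on every call; nothing is cached between calls.
  lock_holder.call == identity
rescue StandardError
  # A transient Redis failure reads as "not leader" instead of raising.
  false
end
```

Because a failed read is indistinguishable from "not leader", callers such as #tick simply skip their work for that cycle.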
#logger ⇒ Object Originally defined in module Wurk::Component
Delegated to config.
#mono_ms ⇒ Object Originally defined in module Wurk::Component
#process_nonce ⇒ Object Originally defined in module Wurk::Component
#real_ms ⇒ Object Originally defined in module Wurk::Component
#redis ⇒ Object Originally defined in module Wurk::Component
#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Wurk::Component
Spawns a named thread that runs block under watchdog(name). The
parent must retain the returned Thread (do not rely on the garbage
collector to keep it reachable). report_on_exception is disabled on the
thread, so its death is not logged twice.
#start ⇒ Object
    # File 'lib/wurk/cron.rb', line 501

    def start
      @poller_thread ||= safe_thread('cron-poller') do # rubocop:disable Naming/MemoizedInstanceVariableName
        # Wait one interval before the first tick: don't fire a catch-up burst
        # the instant we boot (the leader is barely settled), and let a
        # short-lived process exit without ticking at all.
        wait
        until @done
          tick
          wait
        end
      end
    end
#terminate ⇒ Object
    # File 'lib/wurk/cron.rb', line 514

    def terminate
      @mutex.synchronize do
        @done = true
        @sleeper.signal
      end
    end
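#start calls a private `wait` helper that is not listed on this page. Given the Mutex and ConditionVariable created in the constructor, it plausibly sleeps on the condition variable with the tick interval as timeout, so that #terminate can wake it early. The sketch below illustrates that interruptible-sleep pattern; the class `SleeperSketch` and the body of `wait` are assumptions and do not reproduce the actual helper.

```ruby
# Hypothetical illustration of an interruptible sleep built on
# Mutex + ConditionVariable, as suggested by the constructor and #terminate.
class SleeperSketch
  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
  end

  # Sleeps for up to @interval seconds, returning early if terminate signals.
  # Skips the sleep entirely when termination was already requested.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def done?
    @mutex.synchronize { @done }
  end
end
```

Setting the flag and signalling under the same mutex means a terminate request cannot slip in between the `@done` check and the start of the wait, so shutdown does not have to sit out a full interval.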
#tick ⇒ Object
Leader-gated by the single cluster lock (Component#leader? reads
dear-leader): non-leaders return early and never iterate the LoopSet.
The Launcher owns the lock's renewal — the poller no longer runs (or
expires) its own.
    # File 'lib/wurk/cron.rb', line 525

    def tick
      return unless leader?

      LoopSet.new(@config).each { |lp| enqueue_if_due(lp) }
    rescue StandardError => e
      handle_exception(e, { context: 'cron-poller' })
    end
#tid ⇒ Object Originally defined in module Wurk::Component
#watchdog(last_words) ⇒ Object Originally defined in module Wurk::Component
Wraps a block at a thread boundary: any unhandled exception is reported
via handle_exception (so it lands in error_handlers / the log) and then
re-raised. last_words is the component label included in the context.
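The report-then-re-raise behaviour of watchdog, and the way safe_thread builds on it, can be sketched as follows. All names ending in `_sketch` and the `REPORTED` list are hypothetical; the list stands in for error_handlers / the log. Rescuing Exception (instead of StandardError) is an assumption based on the phrase "any unhandled exception".

```ruby
# Stand-in for error_handlers / the log.
REPORTED = []

def handle_exception_sketch(ex, ctx = {})
  REPORTED << [ex.message, ctx]
end

# Runs the block; on failure, reports with the component label and re-raises.
def watchdog_sketch(last_words)
  yield
rescue Exception => e # rubocop:disable Lint/RescueException (assumed scope)
  handle_exception_sketch(e, { context: last_words })
  raise
end

# Spawns a named thread whose body runs under the watchdog. Ruby's built-in
# death report is turned off because the watchdog has already reported.
def safe_thread_sketch(name, &block)
  Thread.new do
    Thread.current.name = name
    Thread.current.report_on_exception = false
    watchdog_sketch(name, &block)
  end
end
```

Because the exception is re-raised, the thread still dies, and a parent that joins it will see the original error in addition to the reported entry.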