Class: Wurk::Cron::Poller

Inherits:
Object
Includes:
Wurk::Component
Defined in:
lib/wurk/cron.rb

Overview

Once-per-minute tick driver. Only the cluster leader iterates the LoopSet and enqueues; non-leaders return early. Emits a missed-tick warning when the wall clock has drifted more than MISSED_TICK_THRESHOLD seconds past the expected fire time.
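
The missed-tick check can be pictured as a plain drift comparison. The following is a minimal sketch, not the library's code: the helper name missed_tick? and the threshold value of 60 are assumptions made for illustration, since the real constant's value is not shown on this page.

```ruby
# Hypothetical threshold; the real MISSED_TICK_THRESHOLD value is not shown here.
SKETCH_MISSED_TICK_THRESHOLD = 60

# True when `now` is more than `threshold` seconds past the expected fire time.
# Both timestamps are integer epoch seconds.
def missed_tick?(expected_fire, now, threshold: SKETCH_MISSED_TICK_THRESHOLD)
  (now - expected_fire) > threshold
end

expected = 1_700_000_000
puts missed_tick?(expected, expected + 5)    # fired close to schedule
puts missed_tick?(expected, expected + 120)  # drifted well past the threshold
```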

Constructor Details

#initialize(config) ⇒ Poller

Returns a new instance of Poller.



# File 'lib/wurk/cron.rb', line 489

def initialize(config)
  @config = config
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @client = Client.new(config: config)
  @thread = nil
  # Operators never need to touch this; integration tests shrink it so a
  # due loop fires within the test window instead of waiting a full minute.
  @tick_interval = config[:cron_tick_interval] || DEFAULT_TICK_SECONDS
end

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Wurk::Component

Returns the value of attribute config.

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Wurk::Component

#enqueue_if_due(loop_obj) ⇒ Object



# File 'lib/wurk/cron.rb', line 533

def enqueue_if_due(loop_obj)
  return if loop_obj.paused?

  now = ::Time.now.to_i
  prev_fire, next_fire = read_fire_marks(loop_obj.lid)
  next_fire ||= loop_obj.next_fire_at(prev_fire || (now - @tick_interval))
  return if next_fire.nil? || next_fire > now

  warn_missed_tick(loop_obj, next_fire, now)
  jid = enqueue!(loop_obj)
  future = loop_obj.next_fire_after(next_fire, now)
  record_fire(loop_obj, jid, now, future)
  jid
end
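
The due-check above prefers a stored next-fire mark and otherwise derives one from the previous fire (or from one tick interval ago). A minimal sketch of just that decision follows. FixedLoop and due_at are hypothetical names, and a fixed-period schedule stands in for whatever schedule a real loop object carries.

```ruby
# Hypothetical stand-in for a loop that fires every `period` seconds.
FixedLoop = Struct.new(:period) do
  # First period-aligned fire time strictly after `from` (epoch seconds).
  def next_fire_at(from)
    ((from / period) + 1) * period
  end
end

# Returns the fire time that is due at `now`, or nil when nothing is due.
def due_at(loop_obj, prev_fire, next_fire, now, tick_interval: 60)
  # Use the stored mark if present; otherwise compute one from history.
  next_fire ||= loop_obj.next_fire_at(prev_fire || (now - tick_interval))
  return nil if next_fire.nil? || next_fire > now

  next_fire
end

lp = FixedLoop.new(300)
p due_at(lp, nil, nil, 1_200)   # no marks yet: derived from now - tick_interval
p due_at(lp, 900, 1_500, 1_200) # stored mark still in the future
```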

#fire(loop_obj) ⇒ Object

Fires one loop immediately, bypassing both the leader gate and the schedule due-check, while recording history and advancing the fire marks exactly as a real tick does. Powers Cron.fire! (deterministic specs and manual "run now"); the scheduled, leader-gated path remains #tick.



# File 'lib/wurk/cron.rb', line 552

def fire(loop_obj)
  now = ::Time.now.to_i
  jid = enqueue!(loop_obj)
  record_fire(loop_obj, jid, now, loop_obj.next_fire_at(now))
  jid
end

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Wurk::Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.
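
The dispatch rules described above (registration order, optional reverse order, error isolation, one-shot clearing) can be sketched in a few lines. HookBucket is a hypothetical, simplified registry written for illustration; it is not Wurk's implementation, and its errors array merely stands in for handle_exception.

```ruby
# Hypothetical, simplified hook registry.
class HookBucket
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @errors = []
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse   # LIFO, e.g. for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e              # stands in for handle_exception
      raise if reraise          # fail fast only when asked to
    end
    @hooks[event].clear if oneshot
  end
end

bucket = HookBucket.new
order = []
bucket.on(:startup) { order << :first }
bucket.on(:startup) { raise 'boom' }
bucket.on(:startup) { order << :third }
bucket.fire_event(:startup)
p order               # the failing hook did not stop the third one
p bucket.errors.size
bucket.fire_event(:startup) # bucket was cleared, so nothing runs
```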

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Wurk::Component

#hostname ⇒ Object Originally defined in module Wurk::Component

#identity ⇒ Object Originally defined in module Wurk::Component

#leader? ⇒ Boolean Originally defined in module Wurk::Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)
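
The contract above (environment opt-out first, then a live lookup, with errors collapsing to false) can be sketched as follows. leader_sketch? is a hypothetical helper, and lock_holder is an assumed callable that returns the identity currently holding the lock; the real method reads the lock from Redis in a way not shown on this page.

```ruby
# Hypothetical sketch of the leader check.
# `lock_holder` is any callable returning the current holder's identity;
# it may raise, for example on a connection error.
def leader_sketch?(identity, lock_holder, env: ENV)
  # Opt-out: a hot standby never reports itself as leader.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  lock_holder.call == identity
rescue StandardError
  # A transient failure is treated as "not leader" instead of propagating.
  false
end

p leader_sketch?('host-a', -> { 'host-a' }, env: {})
p leader_sketch?('host-a', -> { raise IOError, 'connection lost' }, env: {})
p leader_sketch?('host-a', -> { 'host-a' }, env: { 'WURK_LEADER' => 'false' })
```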

#logger ⇒ Object Originally defined in module Wurk::Component

Delegated to config.

#mono_ms ⇒ Object Originally defined in module Wurk::Component

#process_nonce ⇒ Object Originally defined in module Wurk::Component

#real_ms ⇒ Object Originally defined in module Wurk::Component

Clock helper.

#redis ⇒ Object Originally defined in module Wurk::Component

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Wurk::Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread. report_on_exception is disabled on it, so an exception that kills the thread is not logged twice.
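
As a rough illustration of the pattern, the sketch below spawns a named thread with report_on_exception turned off and keeps the returned handle. named_thread is a hypothetical helper; it omits the watchdog wrapping and the priority option, so it is a simplification and not the Component method itself.

```ruby
# Hypothetical sketch of a named worker thread.
def named_thread(name, &block)
  Thread.new do
    Thread.current.name = name
    # Errors are assumed to be reported elsewhere, so silence Ruby's own report.
    Thread.current.report_on_exception = false
    block.call
  end
end

worker = named_thread('cron-poller') { :done } # the caller keeps the reference
p worker.value
p worker.name
```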

#start ⇒ Object



# File 'lib/wurk/cron.rb', line 501

def start
  @poller_thread ||= safe_thread('cron-poller') do # rubocop:disable Naming/MemoizedInstanceVariableName
    # Wait one interval before the first tick: don't fire a catch-up burst
    # the instant we boot (the leader is barely settled), and let a
    # short-lived process exit without ticking at all.
    wait
    until @done
      tick
      wait
    end
  end
end

#terminate ⇒ Object



# File 'lib/wurk/cron.rb', line 514

def terminate
  @mutex.synchronize do
    @done = true
    @sleeper.signal
  end
end
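
The private wait method that pairs with this signal is not shown on this page. The sketch below is one plausible shape for it, built on the same Mutex and ConditionVariable pair: a timed wait that returns early when terminate signals. SleepLoop and its tick counter are hypothetical, and unlike the method above, this terminate also joins the thread so the example finishes deterministically.

```ruby
# Hypothetical sketch of an interruptible sleep loop.
class SleepLoop
  attr_reader :ticks

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @ticks = 0
  end

  def start
    @thread ||= Thread.new do
      wait              # one interval before the first tick
      until @done
        @ticks += 1     # stands in for the real tick work
        wait
      end
    end
  end

  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal   # wake the sleeper immediately
    end
    @thread&.join
  end

  private

  # Sleeps up to @interval seconds; returns early when signalled.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

looper = SleepLoop.new(60)
looper.start
looper.terminate # returns promptly instead of sleeping the full minute
p looper.ticks
```

Checking @done inside the synchronized block before waiting avoids a lost wake-up when terminate runs before the thread has started sleeping.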

#tick ⇒ Object

Leader-gated by the single cluster lock (Component#leader? reads dear-leader): non-leaders return early and never iterate the LoopSet. The Launcher owns the lock's renewal; the poller no longer runs (or expires) its own.



# File 'lib/wurk/cron.rb', line 525

def tick
  return unless leader?

  LoopSet.new(@config).each { |lp| enqueue_if_due(lp) }
rescue StandardError => e
  handle_exception(e, { context: 'cron-poller' })
end

#tid ⇒ Object Originally defined in module Wurk::Component

Identity helper.

#watchdog(last_words) ⇒ Object Originally defined in module Wurk::Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.
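
The report-then-re-raise behaviour can be sketched as below. watchdog_sketch is a hypothetical helper and reporter is an assumed callable standing in for handle_exception. The sketch rescues StandardError only, which may be narrower than what the real method catches.

```ruby
# Hypothetical sketch: report the error with a label, then let it propagate.
def watchdog_sketch(label, reporter)
  yield
rescue StandardError => e
  reporter.call(e, { context: label })
  raise # re-raise the same exception after reporting
end

reported = []
reporter = ->(ex, ctx) { reported << [ex.class, ctx] }

begin
  watchdog_sketch('cron-poller', reporter) { raise ArgumentError, 'bad loop' }
rescue ArgumentError
  puts 're-raised after reporting'
end
p reported
```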