Class: Wurk::Scheduled::Poller

Inherits:
Object
Includes:
Component
Defined in:
lib/wurk/scheduled.rb

Overview

Single thread that wakes on a randomized interval, drains both ZSETs, then sleeps again. Random spread prevents the cluster from dogpiling Redis at the top of each cadence.
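A minimal sketch of how a randomized interval can spread wake-ups, assuming a uniform spread around an average interval. The helper name `jittered_interval` and the 0.5x to 1.5x range are illustrative assumptions; this page does not show the formula Wurk actually uses.

```ruby
# Hypothetical helper, not part of Wurk's documented API.
# Returns a sleep duration spread uniformly over
# [0.5 * average, 1.5 * average), so that processes sharing the same
# average interval drift apart instead of waking in lockstep.
def jittered_interval(average, rnd = Random.new)
  (average * rnd.rand) + (average / 2.0)
end

rnd = Random.new(42) # seeded only to make the sketch reproducible
samples = Array.new(5) { jittered_interval(15.0, rnd) }
puts samples.map { |s| s.round(2) }.inspect
```

Each process draws its own value per cycle, so the mean cadence is preserved while individual wake-ups are decorrelated.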

Constant Summary

INITIAL_WAIT = 10

Instance Attribute Summary

Instance Method Summary

Constructor Details

#initialize(config) ⇒ Poller

Returns a new instance of Poller.



# File 'lib/wurk/scheduled.rb', line 122

def initialize(config)
  @config = config
  @enq = (config[:scheduled_enq] || Enq).new(config)
  @done = false
  @mutex = ::Mutex.new
  @sleeper = ::ConditionVariable.new
  @thread = nil
  @rnd = ::Random.new
  @last_cleanup_ms = 0
end
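The `config[:scheduled_enq] || Enq` expression lets a caller swap in a different enqueuer class. The sketch below isolates that selection pattern with stand-in classes; `DefaultEnq`, `RecordingEnq`, and `build_enq` are hypothetical names, and a plain Hash stands in for the real config object (assumed only to respond to `[]`).

```ruby
# Stand-in for the real Enq class; for illustration only.
class DefaultEnq
  def initialize(config)
    @config = config
  end

  def enqueue_jobs
    :default
  end
end

# Hypothetical test double that counts calls instead of touching Redis.
class RecordingEnq
  attr_reader :calls

  def initialize(_config)
    @calls = 0
  end

  def enqueue_jobs
    @calls += 1
  end
end

# Same selection expression as the constructor: use the configured
# class if one is present, otherwise fall back to the default.
def build_enq(config)
  (config[:scheduled_enq] || DefaultEnq).new(config)
end

default_enq = build_enq({})
custom_enq  = build_enq({ scheduled_enq: RecordingEnq })
puts default_enq.class
puts custom_enq.class
```

Because the value is a class rather than an instance, the replacement must accept the config in its constructor and respond to `enqueue_jobs` (and, per `#terminate`, to `terminate`).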

Instance Attribute Details

#config ⇒ Object (readonly) Originally defined in module Component

Returns the value of attribute config.

#rnd ⇒ Object

Returns the value of attribute rnd.



# File 'lib/wurk/scheduled.rb', line 120

def rnd
  @rnd
end

Instance Method Details

#default_tag(dir = Dir.pwd) ⇒ Object Originally defined in module Component

#enqueue ⇒ Object

Called on every wake. Any StandardError raised inside the Enq is reported via handle_exception and the loop continues, so a transient Redis blip cannot kill the scheduler.



# File 'lib/wurk/scheduled.rb', line 160

def enqueue
  @enq.enqueue_jobs
rescue StandardError => e
  handle_exception(e, { context: 'scheduler' })
end
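The rescue-and-continue behavior can be exercised in isolation. In this sketch `FlakyEnq` and `MiniPoller` are hypothetical stand-ins, and `handle_exception` is a stub that records the error rather than the real Component implementation.

```ruby
# Hypothetical enqueuer that fails on its second call only.
class FlakyEnq
  def initialize
    @calls = 0
  end

  def enqueue_jobs
    @calls += 1
    raise IOError, 'simulated Redis blip' if @calls == 2

    @calls
  end
end

# Stripped-down stand-in for the poller; only #enqueue is modelled.
class MiniPoller
  attr_reader :errors

  def initialize(enq)
    @enq = enq
    @errors = []
  end

  # Same shape as Poller#enqueue: report the error, do not re-raise.
  def enqueue
    @enq.enqueue_jobs
  rescue StandardError => e
    handle_exception(e, { context: 'scheduler' })
  end

  private

  # Stub for Component#handle_exception: records instead of logging.
  def handle_exception(ex, ctx = {})
    @errors << [ex.class, ctx]
  end
end

poller = MiniPoller.new(FlakyEnq.new)
3.times { poller.enqueue } # the second call fails, the third still runs
puts poller.errors.inspect
```

Note that only `StandardError` and its subclasses are rescued here; exceptions outside that hierarchy would still propagate to the thread boundary.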

#fire_event(event, oneshot: true, reverse: false, reraise: false) ⇒ Object Originally defined in module Component

Invokes lifecycle hooks for event. Hooks run in registration order (or LIFO when reverse: true, used for teardown). A raise in one hook is reported via handle_exception and does NOT stop the next hook unless reraise: true (used in tests / fail-fast boot). oneshot: true clears the bucket after dispatch so the event can't fire twice.
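The dispatch rules above can be sketched with a small registry. `HookRegistry`, its `on` method, and the `errors` array are hypothetical; in Wurk the failure would go through handle_exception, and the real storage of hooks is not shown on this page.

```ruby
# Hypothetical hook registry illustrating the documented semantics.
class HookRegistry
  attr_reader :errors

  def initialize
    @hooks = Hash.new { |h, k| h[k] = [] }
    @errors = [] # stands in for handle_exception reporting
  end

  def on(event, &block)
    @hooks[event] << block
  end

  def fire_event(event, oneshot: true, reverse: false, reraise: false)
    hooks = @hooks[event].dup
    hooks.reverse! if reverse # LIFO order, e.g. for teardown
    hooks.each do |hook|
      hook.call
    rescue StandardError => e
      @errors << e
      raise if reraise # fail fast only when asked to
    end
    @hooks[event].clear if oneshot # the event cannot fire twice
  end
end

reg = HookRegistry.new
order = []
reg.on(:startup) { order << :a }
reg.on(:startup) { raise 'boom' }
reg.on(:startup) { order << :c }
reg.fire_event(:startup) # the failing hook does not stop :c
reg.fire_event(:startup) # bucket was cleared, so nothing runs
puts order.inspect
```

With `reverse: true` the same three hooks would run as `:c`, the failing hook, then `:a`.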

#handle_exception(ex, ctx = {}) ⇒ Object Originally defined in module Component

#hostname ⇒ Object Originally defined in module Component

#identity ⇒ Object Originally defined in module Component

#leader? ⇒ Boolean Originally defined in module Component

True iff this process currently holds the cluster dear-leader lock. Per spec, the check is performed at call time (Wurk does not cache); callers must not poll faster than the 60s follower cadence. Returns false unconditionally when WURK_LEADER=false (or SIDEKIQ_LEADER=false) is set on the process (opt-out hot-standby). Any Redis error is swallowed → false, so a transient partition can't propagate as an exception into user code.

Spec: docs/target/sidekiq-ent.md §6.1.

Returns:

  • (Boolean)
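The decision order described for leader? (opt-out first, then the lock check, with errors mapped to false) might look like the sketch below. The `lock_holder` callable is a hypothetical stand-in for the Redis lookup, and the three-argument signature exists only to keep the example self-contained; the real method takes no arguments.

```ruby
# Illustrative only: the real leader? reads the lock from Redis.
# identity    - this process's identity string
# lock_holder - hypothetical callable returning the current lock owner
# env         - ENV-like object responding to []
def leader?(identity, lock_holder, env = ENV)
  # Opt-out hot-standby: never report leadership.
  return false if env['WURK_LEADER'] == 'false' || env['SIDEKIQ_LEADER'] == 'false'

  # Checked at call time; nothing is cached between calls.
  lock_holder.call == identity
rescue StandardError
  # A lookup failure is reported as "not leader" rather than raised.
  false
end

puts leader?('host:1', -> { 'host:1' }, {})
puts leader?('host:1', -> { raise IOError, 'partition' }, {})
```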

#logger ⇒ Object Originally defined in module Component

Source section: delegated to config.

#mono_ms ⇒ Object Originally defined in module Component

#process_nonce ⇒ Object Originally defined in module Component

#real_ms ⇒ Object Originally defined in module Component

Source section: clocks.

#redis ⇒ Object Originally defined in module Component

#safe_thread(name, priority: nil, &block) ⇒ Object Originally defined in module Component

Spawns a named thread that runs block under watchdog(name). The parent must retain the returned Thread, since the garbage collector may not keep it otherwise. report_on_exception is disabled on the thread so that its death is not logged twice.

#start ⇒ Object

Spawns the scheduler thread. INITIAL_WAIT delays the first sweep so a fleet-wide deploy doesn't have every freshly-booted process hit Redis simultaneously.



# File 'lib/wurk/scheduled.rb', line 136

def start
  @thread ||= safe_thread('scheduler') do # rubocop:disable Naming/MemoizedInstanceVariableName
    initial_wait
    until @done
      enqueue
      wait
    end
    logger.info('Scheduler exiting...')
  end
end

#terminate ⇒ Object

Idempotent. Wakes the sleeping thread so it observes @done and exits. Also propagates the stop signal to @enq so any in-flight drain loop short-circuits instead of running to completion.



# File 'lib/wurk/scheduled.rb', line 150

def terminate
  @mutex.synchronize do
    @done = true
    @enq.terminate
    @sleeper.signal
  end
end
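The private `wait` method that `terminate` interrupts is not shown on this page. The sketch below is one plausible shape for it, assuming a timed `ConditionVariable#wait` guarded by the same mutex; `MiniPoller`, `sweeps`, and `join` are hypothetical names added for the demonstration.

```ruby
# Hypothetical reduced poller showing the interruptible-sleep pattern.
class MiniPoller
  attr_reader :sweeps

  def initialize(interval)
    @interval = interval
    @done = false
    @mutex = Mutex.new
    @sleeper = ConditionVariable.new
    @thread = nil
    @sweeps = 0
  end

  def start
    @thread ||= Thread.new do
      until @done
        @sweeps += 1 # stands in for enqueue
        wait
      end
    end
  end

  # Safe to call more than once: later calls only re-set the flag
  # and signal a condition variable nobody is waiting on.
  def terminate
    @mutex.synchronize do
      @done = true
      @sleeper.signal
    end
  end

  def join
    @thread&.join
  end

  private

  # Assumed implementation: sleep up to @interval seconds, but return
  # early when signalled. Re-checking @done under the mutex avoids
  # sleeping a full interval if terminate ran just before the wait.
  def wait
    @mutex.synchronize do
      @sleeper.wait(@mutex, @interval) unless @done
    end
  end
end

poller = MiniPoller.new(60)
poller.start
sleep 0.1
started = Process.clock_gettime(Process::CLOCK_MONOTONIC)
poller.terminate
poller.terminate # second call is harmless
poller.join
elapsed = Process.clock_gettime(Process::CLOCK_MONOTONIC) - started
puts format('stopped after %.3fs', elapsed)
```

A plain `sleep(@interval)` would force shutdown to wait out the remainder of the interval; the condition variable lets the thread exit as soon as it is signalled.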

#tid ⇒ Object Originally defined in module Component

Source section: identity.

#watchdog(last_words) ⇒ Object Originally defined in module Component

Wraps a block at a thread boundary: any unhandled exception is reported via handle_exception (so it lands in error_handlers / the log) and then re-raised. last_words is the component label included in the context.
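A rough sketch of how watchdog and safe_thread could fit together, based only on the descriptions on this page. `MiniComponent` and its `reported` queue are hypothetical, the `context:` key is an assumed shape for the context hash, and rescuing `Exception` is an assumption drawn from the phrase "any unhandled exception".

```ruby
# Hypothetical stand-in for the Component module.
class MiniComponent
  attr_reader :reported

  def initialize
    @reported = Queue.new # thread-safe stand-in for error_handlers
  end

  def handle_exception(ex, ctx = {})
    @reported << [ex, ctx]
  end

  # Report anything that escapes the block, then re-raise it so the
  # thread still terminates with the original exception.
  def watchdog(last_words)
    yield
  rescue Exception => e # assumption: everything is caught at the boundary
    handle_exception(e, { context: last_words })
    raise
  end

  def safe_thread(name, priority: nil, &block)
    Thread.new do
      # watchdog already reports, so silence Ruby's own stderr report.
      Thread.current.report_on_exception = false
      Thread.current.name = name
      Thread.current.priority = priority if priority
      watchdog(name, &block)
    end
  end
end

component = MiniComponent.new
thread = component.safe_thread('demo') { raise ArgumentError, 'boom' }
begin
  thread.join # join re-raises the exception that killed the thread
rescue ArgumentError => e
  puts "thread died with: #{e.message}"
end
```

Keeping the returned Thread in a variable, as `thread` does here, mirrors the advice under safe_thread and is what allows the caller to join it later.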