Class: Wurk::Middleware::Expiry
- Inherits:
- Object
- Wurk::Middleware::Expiry
- Includes:
- ServerMiddleware
- Defined in:
- lib/wurk/middleware/expiry.rb
Overview
Server middleware. Drops jobs whose expiry timestamp (stamped at push
by the client from sidekiq_options expires_in:) has passed before
perform gets a chance to start. Once perform is invoked, expiry no
longer preempts: long-running jobs that started in time run to completion.
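The check described above can be sketched in isolation. This is a minimal illustration, not the middleware itself: `expired?` is a hypothetical helper, and the job is assumed to be a plain Hash whose `'expiry'` key holds an epoch timestamp in seconds.

```ruby
# Hypothetical helper mirroring the middleware's decision.
# `now` is injectable so the check can be exercised deterministically.
def expired?(job, now: Time.now.to_f)
  expiry = job['expiry']
  return false unless expiry # jobs without an expiry are never dropped

  now > expiry.to_f
end

now = 1_700_000_000.0
fresh  = { 'class' => 'ReportJob', 'expiry' => now + 60 }
stale  = { 'class' => 'ReportJob', 'expiry' => now - 1 }
no_ttl = { 'class' => 'ReportJob' }

[fresh, stale, no_ttl].each do |job|
  puts "#{job.inspect} expired? #{expired?(job, now: now)}"
end
```

The comparison happens once, before `perform` starts; nothing in this sketch (or in the description above) re-checks the timestamp while a job is running.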
The skip path:
* bumps Wurk::Processor::EXPIRED so the heartbeat flushes
`stat:expired` + `stat:expired:YYYY-MM-DD` to Redis, surfacing the
count in Wurk::Stats and the dashboard
* emits `jobs.expired` via Wurk::Metrics::Statsd (no-op when no client
is configured)
* returns without yielding and raises no exception, so JobRetry treats it as a
clean exit and the processor acks the UoW
* counts as a batch success: because this middleware is registered
AFTER `Wurk::Batch::ServerMiddleware`, returning unwinds back through
batch's `yield`, and batch's `ack_success` still runs on the way out
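The ordering argument in the last bullet can be illustrated with stand-in classes. Everything below is a sketch under assumptions: `SketchBatchMiddleware`, `SketchExpiryMiddleware`, and `run_chain` are hypothetical names, not the real Wurk classes or chain runner, and the "ack" is reduced to a counter.

```ruby
# Hypothetical outer middleware: "acks" after the inner chain returns cleanly.
class SketchBatchMiddleware
  attr_reader :acked

  def initialize
    @acked = 0
  end

  def call(_instance, _job, _queue)
    yield
    @acked += 1 # reached only if the inner chain did not raise
  end
end

# Hypothetical inner middleware: skips expired jobs by returning without yielding.
class SketchExpiryMiddleware
  attr_reader :expired

  def initialize(now:)
    @now = now
    @expired = 0
  end

  def call(_instance, job, _queue)
    expiry = job['expiry']
    return yield unless expiry

    if @now > expiry.to_f
      @expired += 1
      return
    end
    yield
  end
end

# Nest the middlewares so the first entry is outermost, then run the job body.
def run_chain(middlewares, job, &perform)
  chain = middlewares.reverse.reduce(perform) do |inner, mw|
    -> { mw.call(nil, job, 'default', &inner) }
  end
  chain.call
end

batch = SketchBatchMiddleware.new
expiry = SketchExpiryMiddleware.new(now: 100.0)
performed = []
run_chain([batch, expiry], { 'class' => 'J', 'expiry' => 50.0 }) { performed << :ran }
puts "performed=#{performed.inspect} acked=#{batch.acked} expired=#{expiry.expired}"
```

Because the expiry stand-in returns normally, control comes back to the line after the outer `yield`, so the outer middleware still records a success even though the job body never ran.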
The expired job is also counted toward PROCESSED: the ensure block in
Processor#stats always increments PROCESSED, so EXPIRED is an additive
subset (executed = processed - failed - expired). This matches Sidekiq Pro.
Spec: docs/target/sidekiq-pro.md §7.
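The counter arithmetic can be checked with a small tally. This is an illustrative sketch only: `tally` and `executed_count` are hypothetical helpers, and it assumes (as the formula implies) that failed jobs are also counted as processed.

```ruby
# Hypothetical tally: every outcome bumps :processed, mirroring an ensure
# block that always increments; :failed and :expired are additive subsets.
def tally(outcomes)
  counts = { processed: 0, failed: 0, expired: 0 }
  outcomes.each do |outcome|
    counts[:processed] += 1
    counts[:failed] += 1 if outcome == :failed
    counts[:expired] += 1 if outcome == :expired
  end
  counts
end

# Jobs whose perform actually ran to a successful finish.
def executed_count(counts)
  counts[:processed] - counts[:failed] - counts[:expired]
end

counts = tally(%i[ok ok failed expired])
puts counts.inspect
puts "executed=#{executed_count(counts)}"
```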
Instance Attribute Summary
- #config ⇒ Object included from ServerMiddleware
  Returns the value of attribute config.
Instance Method Summary
- #call(_job_instance, job, _queue) ⇒ Object
- #logger ⇒ Object included from ServerMiddleware
- #redis ⇒ Object included from ServerMiddleware
- #redis_pool ⇒ Object included from ServerMiddleware
Instance Attribute Details
#config ⇒ Object Originally defined in module ServerMiddleware
Returns the value of attribute config.
Instance Method Details
#call(_job_instance, job, _queue) ⇒ Object
    # File 'lib/wurk/middleware/expiry.rb', line 34
    def call(_job_instance, job, _queue)
      expiry = job['expiry']
      return yield unless expiry

      if ::Time.now.to_f > expiry.to_f
        Wurk::Processor::EXPIRED.incr
        Wurk::Metrics::Statsd.increment('jobs.expired', tags: ["class:#{job['class']}"])
        return
      end

      yield
    end