Module: Wurk::Unique
- Defined in: lib/wurk/unique.rb
Overview
Sidekiq Enterprise unique jobs. Best-effort dedup at enqueue time keyed
by a SHA256 digest of [class, queue, args] (overridable via
sidekiq_unique_context). Two lock-release strategies:
* `unique_until: :success` (default): the lock is retained through retries;
server middleware DELs it after a successful perform. If the process
crashes, the lock lasts at most until the `unique_for` TTL expires.
* `unique_until: :start`: server middleware DELs the lock right
*before* invoking perform, so a duplicate can be enqueued while the
first is running.
A job that dies automatically (retries exhausted / discarded) releases its lock via a death handler; manual UI kills keep the lock until TTL expiry (Ent wiki, Ent-Unique-Jobs).
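The difference between the two `unique_until` strategies can be sketched with an in-memory hash standing in for Redis. This is a simplified model, not the middleware itself: `try_enqueue` and `run_job` are hypothetical helpers, and the client-side lock is assumed to behave like a set-if-absent.

```ruby
# Client side: take the lock only if it is free (assumed set-if-absent semantics).
def try_enqueue(locks, key, jid)
  return false if locks.key?(key)

  locks[key] = jid
  true
end

# Server side: release before or after perform, depending on the strategy.
def run_job(locks, key, jid, unique_until:)
  locks.delete(key) if unique_until == :start && locks[key] == jid
  result = yield
  locks.delete(key) if unique_until == :success && locks[key] == jid
  result
end

locks = {} # lock key => owning jid

try_enqueue(locks, 'unique:demo', 'jid-1')
# Under :success the lock is still held while perform runs, so a duplicate is rejected.
dup_during_success = run_job(locks, 'unique:demo', 'jid-1', unique_until: :success) do
  try_enqueue(locks, 'unique:demo', 'jid-2')
end

try_enqueue(locks, 'unique:demo', 'jid-3')
# Under :start the lock is gone before perform begins, so a duplicate gets in.
dup_during_start = run_job(locks, 'unique:demo', 'jid-3', unique_until: :start) do
  try_enqueue(locks, 'unique:demo', 'jid-4')
end

puts dup_during_success # false
puts dup_during_start   # true
```

Both release steps check ownership before deleting, which mirrors the compare-and-delete behavior described for release_if_owner.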
Wire-compat (§3.9): single-key Redis layout — unique:<sha256> STRING
holding the owning JID. Scheduled jobs extend the TTL by the delay so
the lock covers the entire wait+execution window (§3.4).
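As a rough illustration of the TTL extension for scheduled jobs, the arithmetic might look like the sketch below. `lock_ttl` is a hypothetical helper, and the sketch assumes the scheduled time is carried in the payload's `at` field as epoch seconds, as in Sidekiq job payloads.

```ruby
# Hypothetical helper: lock TTL in seconds for a job payload.
# Assumes 'unique_for' is already a number of seconds and 'at' is epoch seconds.
def lock_ttl(job, now: Time.now.to_f)
  base = job.fetch('unique_for')
  delay = job.key?('at') ? [job['at'] - now, 0].max : 0
  base + delay.ceil
end

now = 1_700_000_000.0
immediate = { 'unique_for' => 600 }
scheduled = { 'unique_for' => 600, 'at' => now + 3600 }

puts lock_ttl(immediate, now: now) # 600
puts lock_ttl(scheduled, now: now) # 4200 (600 + 3600 of scheduling delay)
```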
This is the native replacement for the sidekiq-unique-jobs gem; see the
migration guide for the lock: → unique_until: translation table.
Spec: docs/target/sidekiq-ent.md §3.
Defined Under Namespace
Classes: ClientMiddleware, ServerMiddleware
Constant Summary
- KEY_PREFIX =
'unique:'
- DEFAULT_UNTIL =
:success
- VALID_UNTIL =
%i[success start].freeze
- DEATH_HANDLER =
Ent parity: a job that dies automatically releases its lock so a duplicate can enqueue immediately. Manual API/UI kills keep the lock until TTL expiry (Ent wiki, Ent-Unique-Jobs) — they reach death handlers too (Sidekiq fires them on API kills), so we recognize the synthesized kill exception and skip the release for it. Atomic CAS-DEL via the shared Lua script mirrors ServerMiddleware#release.
    lambda do |job, exception|
      next unless Wurk::Unique.enabled?
      next unless Wurk::Unique.coerce_ttl(job['unique_for'])
      next if exception.instance_of?(::RuntimeError) && exception.message == DeadSet::API_KILL_MESSAGE

      Wurk.redis { |conn| Wurk::Unique.release_if_owner(conn, Wurk::Unique.lock_key_for(job), job['jid']) }
    end
Class Method Summary
- .coerce_ttl(value) ⇒ Object
Coerce unique_for to a numeric seconds value.
- .disable! ⇒ Object
Test helper, not part of the public Sidekiq surface.
- .duration_like?(value) ⇒ Boolean
- .enable! ⇒ Object
Sets the enabled flag and registers the middleware.
- .enabled? ⇒ Boolean
- .lock_key(klass, queue, args) ⇒ Object
Compute the lock key for an arbitrary (klass, queue, args) triple.
- .lock_key_for(job) ⇒ Object
Compute the lock key from a job payload, honoring sidekiq_unique_context when the worker class is loaded and defines it.
- .locked?(queue_or_klass, klass_or_args = nil, args = nil) ⇒ String?
Owning jid, or nil when the lock is free.
- .release_if_owner(conn, key, jid) ⇒ Object
Atomic compare-and-delete of a unique lock key.
- .unique_context(job) ⇒ Object
Default: [class, queue, args].
Class Method Details
.coerce_ttl(value) ⇒ Object
Coerce unique_for to a numeric seconds value. Accepts Integer,
Numeric, ActiveSupport::Duration (any to_i-respondent), or false
(skip). Returns nil when uniqueness should be skipped.
    # File 'lib/wurk/unique.rb', line 148
    def self.coerce_ttl(value)
      return nil if value.nil? || value == false
      # `.to_i`, not `value`: ActiveSupport::Duration overrides `is_a?(Integer)`
      # to return true (it delegates to its underlying value), so `1.hour` passes
      # this guard; returning the raw Duration handed redis-client a non-Integer
      # EX arg and raised TypeError at enqueue (#253).
      return value.to_i if value.is_a?(Integer) && value.positive?
      return value.to_i if value.is_a?(Numeric)
      return value.to_i if duration_like?(value)

      nil
    end
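A few representative inputs make the coercion rules concrete. The sketch below uses standalone copies of coerce_ttl and duration_like? so it runs without Wurk or ActiveSupport; `FakeDuration` is a hypothetical stand-in for ActiveSupport::Duration and is matched only because its class name contains "Duration".

```ruby
# Standalone copies of the two helpers so the example runs on its own.
def duration_like?(value)
  return false unless value.respond_to?(:to_i)

  value.respond_to?(:since) || value.class.name.to_s.include?('Duration')
end

def coerce_ttl(value)
  return nil if value.nil? || value == false
  return value.to_i if value.is_a?(Integer) && value.positive?
  return value.to_i if value.is_a?(Numeric)
  return value.to_i if duration_like?(value)

  nil
end

# Hypothetical stand-in for ActiveSupport::Duration.
FakeDuration = Struct.new(:seconds) do
  def to_i
    seconds
  end
end

p coerce_ttl(nil)                    # nil (uniqueness skipped)
p coerce_ttl(false)                  # nil (uniqueness skipped)
p coerce_ttl(600)                    # 600
p coerce_ttl(2.5)                    # 2
p coerce_ttl(FakeDuration.new(3600)) # 3600
p coerce_ttl('10 minutes')           # nil (a String is not duration-like)
```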
.disable! ⇒ Object
Test helper — not part of the public Sidekiq surface. Clears the flag so per-test enable!/disable! does not leak across runs.
    # File 'lib/wurk/unique.rb', line 92
    def disable!
      @enabled = false
      nil
    end
.duration_like?(value) ⇒ Boolean
    # File 'lib/wurk/unique.rb', line 161
    def self.duration_like?(value)
      return false unless value.respond_to?(:to_i)

      value.respond_to?(:since) || value.class.name.to_s.include?('Duration')
    end
.enable! ⇒ Object
Sets the enabled flag, calls register_middleware!, and returns true.
    # File 'lib/wurk/unique.rb', line 84
    def enable! # rubocop:disable Naming/PredicateMethod
      @enabled = true
      register_middleware!
      true
    end
.enabled? ⇒ Boolean
    # File 'lib/wurk/unique.rb', line 80
    def enabled?
      @enabled == true
    end
.lock_key(klass, queue, args) ⇒ Object
Compute the lock key for an arbitrary (klass, queue, args) triple.
Used by both the client middleware and the public locked? probe so
they cannot drift.
    # File 'lib/wurk/unique.rb', line 100
    def lock_key(klass, queue, args)
      context = [klass.to_s, queue.to_s, args]
      "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
    end
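To see what the resulting keys look like, here is a standalone copy of lock_key with a few sample inputs. The worker name `HardWorker` and the argument values are made up for illustration.

```ruby
require 'digest'
require 'json'

KEY_PREFIX = 'unique:'

# Standalone copy of lock_key so the example runs without Wurk.
def lock_key(klass, queue, args)
  context = [klass.to_s, queue.to_s, args]
  "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
end

a = lock_key('HardWorker', 'default', [1, 'x'])
b = lock_key(:HardWorker, :default, [1, 'x'])
c = lock_key('HardWorker', 'default', [2, 'x'])

puts a.start_with?('unique:') # true
puts a.length                 # 71 (7-character prefix + 64 hex digits)
puts a == b                   # true: klass and queue are stringified first
puts a == c                   # false: different args give a different digest
```

Because the digest covers the JSON dump of the whole context, any change in class, queue, or args yields a different key.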
.lock_key_for(job) ⇒ Object
Compute the lock key from a job payload, honoring
sidekiq_unique_context when the worker class is loaded and
defines it.
    # File 'lib/wurk/unique.rb', line 108
    def lock_key_for(job)
      context = unique_context(job)
      "#{KEY_PREFIX}#{Digest::SHA256.hexdigest(JSON.dump(context))}"
    end
.locked?(queue_or_klass, klass_or_args = nil, args = nil) ⇒ String?
Returns owning jid, or nil when the lock is free.
    # File 'lib/wurk/unique.rb', line 172
    def self.locked?(queue_or_klass, klass_or_args = nil, args = nil)
      queue, klass, payload = normalize_locked_args(queue_or_klass, klass_or_args, args)
      key = lock_key(klass, queue, payload)
      Wurk.redis { |c| c.call('GET', key) }
    end
.release_if_owner(conn, key, jid) ⇒ Object
Atomic compare-and-delete of a unique lock key. Two-command
GET-then-DEL is not a real CAS — the key can expire between the GET
and DEL, letting a fresh enqueue grab it, and the bare DEL would
then drop the new owner's lock. Routed through a single Lua script
(Wurk::Lua::RELEASE_IF_OWNER) shared by ServerMiddleware#release
(normal success/start release) and DEATH_HANDLER (automatic-death
release) so the two paths cannot drift.
    # File 'lib/wurk/unique.rb', line 71
    def self.release_if_owner(conn, key, jid)
      Wurk::Lua::Loader.eval_cached(conn, :release_if_owner, keys: [key], argv: [jid])
    end
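The source of Wurk::Lua::RELEASE_IF_OWNER is not shown on this page. A compare-and-delete script commonly takes the shape below, and the same logic can be modeled in memory to show why a stale owner must not delete the key. Both the script text and the in-memory `release_if_owner` are illustrative assumptions, not the library's code.

```ruby
# Assumed shape of a compare-and-delete script (not the actual Wurk script).
RELEASE_IF_OWNER_SKETCH = <<~LUA
  if redis.call('GET', KEYS[1]) == ARGV[1] then
    return redis.call('DEL', KEYS[1])
  end
  return 0
LUA

# In-memory model of the same logic: the check and the delete are one step.
def release_if_owner(store, key, jid)
  return 0 unless store[key] == jid

  store.delete(key)
  1
end

# jid-1's lock expired and jid-2 has since taken the key.
store = { 'unique:abc' => 'jid-2' }

stale = release_if_owner(store, 'unique:abc', 'jid-1')
puts stale                    # 0: jid-1 no longer owns the key
puts store.key?('unique:abc') # true: jid-2's lock survives

owner = release_if_owner(store, 'unique:abc', 'jid-2')
puts owner                    # 1
puts store.key?('unique:abc') # false
```

With a bare DEL in place of the ownership check, the first call would have dropped jid-2's lock, which is the failure mode the description above warns about.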
.unique_context(job) ⇒ Object
Default: [class, queue, args]. Workers may override by defining
self.sidekiq_unique_context(job) returning any JSON-serializable
value (e.g. a subset of args). Spec §3.5.
    # File 'lib/wurk/unique.rb', line 116
    def unique_context(job)
      klass = resolve_class(job['class'])
      if klass.respond_to?(:sidekiq_unique_context)
        klass.sidekiq_unique_context(job)
      else
        [job['class'], job['queue'], job['args']]
      end
    end
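A worker-level override might look like the sketch below, where uniqueness depends only on the first argument. `ReportWorker` is a hypothetical class, and resolve_class (not shown on this page) is replaced by a plain constant lookup, which is an assumption about its behavior.

```ruby
require 'digest'
require 'json'

# Hypothetical worker: only the first argument matters for uniqueness.
class ReportWorker
  def self.sidekiq_unique_context(job)
    [job['class'], job['queue'], job['args'].first]
  end
end

# Simplified stand-in for unique_context; Object.const_get replaces
# resolve_class here and is an assumption about how the lookup works.
def unique_context(job)
  name = job['class']
  klass = Object.const_defined?(name) ? Object.const_get(name) : nil
  if klass.respond_to?(:sidekiq_unique_context)
    klass.sidekiq_unique_context(job)
  else
    [job['class'], job['queue'], job['args']]
  end
end

def lock_key_for(job)
  "unique:#{Digest::SHA256.hexdigest(JSON.dump(unique_context(job)))}"
end

job_a = { 'class' => 'ReportWorker', 'queue' => 'default', 'args' => [42, 'pdf'] }
job_b = { 'class' => 'ReportWorker', 'queue' => 'default', 'args' => [42, 'csv'] }
job_c = { 'class' => 'UnloadedWorker', 'queue' => 'default', 'args' => [42, 'pdf'] }

puts lock_key_for(job_a) == lock_key_for(job_b) # true: the second arg is ignored
p unique_context(job_c) # ["UnloadedWorker", "default", [42, "pdf"]]
```

When the class cannot be resolved, the default [class, queue, args] context applies, so an unloaded worker still gets a stable key.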