Class: Wurk::Limiter::Concurrent
- Defined in:
- lib/wurk/limiter/concurrent.rb
Overview
Atomic slot acquisition in a ZSET. Score = expiry epoch; the acquire
script first evicts expired slots (bumping the reclaimed metric)
then ZADDs if there's headroom.
On exhaustion: spin loop with backoff. The spec says "blocks via
Redis stream XREAD" — that's a perf optimization; the visible
behavior is identical: blocks up to wait_timeout then OverLimit
(or silent return for policy: :ignore).
Constant Summary collapse
- WAIT_SLEEP =
0.05- METRIC_FIELDS =
%w[held held_time immediate waited wait_time overages reclaimed].freeze
Instance Method Summary collapse
- #size ⇒ Object
- #state_keys ⇒ Object protected
-
#status ⇒ Object
Uniform
{ used:, limit:, reset_at:, available? }(#16) merged with the concurrent-only metric counters (§1.5) the dashboard already renders. - #type ⇒ Object
- #within_limit(&block) ⇒ Object
Constructor Details
This class inherits a constructor from Wurk::Limiter::Base
Instance Method Details
#size ⇒ Object
22 23 24 |
# File 'lib/wurk/limiter/concurrent.rb', line 22 def size Wurk::Limiter.redis { |c| c.call('ZCARD', state_key).to_i } end |
#state_keys ⇒ Object (protected)
73 74 75 |
# File 'lib/wurk/limiter/concurrent.rb', line 73 def state_keys [state_key, stats_key] end |
#status ⇒ Object
Uniform { used:, limit:, reset_at:, available? } (#16) merged with
the concurrent-only metric counters (§1.5) the dashboard already
renders. Slots free on release rather than on a clock, so reset_at
is the soonest in-flight slot expiry (a worst-case "available by"),
or nil when idle.
31 32 33 34 35 |
# File 'lib/wurk/limiter/concurrent.rb', line 31 def status used = size build_status(used: used, limit: @options[:limit], reset_at: soonest_expiry) .merge(metrics) end |
#type ⇒ Object
20 |
# File 'lib/wurk/limiter/concurrent.rb', line 20 def type = :concurrent |
#within_limit(&block) ⇒ Object
37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
# File 'lib/wurk/limiter/concurrent.rb', line 37 def within_limit(&block) raise ArgumentError, 'block required' unless block started = monotime deadline = started + @options[:wait_timeout] slot = random_id acquired_at = nil loop do result = acquire(slot) if result[0].to_i == 1 acquired_at = monotime break end return if @options[:policy] == :ignore remaining = deadline - monotime if remaining <= 0 bump_counter('overages') raise OverLimit, self end sleep [remaining, WAIT_SLEEP].min end begin incr_immediate_or_waited(acquired_at - started) block.call ensure release(slot) bump_counter('held_time', (monotime - acquired_at).to_i) if acquired_at end end |