神刀安全网

Concurrency – TimedSemaphore implementation in Ruby

Ruby is a programming language with a focus on simplicity and productivity. However, it lacks in performance comparing to other programming languages, especially in applications which make a lot of non-blocking operations like database and network calls. Concurrency can help with this, even though Ruby doesn’t support true multithreading (talking about MRI). The problem is that Ruby doesn’t provide many synchronization primitives, like some other languages designed with concurrency in mind (e.g Go).

Ruby’s standard library provides following synchronization mechanisms: Mutex , Monitor , and  ConditionVariable . They can lead to complex code if used a lot. Yet, they are enough to build higher level mechanisms on top of them, like semaphores, countdown latches, cyclic barriers, etc. I suggest taking a look at the concurrent-ruby gem. This library implements modern concurrency tools, inspired by other languages like Erlang, Java, and others.

A TimedSemaphore is a specialized implementation of a Semaphore that gives a number of permits in a given time frame. The idea is taken from the Apache Commons Lang package . A use case for it is to limit the load on a resource. Consider an application that issues network calls to an external API to collect some data. Typical API’s have limitations based on calls per second. A counting semaphore couldn’t be used, since for example, 10 calls per second aren’t the same as 10 calls at a time. Here a TimedSemaphore could be used to guarantee that only a given number of API calls are issued per second.

Here is the code for the TimedSemaphore:

require 'concurrent'   class TimedSemaphore   def initialize(num_of_ops, num_of_seconds)     @count = 0     @limit = num_of_ops     @period = num_of_seconds     @lock = Monitor.new     @condition = @lock.new_cond     @timer_task = nil   end     def acquire     @lock.synchronize do       # Sleep thread if all available permits are exhausted       @condition.wait while @limit > 0 && @count == @limit       @count += 1       # Start the timer for releasing all acquired permits       start_timer if @timer_task.nil?     end   end     private     def start_timer     @lock.synchronize do       @timer_task = Concurrent::ScheduledTask.execute(@period) { end_of_period }     end   end     def end_of_period     @lock.synchronize do       @timer_task = nil       @count = 0       # Wakes up all sleeping threads waiting for this condition       @condition.broadcast     end   end end 

A thread can request a permit using the acquire method. However, there is an additional timing dimension: a thread cannot release his permit, but all permits are automatically released at the end of the given time. If a thread calls acquire and the available permits are already used for this time frame, the thread waits. After the time frame ends all permits requested so far are released and waiting threads are waked up again, so that they can try to acquire a new permit. This basically means that in the specified number of seconds only the specified number of operations is possible.

Here is a basic example of using a TimedSemaphore:

threads = [] semaphore = TimedSemaphore.new(2, 3)   10.times do   threads << Thread.new do     semaphore.acquire     puts Time.now   end end   threads.map(&:join) 

This example will produce something like this:

2016-03-20 23:32:48 +0100 2016-03-20 23:32:48 +0100 2016-03-20 23:32:51 +0100 2016-03-20 23:32:51 +0100 2016-03-20 23:32:54 +0100 2016-03-20 23:32:54 +0100 2016-03-20 23:32:57 +0100 2016-03-20 23:32:57 +0100 2016-03-20 23:33:00 +0100 2016-03-20 23:33:00 +0100 

转载本站任何文章请注明:转载至神刀安全网,谢谢神刀安全网 » Concurrency – TimedSemaphore implementation in Ruby

分享到:更多 ()

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址
分享按钮