Since Try Monad, Progress and Lockdown Rules and In the meanwhile, I wrote an interesting (well, I thought so) blog post about Enterprise System Integration.

That post came out of spending the last two years of my life integrating (in more ways than one!). It was an interesting reflection, perhaps even therapeutic for me.

Apart from that, I came across an interesting scenario in some source code recently. The scenario is this:

  • There is a fixed number of database connections available for each database, i.e. it has a connection pool.
  • There are multiple consumers of the connection pool. Each consumer can fork off multiple threads, and each thread can go off and access the database.
  • Each thread that accesses the database consumes one of these connections.
  • The scenario has multiple databases, i.e. shards, and each database has its own connection pool of x connections.

The following code was set up to limit the number of threads that can access a shard at once: for a connection pool of x connections, only x threads may be inside at any time.

It specifies that, for whichever database shard a thread is trying to connect to, there is a mutex the thread can own to isolate that access (and thus the db connection) to itself. There are as many mutexes per shard as there are db connections in the connection pool.

Once a thread has a mutex, it holds the lock for the duration of its database access (the yield).

In code:

@@shard_mutex[shard].sample.synchronize do
  ::Multidb.use(shard) do
    yield if block_given?
  end
end

Basically, it limits how many threads can connect at once, so as not to run out of database connections.

The reason I was in this code was that it was deadlocking. This is the hand I was dealt. Also look at the way it hands mutexes to threads at random, via the sample function...

Then I remembered something: "Acquire multiple locks in a fixed, global order". This sage advice comes from "Seven Concurrency Models in Seven Weeks" by Butcher, which is staring down at me as I write this... But it's also described here

And that's when I started to realise that the above code hands out mutexes at random, so they get locked in no particular order.
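To make the random pick concrete, here is a small sketch. The four-mutex pool and the names `pool` and `busy` are my own stand-ins, not the original code. With one mutex held, `sample` still hands out the held one some of the time, even though three others are sitting idle.

```ruby
# Hypothetical stand-in for one shard's entry in @@shard_mutex.
pool = Array.new(4) { Mutex.new }

# Pretend some thread is already doing database work under the first mutex.
busy = pool.first
busy.lock

# Count how often a random pick lands on the busy mutex. Each of those
# picks would make the caller wait although three mutexes are free.
picks = Array.new(1000) { pool.sample }
blocked_picks = picks.count { |m| m.equal?(busy) }

puts "#{blocked_picks} of #{picks.size} picks would have waited"

busy.unlock
```

So the mutex array caps concurrency at x, but a thread can end up queuing behind a busy mutex while free ones go unused.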

There is also a re-entrancy issue: if the block that is yielded somehow calls the above code again, i.e. it's recursive, then you'll get a deadlock, because sample might hand you the same mutex you already hold. This time you'll be waiting for yourself (the first call, which obtained the lock) to release it... deadlock 101.
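The re-entrant case is easy to reproduce in isolation. This is a minimal sketch with a hypothetical helper `with_lock` and a pool of just one mutex, so that `sample` is forced to return the same mutex twice. On MRI a thread that tries to lock a Mutex it already holds gets a ThreadError rather than hanging silently, which is what the rescue below catches.

```ruby
# A pool with a single mutex, so sample must return the same one twice.
pool = [Mutex.new]

# Hypothetical helper mirroring the shape of the original code.
def with_lock(pool)
  pool.sample.synchronize { yield }
end

outcome =
  begin
    # The inner call tries to lock the mutex the outer call already holds.
    with_lock(pool) { with_lock(pool) { :done } }
  rescue ThreadError => e
    e.message
  end

puts outcome
```

With more than one mutex in the pool the failure becomes intermittent, since it only happens when the nested pick lands on the mutex already held.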

I decided to try it without assigning mutexes randomly to threads, and without threads potentially waiting on each other (two threads trying to access the same shard). I ended up with a function called wait_for_logical_connection(), which is driven by a thread-safe Queue that blocks any thread trying to pop from it while it is empty.

shard_result = wait_for_logical_connection shard do
  ::Multidb.use(shard) do
    yield if block_given?
  end
end

Behind wait_for_logical_connection (shown below) is a Queue per shard, held in shard_thread_leases. Any thread that asks an empty queue for a lease blocks until one is returned.

There is no longer a set of x mutexes per shard (we have x connections in the pool), which seemed a bit excessive to me to begin with. Instead there is a collection of Queues, one for each shard.

Each thread wanting access to a shard picks up a lease (an integer, not a mutex) from the front of the queue, does its database work and gives the lease back. The lease goes on the back of the queue, which wakes a thread waiting on the queue (if the queue was empty at that point).

No thread now obtains a mutex, just a number. The Queue handles blocking threads trying to obtain a number (lease). One mutex - the queue.
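The blocking behaviour of a plain Queue can be seen on its own with a small sketch. Everything here is made up for the demo: a pool size of 3, ten worker threads, a sleep standing in for database work, and a `stats` mutex that only protects the demo's counters (it plays no part in the lease mechanism).

```ruby
pool_size = 3                       # assumed pool size for the demo
leases = Queue.new
1.upto(pool_size) { |i| leases << i }

active = 0
max_active = 0
stats = Mutex.new                   # guards the two counters above, nothing else

threads = Array.new(10) do
  Thread.new do
    lease = leases.pop              # blocks while the queue is empty
    begin
      stats.synchronize do
        active += 1
        max_active = [max_active, active].max
      end
      sleep 0.01                    # stand-in for database work
    ensure
      stats.synchronize { active -= 1 }
      leases.push(lease)            # lease goes to the back of the queue
    end
  end
end
threads.each(&:join)

puts "peak concurrency: #{max_active}, leases back in queue: #{leases.size}"
```

However the ten threads are scheduled, no more than three of them are in the "database work" part at the same time.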

  def wait_for_logical_connection(shard)
    logger.debug "Waiting to access db on shard: #{shard}, thread leases left: #{@@shard_thread_leases[shard].size}"
    lease = @@shard_thread_leases[shard].pop # threads wait here until a lease is available
    result = nil
    begin
      logger.debug "Performing db access on shard: #{shard}, thread leases left: #{@@shard_thread_leases[shard].size}"
      result = yield
    ensure
      @@shard_thread_leases[shard].push lease # give back lease (add to back of queue), even if the block raised
    end
    result
  end
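The ensure is the part that keeps the pool from shrinking when a query blows up. Here is a cut-down sketch of the same shape, with the logging removed and a hypothetical `LEASES` hash and `with_lease` helper standing in for the class variable and the real method.

```ruby
# Hypothetical stand-in for @@shard_thread_leases: one shard, two leases.
LEASES = { shard_a: Queue.new }
[1, 2].each { |i| LEASES[:shard_a] << i }

def with_lease(shard)
  lease = LEASES[shard].pop
  begin
    yield
  ensure
    LEASES[shard].push(lease)       # runs on normal return and on exceptions
  end
end

begin
  with_lease(:shard_a) { raise "query failed" }
rescue RuntimeError => e
  puts "rescued: #{e.message}"
end

puts "leases available: #{LEASES[:shard_a].size}"
```

The exception still reaches the caller, but the lease is back in the queue by the time it does.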

It now uses an ordered queue of connection leases per shard instead of randomly shared mutexes.

The queue itself holds simple integer leases rather than mutexes:

@@shard_thread_leases ||= begin
  c = ThreadSafe::Cache.new
  pool_size = ::ActiveRecord::Base.connection.pool.instance_variable_get(:@size) || 10
  ::Multidb.main_shards.each do |s|
    c[s] = Queue.new # Limit of logical connections is based on the limit of physical database connections.
    1.upto(pool_size) { |i| c[s] << i } # A logical connection lease is an object that can be checked in/out
  end
  c
end

Any thread can release its connection lease to the pool allowing any other threads (waiting on the pool) to be unblocked.

A thread that does not release its connection lease (for whatever reason) will not hold up the remaining threads that are waiting, i.e. threads wait for the queue to have more leases, not for one specific thread to release a shared mutex.

Each thread fetches a 'lease' from the queue and puts it back once it's done accessing the database. There are no more mutexes to manage: the queue alone is responsible for blocking a thread when it has no lease to hand out. If for whatever reason the calling code passes in a lambda that calls this code again, the nested call is unblocked as soon as one of the other threads returns a lease.
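The nested case can be sketched the same way. The two-lease queue `NESTED_LEASES` and the helper `with_nested_lease` are hypothetical names for this demo only. A call made from inside the block simply takes the next lease instead of fighting over a mutex the thread already holds.

```ruby
# Hypothetical two-lease pool for a single shard.
NESTED_LEASES = Queue.new
[1, 2].each { |i| NESTED_LEASES << i }

def with_nested_lease
  lease = NESTED_LEASES.pop
  begin
    yield lease
  ensure
    NESTED_LEASES.push(lease)
  end
end

# The outer call holds one lease while the inner call takes another.
held = with_nested_lease do |outer|
  with_nested_lease { |inner| [outer, inner] }
end

puts "leases held during the nested call: #{held.inspect}"
puts "leases available afterwards: #{NESTED_LEASES.size}"
```

This only works while a spare lease exists or some other thread is about to return one. If every lease were held by a thread that is itself making a nested call, they would all still be left waiting.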

 

I shall sleep well tonight!

