Ensuring balanced reads with PyMongo
Over the past year we’ve been evaluating MongoDB and migrating many of our game services to it. Its feature set gives us complete freedom to iterate with developers during game production, its performance is sufficient to power our infrastructure during peak load, our operations team appreciates its powerful administration tools, and 10gen has so far proven itself a reliable technology partner and steward of the Mongo roadmap. One of the PyMongo features we rely on most is MasterSlaveConnection, but it’s not without its caveats; one of them is that it does not guarantee your reads stay balanced across a replica set.
Before we dig in, a few items of note. First, the situation I’m covering here is specifically one where you’re connecting directly to a replica set. A similar pattern may apply to a sharded database, but that’s outside the scope of this post and not what our team is working with. Second, if you’re not familiar with MasterSlaveConnection, the gist of the class is that it directs all writes to the current master and randomly chooses a slave for each read-only query. Lastly, we have some pending patches to PyMongo 1.9 that I recommend applying before you begin, as they affect your ability to use MasterSlaveConnection and to seamlessly restart or shut down hosts in your replica set.
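To make that routing policy concrete, here is a minimal sketch written against plain stand-in objects rather than PyMongo. `Router` and `FakeConnection` are hypothetical names, and falling back to the master when there are no slaves is an assumption of this sketch. It only illustrates the rule described above: writes go to the master, and each read goes to a randomly chosen slave.

```python
import random


class FakeConnection:
    """Hypothetical stand-in for a driver connection pinned to one host."""

    def __init__(self, host):
        self.host = host
        self.ops = []  # record of operations sent to this host

    def execute(self, op):
        self.ops.append(op)
        return self.host


class Router:
    """Sketch of master/slave routing: writes to master, reads to a random slave."""

    def __init__(self, master, slaves, rng=None):
        self.master = master
        self.slaves = list(slaves)
        self.rng = rng or random.Random()

    def write(self, op):
        # Every write goes to the current master.
        return self.master.execute(op)

    def read(self, op):
        # Each read picks a slave at random (assumed fallback: master if none exist).
        if not self.slaves:
            return self.master.execute(op)
        return self.rng.choice(self.slaves).execute(op)


master = FakeConnection("db1:27017")
router = Router(master, [FakeConnection("db2:27017"), FakeConnection("db3:27017")],
                rng=random.Random(0))
print(router.write("insert"))  # db1:27017
print(router.read("find") in ("db2:27017", "db3:27017"))  # True
```

Note that the random choice is made per read and is over *connections*, not hosts, which is why it matters which host each slave connection is actually attached to.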
The situation we’re correcting for is one where we have a replica set with one or more slaves, and we want to keep reads balanced across them. Our application is long-running, and at any time the operations team may remove a host from the replica set to perform maintenance. Mongo and PyMongo work together to provide high-availability failover, but once you’ve brought the replica set back to full capacity, your application will not connect to the restored host unless there’s another socket disconnect, and even then there’s no guarantee it will connect to the original host. Until your application is restarted, you’ve lost a substantial amount of performance and scalability.
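A toy simulation makes the cost of that drift visible. It assumes, as described above, that each read picks one slave connection uniformly at random; `read_distribution` and the host names are hypothetical. When both slave connections have ended up on db2, db3 receives no reads at all even though it is back in the replica set.

```python
import random
from collections import Counter


def read_distribution(slave_hosts, n_reads, seed=0):
    """Count how many of n_reads land on each host when every read picks
    one slave connection uniformly at random."""
    rng = random.Random(seed)
    return Counter(rng.choice(slave_hosts) for _ in range(n_reads))


# Each entry is the host a slave connection is *currently* attached to.
healthy = ["db2", "db3"]
# Hypothetical scenario: db3 was taken down for maintenance, its connection
# failed over to db2, and nothing moved it back after db3 rejoined.
drifted = ["db2", "db2"]

print(read_distribution(healthy, 1000))
print(read_distribution(drifted, 1000))  # Counter({'db2': 1000})
```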
import functools
import time

from pymongo.connection import Connection, _str_to_node
from pymongo.master_slave_connection import MasterSlaveConnection
from pymongo.errors import AutoReconnect, DuplicateKeyError, CollectionInvalid

# validate slaves every 5 minutes
VALIDATE_INTERVAL = 5 * 60


class ClusterConnection(MasterSlaveConnection):

    def __init__(self, *args, **kwargs):
        super(ClusterConnection, self).__init__(*args, **kwargs)
        self._last_validate_time = time.time()

    # This is a good time to override the tz_aware property if the default
    # value of True doesn't work for your use.
    @property
    def tz_aware(self):
        return False

    def validate_slaves(self, slave_uris):
        '''
        If we're at the check interval, confirm that all slaves are connected
        to their intended hosts and if not, reconnect them.
        '''
        if time.time() - self._last_validate_time < VALIDATE_INTERVAL:
            return

        hosts_ports = [_str_to_node(uri) for uri in slave_uris]

        # Walk a copy of the current slave list so that we can manipulate it.
        # For each connection that is not pointing to a configured slave,
        # disconnect it and remove it from the list.
        for slave in self.slaves[:]:
            host_port = (slave._Connection__host, slave._Connection__port)
            if host_port not in hosts_ports:
                slave.disconnect()
                self.slaves.remove(slave)
            else:
                hosts_ports.remove(host_port)

        # For all hosts where there wasn't an existing connection, create one.
        for host, port in hosts_ports:
            self.slaves.append(
                Connection(host=host, port=port, slave_okay=True, _connect=False))

        self._last_validate_time = time.time()


def with_reconnect(func):
    '''
    Handle when AutoReconnect is raised from pymongo. This is the standard
    error raised for everything from "host disconnected" to "couldn't connect
    to host" and more.

    The sleep handles the edge case when the state of a replica set changes,
    and the cursor raises AutoReconnect because the master may have changed.
    It can take some time for the replica set to stop raising this exception,
    and the short sleep and retry count give us roughly five seconds before we
    fail completely. See also http://jira.mongodb.org/browse/PYTHON-216
    '''
    @functools.wraps(func)
    def _reconnector(*args, **kwargs):
        for x in range(0, 20):
            try:
                return func(*args, **kwargs)
            except AutoReconnect:
                if x == 19:
                    raise  # out of retries: re-raise the last AutoReconnect
                time.sleep(0.250)
    return _reconnector


class ApplicationDatabaseInterface(object):
    '''
    An example of a class you'd use for interfacing with the database.
    '''

    def __init__(self, *args, **kwargs):
        self._connection = None
        self._hosts = kwargs.get('hosts')  # a list of all hosts including the master

    @with_reconnect
    def query(self, q):
        conn = self.connection()
        # TODO: perform the query

    def connection(self):
        '''
        Get the current connection to use for the transaction. Opens a new
        connection if there isn't one already.
        '''
        rval = self._connection
        if rval is None:
            rval = ClusterConnection(
                Connection(self._hosts),
                [Connection(host, slave_okay=True, _connect=False)
                 for host in self._hosts])
            # Cache the connection so later calls reuse (and validate) it.
            self._connection = rval
        else:
            rval.validate_slaves(self._hosts)
        return rval

Read through connection() to understand the basics of instantiating a MasterSlaveConnection, or in this case our subclass ClusterConnection. It is initialized with a single Connection to the master, and then for each host in our list, we create a slave Connection. The Connection class is smart in that it can maintain a list of hosts, both configured and discovered from the replica set, and connect to any one of them at any time. The master Connection will always connect to the current master out of our list of hosts and handle it when the master changes. The slave connections are configured with slave_okay=True so that they’ll stay connected to whichever host we tell them and fail over to other slaves, and _connect=False is passed so that if a slave is not currently available, Connection and MasterSlaveConnection can kindly follow their AutoReconnect paths without interrupting your application. As long as the master is up, you’ll never know that the replica set may be impaired at the time you initialize the connection.
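The `_connect=False` flag is what lets the slave list be built while some hosts are down. The sketch below imitates that idea without PyMongo: the hypothetical `LazyConnection` class does no network work in its constructor, and the connection attempt (a pluggable `connect` callable, an assumption of this sketch) happens on first use, raising a local stand-in for AutoReconnect only then.

```python
class AutoReconnect(Exception):
    """Stand-in for pymongo.errors.AutoReconnect (this sketch avoids importing PyMongo)."""


class LazyConnection:
    """Hypothetical illustration of the _connect=False idea: defer connecting
    until the connection is actually used."""

    def __init__(self, host, connect):
        self.host = host
        self._connect = connect  # callable(host) -> handler, may raise OSError
        self._sock = None        # nothing touches the network in __init__

    def _socket(self):
        if self._sock is None:
            try:
                self._sock = self._connect(self.host)
            except OSError as exc:
                # Surface connection failures as AutoReconnect, only on first use.
                raise AutoReconnect("could not connect to %s: %s" % (self.host, exc)) from exc
        return self._sock

    def execute(self, op):
        return self._socket()(op)


def fake_connect(host):
    """Pretend network layer: db3 is down, everything else answers."""
    if host == "db3":
        raise OSError("connection refused")
    return lambda op: "%s handled %s" % (host, op)


# Building the slave list succeeds even though db3 is unreachable.
slaves = [LazyConnection(h, fake_connect) for h in ["db2", "db3"]]
print(slaves[0].execute("find"))  # db2 handled find
try:
    slaves[1].execute("find")
except AutoReconnect as exc:
    print("deferred failure:", exc)
```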
The class ApplicationDatabaseInterface is simply an example of whatever you use to maintain an interface to the database within your application. The only requirement is that you can cache an existing MasterSlaveConnection and call validate_slaves() on it before you use it. I recommend validating the slaves only once per application “transaction”, whatever that means for your use case (see query() in the example above).
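A minimal, PyMongo-free sketch of that cache-then-validate pattern follows. `ApplicationDB`, `StubConnection` and the `transaction()` context manager are hypothetical names, and the stub’s validate_slaves() only counts calls. It shows the connection being created once, stored, and validated once each time a transaction fetches it, rather than once per query.

```python
from contextlib import contextmanager


class ApplicationDB:
    """Hypothetical sketch: cache one connection, validate it once per transaction."""

    def __init__(self, factory):
        self._factory = factory  # callable returning a new connection object
        self._connection = None

    def connection(self):
        if self._connection is None:
            # Store the new connection so later calls reuse it.
            self._connection = self._factory()
        else:
            # Reused connection: give it a chance to repair its slave list.
            self._connection.validate_slaves()
        return self._connection

    @contextmanager
    def transaction(self):
        # Fetch (and therefore validate) once; every query inside reuses `conn`.
        yield self.connection()


class StubConnection:
    """Stand-in connection that only counts validations."""

    def __init__(self):
        self.validated = 0

    def validate_slaves(self):
        self.validated += 1


db = ApplicationDB(StubConnection)
with db.transaction() as conn:
    first = conn  # several queries would use `conn` here without re-validating
with db.transaction() as conn:
    assert conn is first
print(first.validated)  # 1
```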
The validate_slaves() method is where the important work is done. This example implementation runs its check every 5 minutes, but your criteria and interval can be whatever suits your needs. First, it uses the same helper as PyMongo (_str_to_node) to parse the slave URIs, so you can use both host and host:port forms. It then walks through all of the slaves, and if a Connection’s current host and port are not in the list of configured hosts, it closes that connection and removes it from the slave list. Finally, for any configured host that does not yet have a connection, it adds a fresh Connection to the slave list. The new connection also includes the _connect=False flag, so creating it won’t raise an AutoReconnect exception if that slave happens to be unavailable when this code runs.
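The reconciliation step inside validate_slaves() can be expressed as a pure function, which makes it easy to test in isolation. This sketch assumes connections are identified by (host, port) pairs. `parse_node` is a hypothetical helper in the spirit of PyMongo’s private _str_to_node (it defaults to port 27017 and does not handle IPv6 literals), and `reconcile` returns which connections to keep, which to drop, and which hosts still need a new connection.

```python
DEFAULT_PORT = 27017


def parse_node(uri):
    """Hypothetical helper: 'host' or 'host:port' -> (host, port)."""
    host, sep, port = uri.partition(":")
    return (host, int(port) if sep else DEFAULT_PORT)


def reconcile(current, configured_uris):
    """Compare the (host, port) pairs slave connections currently point at
    with the configured slave URIs. Returns (keep, drop, add)."""
    wanted = [parse_node(u) for u in configured_uris]
    keep, drop = [], []
    for node in current:
        if node in wanted:
            wanted.remove(node)  # this configured host is already covered
            keep.append(node)
        else:
            drop.append(node)    # connection drifted to an unintended (or duplicate) host
    return keep, drop, wanted    # hosts left in `wanted` need a fresh connection


current = [("db2", 27017), ("db2", 27017)]  # db3's connection drifted to db2
keep, drop, add = reconcile(current, ["db2", "db3:27017"])
print(keep)  # [('db2', 27017)]
print(drop)  # [('db2', 27017)]
print(add)   # [('db3', 27017)]
```

Because matched entries are removed from `wanted`, a second connection to the same host is treated as drift, which is what frees a slot for the restored host.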
We remove connections from the slave list, rather than just closing them, because the Connection class doesn’t keep its internal list of possible hosts in any priority order. If we simply closed a connection, it would reconnect, but there’s no guarantee it would reconnect to its originally configured host, and that’s exactly what we’re trying to ensure.
The last important piece is the with_reconnect decorator. It’s not strictly required, but for completeness, you should have something like it in your stack when communicating with Mongo. In addition to connection drops, there are situations, such as a replica set in transition, where the driver raises AutoReconnect. I’ve found that 2 seconds is about the maximum time it takes for PyMongo and the replica set to agree that all is well with the world, so this example decorator, which retries up to 20 times with a 250 ms pause between attempts (roughly five seconds in total), gives that situation enough time to resolve itself.
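For comparison, here is a Python 3 sketch of the same retry idea with the attempt count, delay and sleep function made parameters, so it can be tested without actually waiting. `AutoReconnect` is a local stand-in for pymongo.errors.AutoReconnect, and the parameter names are hypothetical. Once the attempts are exhausted, the last AutoReconnect is re-raised from inside the except block.

```python
import functools
import time


class AutoReconnect(Exception):
    """Stand-in for pymongo.errors.AutoReconnect so the sketch runs without PyMongo."""


def with_reconnect(attempts=20, delay=0.25, sleep=time.sleep):
    """Retry the wrapped call on AutoReconnect. The defaults pause 19 times
    for 0.25 s (about five seconds) before re-raising the last error."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(attempts):
                try:
                    return func(*args, **kwargs)
                except AutoReconnect:
                    if attempt == attempts - 1:
                        raise  # out of retries: propagate the last AutoReconnect
                    sleep(delay)
        return wrapper
    return decorator


calls = []


@with_reconnect(attempts=5, sleep=lambda seconds: None)  # no real sleeping in the demo
def flaky_query():
    calls.append(1)
    if len(calls) < 3:
        raise AutoReconnect("replica set in transition")
    return "ok"


print(flaky_query(), len(calls))  # ok 3
```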

Comments
[...] and intelligently sending the writes and reads to master and slave respectively. Inspired by Aaron Westendorf's blog post, we extended this module to support a) balanced reads to secondaries b) [...]
Inspired by Aaron Westendorf's gist, we have developed a module to:
- Balanced read-write:
** all writes are going to master
** all reads are going to slaves
** reads are directed to master only if there are no slaves in the ReplicaSet
- Define a set of ReplicaSets and their nodes
- Bind collections to particular ReplicaSets
- Provide convenient methods to get without worrying about mapping between Replica Set and Collection itself
- Get the “w” value, i.e. the number of active slaves in the Replica Set
- Get a fixed connection to be used by map/reduce tasks in the ReplicaSet
Code is available at GitHub: https://gist.github.com/1365738