Blame db/pool.py

cbf076
cbf076
import threading
cbf076
import time
cbf076
import MySQLdb
cbf076
cbf076
cbf076
class Pool:
  """Pool of MySQL connections, kept separately for read and write use.

  Every per-kind structure is a dict keyed by the ``readonly`` flag:
  ``True`` selects the read pool, ``False`` the write pool.

  * ``size``  - maximum number of connections of that kind, taken from
    ``server.config['db']['pool']``.
  * ``ready`` - idle connections waiting to be handed out.
  * ``busy``  - connections currently checked out. A ``None`` entry reserves
    a slot for a connection that is still being opened.

  All access to ``ready`` and ``busy`` happens while holding ``condition``.
  """

  # Total number of connect attempts made before asquire() gives up.
  CONNECT_ATTEMPTS = 10

  def __init__(self, server):
    """Create an empty pool.

    Args:
      server: object exposing ``config['db']`` with the keys ``pool``
        (``read`` / ``write`` limits), ``connection`` (keyword arguments for
        ``MySQLdb.connect``) and ``retrytime`` (seconds slept between
        connect attempts).
    """
    self.server = server
    self.condition = threading.Condition(threading.Lock())

    self.size  = {
      False: self.server.config['db']['pool']['write'],
      True:  self.server.config['db']['pool']['read'] }
    self.busy  = { False: [], True: [] }
    self.ready = { False: [], True: [] }

  def asquire(self, readonly):
    """Check out a connection from the read or write pool.

    An idle connection is reused if it answers a ``SELECT 1`` probe; idle
    connections that fail the probe are closed and dropped. If nothing is
    idle and the pool is below its limit, a new connection is opened.
    Otherwise the call blocks until release() signals the condition.

    Args:
      readonly: ``True`` for the read pool, ``False`` for the write pool.

    Returns:
      A connection that is now registered in ``busy[readonly]``.

    Raises:
      Exception: whatever ``MySQLdb.connect`` raised on the last of
        ``CONNECT_ATTEMPTS`` attempts. The reserved slot is given back
        before the exception propagates.
    """
    type_name = "read" if readonly else "write"
    ready = self.ready[readonly]
    busy = self.busy[readonly]

    with self.condition:
      while True:
        count = len(ready) + len(busy)
        if ready:
          connection = ready.pop(0)
          if self._is_alive(connection):
            busy.append(connection)
            return connection
          print("close broken %s db connection (count: %s)" % (type_name, count - 1))
          self._close_quietly(connection)
        elif count < self.size[readonly]:
          # Reserve the slot now so concurrent callers cannot exceed the
          # limit while the connection is opened outside the lock.
          busy.append(None)
          break
        else:
          self.condition.wait()

    print("open new %s db connection (count: %s)" % (type_name, count + 1))
    try:
      connection = self._connect()
    except BaseException:
      # Give the reserved slot back and wake a waiter; otherwise the pool
      # would permanently shrink by one on every failed open.
      with self.condition:
        busy.remove(None)
        self.condition.notify()
      raise

    with self.condition:
      busy.remove(None)
      busy.append(connection)
    return connection

  # Correctly spelled alias; ``asquire`` is kept because callers use it.
  acquire = asquire

  def release(self, connection, readonly):
    """Return a connection obtained from asquire() to its pool.

    The connection is rolled back and put on the idle list. If the rollback
    fails the connection is closed and dropped instead. Either way one
    waiter is notified, because a slot has become available. If the pool
    already holds at least ``size[readonly]`` other connections, the
    connection is closed as surplus.

    Args:
      connection: the connection to give back.
      readonly: the same flag that was passed to asquire().

    Raises:
      ValueError: if ``connection`` is not in ``busy[readonly]``.
    """
    type_name = "read" if readonly else "write"
    ready = self.ready[readonly]
    busy = self.busy[readonly]

    with self.condition:
      busy.remove(connection)
      count = len(busy) + len(ready)
      if count < self.size[readonly]:
        try:
          connection.rollback()
        except Exception:
          # A connection that cannot roll back is unusable: drop it rather
          # than leaking it, and still notify below since the slot is free.
          print("close broken %s db connection (count: %s)" % (type_name, count))
          self._close_quietly(connection)
        else:
          ready.append(connection)
        self.condition.notify()
      else:
        print("close extra %s database connection (count: %s)" % (type_name, count - 1))
        self._close_quietly(connection)

  def _connect(self):
    """Open one new connection, retrying up to CONNECT_ATTEMPTS times."""
    last_attempt = self.CONNECT_ATTEMPTS - 1
    for attempt in range(self.CONNECT_ATTEMPTS):
      try:
        # Return on the first success so that no extra connections are
        # opened and abandoned.
        return MySQLdb.connect(**self.server.config['db']['connection'])
      except Exception:
        if attempt == last_attempt:
          raise
        time.sleep(self.server.config['db']['retrytime'])

  @staticmethod
  def _is_alive(connection):
    """Return True if the connection can execute a trivial query."""
    try:
      with connection.cursor() as cursor:
        cursor.execute("SELECT 1")
    except Exception:
      return False
    return True

  @staticmethod
  def _close_quietly(connection):
    """Close a connection, ignoring errors (it is being discarded anyway)."""
    try:
      connection.close()
    except Exception:
      pass
cbf076