import threading
import time
import MySQLdb
class Pool:
def __init__(self, server):
self.server = server
self.condition = threading.Condition(threading.Lock())
self.size = {
False: self.server.config['db']['pool']['write'],
True: self.server.config['db']['pool']['read'] }
self.busy = { False: [], True: [] }
self.ready = { False: [], True: [] }
def asquire(self, readonly):
type_name = "read" if readonly else "write"
count = 0
with self.condition:
while True:
count = len(self.ready[readonly]) + len(self.busy[readonly])
if len(self.ready[readonly]) > 0:
connection = self.ready[readonly][0];
del self.ready[readonly][0]
try:
with connection.cursor() as cursor:
cursor.execute("SELECT 1")
except Exception:
print("close broken %s db connection (count: %s)" % (type_name, count - 1))
try: connection.close()
except Exception: pass
continue
self.busy[readonly].append(connection)
return connection
elif len(self.busy[readonly]) + len(self.ready[readonly]) < self.size[readonly]:
self.busy[readonly].append(None)
break
else:
self.condition.wait()
print("open new %s db connection (count: %s)" % (type_name, count + 1))
connection = None
for _ in range(1, 10):
try:
connection = MySQLdb.connect(**self.server.config['db']['connection'])
except Exception:
time.sleep(self.server.config['db']['retrytime'])
if not connection:
connection = MySQLdb.connect(**self.server.config['db']['connection'])
with self.condition:
self.busy[readonly].remove(None)
self.busy[readonly].append(connection)
return connection
def release(self, connection, readonly):
with self.condition:
self.busy[readonly].remove(connection)
if len(self.busy[readonly]) + len(self.ready[readonly]) < self.size[readonly]:
connection.rollback()
self.ready[readonly].append(connection)
self.condition.notify()
else:
type_name = "read" if readonly else "write"
count = len(self.ready[readonly]) + len(self.busy[readonly])
print("close extra %s database connection (count: %s)" % (type_name, count - 1))
try: connection.close()
except Exception: pass