|
|
cbf076 |
|
|
|
cbf076 |
import threading
|
|
|
cbf076 |
import time
|
|
|
cbf076 |
import MySQLdb
|
|
|
cbf076 |
|
|
|
cbf076 |
|
|
|
cbf076 |
class Pool:
|
|
|
cbf076 |
def __init__(self, server):
|
|
|
cbf076 |
self.server = server
|
|
|
cbf076 |
self.condition = threading.Condition(threading.Lock())
|
|
|
cbf076 |
|
|
|
cbf076 |
self.size = {
|
|
|
cbf076 |
False: self.server.config['db']['pool']['write'],
|
|
|
cbf076 |
True: self.server.config['db']['pool']['read'] }
|
|
|
cbf076 |
self.busy = { False: [], True: [] }
|
|
|
cbf076 |
self.ready = { False: [], True: [] }
|
|
|
cbf076 |
|
|
|
cbf076 |
|
|
|
cbf076 |
def asquire(self, readonly):
|
|
|
cbf076 |
type_name = "read" if readonly else "write"
|
|
|
cbf076 |
count = 0
|
|
|
cbf076 |
|
|
|
cbf076 |
with self.condition:
|
|
|
cbf076 |
while True:
|
|
|
cbf076 |
count = len(self.ready[readonly]) + len(self.busy[readonly])
|
|
|
cbf076 |
if len(self.ready[readonly]) > 0:
|
|
|
cbf076 |
connection = self.ready[readonly][0];
|
|
|
cbf076 |
del self.ready[readonly][0]
|
|
|
cbf076 |
try:
|
|
|
cbf076 |
with connection.cursor() as cursor:
|
|
|
cbf076 |
cursor.execute("SELECT 1")
|
|
|
cbf076 |
except Exception:
|
|
|
cbf076 |
print("close broken %s db connection (count: %s)" % (type_name, count - 1))
|
|
|
cbf076 |
try: connection.close()
|
|
|
cbf076 |
except Exception: pass
|
|
|
cbf076 |
continue
|
|
|
cbf076 |
self.busy[readonly].append(connection)
|
|
|
cbf076 |
return connection
|
|
|
cbf076 |
elif len(self.busy[readonly]) + len(self.ready[readonly]) < self.size[readonly]:
|
|
|
cbf076 |
self.busy[readonly].append(None)
|
|
|
cbf076 |
break
|
|
|
cbf076 |
else:
|
|
|
cbf076 |
self.condition.wait()
|
|
|
cbf076 |
|
|
|
cbf076 |
print("open new %s db connection (count: %s)" % (type_name, count + 1))
|
|
|
cbf076 |
connection = None
|
|
|
cbf076 |
for _ in range(1, 10):
|
|
|
cbf076 |
try:
|
|
|
cbf076 |
connection = MySQLdb.connect(**self.server.config['db']['connection'])
|
|
|
cbf076 |
except Exception:
|
|
|
cbf076 |
time.sleep(self.server.config['db']['retrytime'])
|
|
|
cbf076 |
if not connection:
|
|
|
cbf076 |
connection = MySQLdb.connect(**self.server.config['db']['connection'])
|
|
|
4656ad |
|
|
|
cbf076 |
with self.condition:
|
|
|
cbf076 |
self.busy[readonly].remove(None)
|
|
|
cbf076 |
self.busy[readonly].append(connection)
|
|
|
cbf076 |
return connection
|
|
|
cbf076 |
|
|
|
cbf076 |
|
|
|
cbf076 |
def release(self, connection, readonly):
|
|
|
cbf076 |
with self.condition:
|
|
|
cbf076 |
self.busy[readonly].remove(connection)
|
|
|
cbf076 |
if len(self.busy[readonly]) + len(self.ready[readonly]) < self.size[readonly]:
|
|
|
cbf076 |
connection.rollback()
|
|
|
cbf076 |
self.ready[readonly].append(connection)
|
|
|
cbf076 |
self.condition.notify()
|
|
|
cbf076 |
else:
|
|
|
cbf076 |
type_name = "read" if readonly else "write"
|
|
|
cbf076 |
count = len(self.ready[readonly]) + len(self.busy[readonly])
|
|
|
cbf076 |
print("close extra %s database connection (count: %s)" % (type_name, count - 1))
|
|
|
cbf076 |
try: connection.close()
|
|
|
cbf076 |
except Exception: pass
|
|
|
cbf076 |
|