diff --git a/config.py b/config.py
index 782aa72..149aa43 100644
--- a/config.py
+++ b/config.py
@@ -1,2 +1,13 @@
 
-config = dict()
+config = {
+    'db': {
+        'connection': {
+            'host' : '127.0.0.1',
+            'user' : 'root',
+            'passwd' : 'password',
+            'db' : 'earthworm',
+            'charset' : 'utf8mb4',
+        },
+    },
+}
+
diff --git a/db/connection.py b/db/connection.py
new file mode 100644
index 0000000..be3d60e
--- /dev/null
+++ b/db/connection.py
@@ -0,0 +1,78 @@
+
+import datetime
+import traceback
+import MySQLdb
+
+
+class Connection:
+    """Wrap a raw MySQLdb connection in one explicit transaction.
+
+    The constructor opens the transaction through begin(); commit() or
+    rollback() ends it, and `finished` records whether it has ended.
+    """
+
+    def __init__(self, pool, internal, readonly = True):
+        self.pool = pool
+        self.internal = internal
+        self.readonly = readonly
+        self.finished = True
+        self.begin()
+
+    def cursor(self):
+        """Return a DictCursor; only valid while the transaction is open."""
+        assert not self.finished
+        return self.internal.cursor(MySQLdb.cursors.DictCursor)
+
+    def insert_id(self, *args, **kwargs):
+        assert not self.finished
+        return self.internal.insert_id(*args, **kwargs)
+
+    def escape(self, *args, **kwargs):
+        """Escape a value with the driver, decoding bytes results as UTF-8."""
+        r = self.internal.escape(*args, **kwargs)
+        return r.decode("utf8") if isinstance(r, bytes) else r
+
+    def escape_string(self, *args, **kwargs):
+        """Escape a string with the driver, decoding bytes results as UTF-8."""
+        r = self.internal.escape_string(*args, **kwargs)
+        return r.decode("utf8") if isinstance(r, bytes) else r
+
+    def begin(self):
+        """Start a new transaction and stamp `now` in the configured timezone.
+
+        Read-only connections use REPEATABLE READ with a consistent snapshot;
+        writable ones use SERIALIZABLE.
+        """
+        assert self.finished
+        self.finished = False
+        with self.cursor() as cursor:
+            cursor.execute("SET autocommit=0")
+        if self.readonly:
+            with self.cursor() as cursor:
+                cursor.execute("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ")
+            with self.cursor() as cursor:
+                cursor.execute("START TRANSACTION READ ONLY, WITH CONSISTENT SNAPSHOT")
+        else:
+            with self.cursor() as cursor:
+                cursor.execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE")
+            with self.cursor() as cursor:
+                cursor.execute("START TRANSACTION READ WRITE")
+        self.now = datetime.datetime.now(self.pool.server.config['timezone'])
+
+    def commit(self):
+        assert not self.finished
+        self.internal.commit()
+        self.finished = True
+
+    def rollback(self):
+        assert not self.finished
+        self.internal.rollback()
+        self.finished = True
+
+    def release(self):
+        """Roll back an unfinished transaction, logging (not raising) errors."""
+        if not self.finished:
+            try: self.rollback()
+            except Exception as e:
+                print(traceback.format_exc())
+                print(e)
diff --git a/db/holder.py b/db/holder.py
new file mode 100644
index 0000000..c14dbeb
--- /dev/null
+++ b/db/holder.py
@@ -0,0 +1,38 @@
+
+from db.connection import Connection
+
+
+class Holder:
+    """Context manager that borrows a pooled connection for one transaction."""
+
+    def __init__(self, pool, internal = None, readonly = True):
+        self.pool = pool
+        self.internal = internal
+        self.readonly = readonly
+
+    def __enter__(self):
+        acquired = False
+        if not self.internal:
+            self.internal = self.pool.asquire(self.readonly)
+            acquired = True
+        try:
+            self.connection = Connection(
+                self.pool,
+                internal = self.internal,
+                readonly = self.readonly )
+        except Exception:
+            # Starting the transaction failed: hand a connection acquired
+            # here back to the pool so its slot is not lost.
+            if acquired:
+                internal, self.internal = self.internal, None
+                self.pool.release(internal, self.readonly)
+            raise
+        return self.connection
+
+    def __exit__(self, _exc_type, _exc_value, _traceback):
+        # Return the connection to the pool even if the transaction
+        # cleanup itself raises.
+        try:
+            self.connection.release()
+        finally:
+            self.pool.release(self.connection.internal, self.readonly)
diff --git a/db/pool.py b/db/pool.py
new file mode 100644
index 0000000..8e81db4
--- /dev/null
+++ b/db/pool.py
@@ -0,0 +1,108 @@
+
+import threading
+import time
+import MySQLdb
+
+
+class Pool:
+    """Thread-safe pool of MySQLdb connections, split by access mode.
+
+    The `size`, `busy` and `ready` dicts are keyed by the `readonly` flag
+    (True for read connections, False for write connections).
+    """
+
+    def __init__(self, server):
+        self.server = server
+        self.condition = threading.Condition(threading.Lock())
+
+        self.size = {
+            False: self.server.config['db']['pool']['write'],
+            True: self.server.config['db']['pool']['read'] }
+        self.busy = { False: [], True: [] }
+        self.ready = { False: [], True: [] }
+
+
+    def asquire(self, readonly):
+        """Return a live connection, blocking while the pool is exhausted.
+
+        An idle connection is reused if it still answers "SELECT 1";
+        otherwise a new one is opened while a None placeholder reserves its
+        slot in `busy`. If every connection attempt fails, the exception of
+        the last attempt is raised and the reserved slot is given back.
+        """
+        type_name = "read" if readonly else "write"
+        count = 0
+
+        with self.condition:
+            while True:
+                count = len(self.ready[readonly]) + len(self.busy[readonly])
+                if len(self.ready[readonly]) > 0:
+                    connection = self.ready[readonly].pop(0)
+                    try:
+                        with connection.cursor() as cursor:
+                            cursor.execute("SELECT 1")
+                    except Exception:
+                        print("close broken %s db connection (count: %s)" % (type_name, count - 1))
+                        try: connection.close()
+                        except Exception: pass
+                        continue
+                    self.busy[readonly].append(connection)
+                    return connection
+                elif len(self.busy[readonly]) + len(self.ready[readonly]) < self.size[readonly]:
+                    self.busy[readonly].append(None)
+                    break
+                else:
+                    self.condition.wait()
+
+        print("open new %s db connection (count: %s)" % (type_name, count + 1))
+        connection = None
+        try:
+            for _ in range(1, 10):
+                try:
+                    connection = MySQLdb.connect(**self.server.config['db']['connection'])
+                except Exception:
+                    time.sleep(self.server.config['db']['retrytime'])
+                else:
+                    # Stop at the first success; looping on would open (and
+                    # leak) a fresh connection on every remaining iteration.
+                    break
+            if connection is None:
+                # Final attempt: let its exception propagate to the caller.
+                connection = MySQLdb.connect(**self.server.config['db']['connection'])
+        except BaseException:
+            # Give the reserved slot back and wake a waiter; otherwise the
+            # pool would permanently shrink by one on every failed open.
+            with self.condition:
+                self.busy[readonly].remove(None)
+                self.condition.notify()
+            raise
+
+        with self.condition:
+            self.busy[readonly].remove(None)
+            self.busy[readonly].append(connection)
+        return connection
+
+
+    def release(self, connection, readonly):
+        """Return a connection to the pool, closing it if it cannot be reused."""
+        with self.condition:
+            self.busy[readonly].remove(connection)
+            if len(self.busy[readonly]) + len(self.ready[readonly]) < self.size[readonly]:
+                try:
+                    connection.rollback()
+                except Exception:
+                    # The connection is unusable: close it rather than lose
+                    # track of it, and wake a waiter since a slot is free.
+                    try: connection.close()
+                    except Exception: pass
+                    self.condition.notify()
+                    raise
+                self.ready[readonly].append(connection)
+                self.condition.notify()
+            else:
+                type_name = "read" if readonly else "write"
+                count = len(self.ready[readonly]) + len(self.busy[readonly])
+                print("close extra %s database connection (count: %s)" % (type_name, count - 1))
+                try: connection.close()
+                except Exception: pass
+
diff --git a/deps.txt b/deps.txt
new file mode 100644
index 0000000..91e7eb3
--- /dev/null
+++ b/deps.txt
@@ -0,0 +1,8 @@
+
+datetime
+threading
+time
+traceback
+
+MySQLdb
+
diff --git a/main.py b/main.py
index 930eccd..5ca7db4 100644
--- a/main.py
+++ b/main.py
@@ -1,18 +1,31 @@
 
-import uwsgi
-
 from config import config
 from server import Server
 from request import Request
+from db.holder import Holder as ConnectionHolder
 import template.common
 
+
 server = Server(config)
 
+
 def application(env, start_response):
     request = Request(server, env, start_response)
-    request.template = template.common.instance
-
-    content = '
' + request.t("Hello World!") + '
' \
-            + '' + "Env:\n" + str(env) + '
'
-    return request.complete_content(content)
+    readonly = request.method == 'GET'
+    with ConnectionHolder(request.server.dbpool, readonly = readonly) as conn:
+        request.connection = conn
+
+        request.template = template.common.instance
+        content = '' + request.t("Hello World!") + '
' \
+                + '' + "Env:\n" + str(env) + '
'
+
+        tables = []
+        with request.connection.cursor() as cursor:
+            cursor.execute('SHOW TABLES')
+            for row in cursor:
+                tables.append(str(list(row.values())[0]))
+        content += 'DB tables: ' + ', '.join(tables) + '
'
+
+        return request.complete_content(content)
+
 
diff --git a/request.py b/request.py
index 66d86be..ea7e913 100644
--- a/request.py
+++ b/request.py
@@ -9,6 +9,10 @@ class Request:
         self.server = server
         self.environ = environ
         self.start_response = start_response
+        self.method = str(self.environ["REQUEST_METHOD"])
+        assert self.method == 'GET' or self.method == 'POST'
+
+        self.connection = None
         self.session = None
         self.template = None
 
diff --git a/server.py b/server.py
index 2e0a6a6..9167496 100644
--- a/server.py
+++ b/server.py
@@ -1,6 +1,34 @@
 
+import datetime
+from db.pool import Pool
+
+
 class Server:
     def __init__(self, config):
-        self.config = config
-        self.urlprefix = config.get('urlprefix', '')
-        self.urlprefix_data = self.urlprefix + '/data'
+        urlprefix = str(config.get('urlprefix', ''))
+
+        config_db = config.get('db', dict())
+        config_db_pool = config_db.get('pool', dict())
+
+        self.config = {
+            'urlprefix' : urlprefix,
+            'urldataprefix' : str(config.get('urldataprefix', urlprefix + '/data')),
+            'timezone' : config.get('timezone', datetime.timezone.utc),
+
+            'db' : {
+                'connection' : dict(config_db.get('connection', dict())),
+                'retrytime' : float(config_db.get('retrytime', 0)),
+                'pool': {
+                    'read' : int(config_db_pool.get('read' , 10)),
+                    'write' : int(config_db_pool.get('write', 10)),
+                },
+            },
+        }
+
+        assert type(self.config['timezone']) is datetime.timezone
+        assert self.config['db']['retrytime'] >= 0
+        assert self.config['db']['pool']['read'] > 0
+        assert self.config['db']['pool']['write'] > 0
+
+        self.dbpool = Pool(self)
+
diff --git a/template/common.py b/template/common.py
index 63768f1..c3fa52d 100644
--- a/template/common.py
+++ b/template/common.py
@@ -10,11 +10,11 @@ class Common(Template):