class Right:
def __init__(self, row):
self.user_id = int(row['user_id'])
self.target_type = str(row['target_type'])
self.target_id = int(row['target_id'])
self.mode = str(row['mode'])
class MyRights:
def __init__(self, internal):
self.internal = internal
self.user_id = self.internal.user_id
def issuperuser(self):
return self.internal.issuperuser()
def isallowed(self, target_type, target_id, mode):
return self.internal.isallowed(target_type, target_id, mode)
def isallowed_root(self, mode):
return self.internal.isallowed_root(mode)
class InternalRights:
TABLE = 'rights'
ROOT = 'root'
def __init__(self, connection, user_id = 0, superuser = False):
assert(type(user_id) is int)
assert(type(superuser) is bool)
self.connection = connection
self.user_id = user_id
self.superuser = superuser
def issuperuser(self):
if self.superuser:
return True
if self.user_id and self.get(self.user_id, self.ROOT, 0, self.ROOT):
return True
def isallowed(self, target_type, target_id, mode):
if self.issuperuser():
return True
if self.user_id and self.get(self.user_id, target_type, target_id, mode):
return True
return False
def build_where(self, user_id = None, target_type = None, target_id = None, mode = None):
where = list()
args = list()
if user_id:
assert(type(user_id) is int)
where.append("`user_id`=%d")
args.append(user_id)
if target_type:
assert(type(target_type) is str)
where.append("`target_type`=%s")
args.append(target_type)
if target_id:
assert(type(target_id) is int)
where.append("`target_id`=%d")
args.append(target_id)
if mode:
assert(type(mode) is str)
where.append("`mode`=%s")
args.append(mode)
if not where:
return '', list()
return ' WHERE ' + ' AND '.join(where), args
def get_list(self, user_id = None, target_type = None, target_id = None, mode = None):
where, args = self.build_where(user_id, target_type, target_id, mode)
rows = connection.query_dict('SELECT * FROM %T' + where, self.TABLE, *args)
rights = list()
for row in rows:
rights.append( Right(row) )
return rights
def delete_list(self, user_id = None, target_type = None, target_id = None, mode = None):
where, args = self.build_where(user_id, target_type, target_id, mode)
self.connection.execute('DELETE FROM %T' + where, self.TABLE, *args)
self.connection.cache.reset(self.TABLE)
def reset_cache(self, user_id, target_type, target_id, mode):
assert(type(user_id) is int)
assert(type(target_type) is str)
assert(type(target_id) is int)
assert(type(mode) is str)
self.connection.cache.reset(
self.TABLE,
{ 'user_id': user_id,
'target_type': target_type,
'target_id': target_id,
'mode': mode })
def get(self, user_id, target_type, target_id, mode):
assert(type(user_id) is int)
assert(type(target_type) is str)
assert(type(target_id) is int)
assert(type(mode) is str)
rows = self.connection.cache.select(
self.TABLE,
{ 'user_id': user_id,
'target_type': target_type,
'target_id': target_id,
'mode': mode })
return bool(rows)
def set(self, user_id, target_type, target_id, mode, allowed):
assert(type(user_id) is int)
assert(type(target_type) is str)
assert(type(target_id) is int)
assert(type(mode) is str)
if not allowed:
self.connection.execute(
'''DELETE FROM %T WHERE
`user_id`=%d
AND `target_type`=%s
AND `target_id`=%d
AND `mode`=%s''',
self.TABLE, user_id, target_type, target_id, mode )
self.reset_cache(user_id, target_type, target_id, mode)
elif not self.get(user_id, target_type, target_id, mode):
self.connection.execute(
'''INSERT INTO %T SET
`user_id`=%d,
`target_type`=%s,
`target_id`=%d,
`mode`=%s''',
self.TABLE, user_id, target_type, target_id, mode )
self.reset_cache(user_id, target_type, target_id, mode)
def isallowed_root(self, mode):
return self.isallowed(self.ROOT, 0, mode)
def get_list_root(self, user_id = None, mode = None):
return self.get_list(user_id, self.ROOT, 0, mode)
def delete_list_root(self, user_id = None, mode = None):
return self.delete_list_root(user_id, self.ROOT, 0, mode)
def get_root(self, user_id, mode):
return self.get(user_id, self.ROOT, 0, mode)
def set_root(self, user_id, mode, allowed):
return self.set(user_id, self.ROOT, 0, mode, allowed)
def get_superuser(self, user_id):
return self.get_root(user_id, self.ROOT)
def set_superuser(self, user_id, allowed):
return self.set_root(user_id, self.ROOT, allowed)