import exception
from model.base import ModelItemBase, ModelManagerBase
from model.users import User
class Sslcert(ModelItemBase):
    """One SSL certificate record together with a lazily loaded owner.

    The attributes ``id``, ``user_id`` and ``data`` are copied from the row
    handed to the constructor (coerced to ``int``, ``int`` and ``str``).
    ``self.model``, ``self.manager``, ``self.connection``, ``self.rights``
    and ``self.table()`` are used below but not defined here -- presumably
    they are provided by ``ModelItemBase``.
    """

    def __init__(self, sslcerts, row, user=None):
        """Initialise the item from a row.

        Args:
            sslcerts: forwarded unchanged to ``ModelItemBase.__init__``
                (callers in this file pass the ``Sslcerts`` manager).
            row: mapping that provides the ``id``, ``user_id`` and ``data``
                keys.
            user: optional pre-loaded owner.  When truthy it must be a
                ``User`` whose ``id`` equals the row's ``user_id``.
        """
        super().__init__(sslcerts, row)
        self.id = int(row['id'])
        self.user_id = int(row['user_id'])
        self.data = str(row['data'])
        # Internal invariant: a supplied owner has to match the row.
        assert not user or (isinstance(user, User) and user.id == self.user_id)
        self.user = user

    def get_user(self):
        """Return the owning user, loading and memoising it on first use."""
        if self.user is None:
            self.user = self.model.users.get_by_id(self.user_id)
            # A freshly loaded owner must exist and match this record.
            assert self.user
            assert self.user.id == self.user_id
        return self.user

    def reset_cache(self):
        """Ask the manager to reset the cache for this item's id and data."""
        self.manager.reset_cache(self.id, self.data)

    def can_delete(self):
        """Return whether the current rights allow deleting this item.

        True when the rights' ``user_id`` equals this item's ``user_id`` or
        when ``self.rights.issuperuser()`` is truthy.
        """
        return self.user_id == self.rights.user_id or self.rights.issuperuser()

    def delete(self):
        """Delete this certificate's row and reset the related cache.

        Raises:
            exception.ModelDeny: if ``can_delete()`` is false.
        """
        if not self.can_delete():
            raise exception.ModelDeny()
        # NOTE(review): %T / %d look like placeholders expanded by
        # connection.execute (table name / integer) -- confirm there.
        self.connection.execute(
            'DELETE FROM %T WHERE `id`=%d',
            self.table(), self.id)
        self.reset_cache()
class Sslcerts(ModelManagerBase):
    """Manager that creates and looks up ``Sslcert`` items.

    ``self.connection``, ``self.model``, ``self.rights`` and ``self.t`` are
    used below but not defined here -- presumably they are provided by
    ``ModelManagerBase``.
    """

    # Armor lines surrounding the base64 body of a PEM certificate.
    _PEM_PREFIX = '-----BEGIN CERTIFICATE-----'
    _PEM_SUFFIX = '-----END CERTIFICATE-----'
    # Characters accepted by verify_data (base64 alphabet plus padding).
    _B64_CHARS = frozenset(
        'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')

    def table(self):
        """Return the name of the table managed by this class."""
        return 'sslcerts'

    def itemtype(self):
        """Return the item class produced by this manager."""
        return Sslcert

    def reset_cache(self, id, data):
        """Reset cache entries related to one certificate.

        Calls the base-class ``reset_cache(id)`` and then resets the
        connection cache for this table keyed by ``{'data': data}``.
        """
        super().reset_cache(id)
        self.connection.cache.reset(self.table(), {'data': data})

    def extract_data(self, data):
        """Return the certificate body with all whitespace removed.

        ``data`` is converted with ``str()``.  If it contains a BEGIN
        marker followed by an END marker, only the text between them is
        kept; otherwise the whole string is kept.  In both cases every
        whitespace character is then stripped out.
        """
        text = str(data)
        begin = text.find(self._PEM_PREFIX)
        if begin >= 0:
            body_start = begin + len(self._PEM_PREFIX)
            # Search for the END marker only after the BEGIN marker, so a
            # stray END marker earlier in the text cannot defeat extraction.
            body_end = text.find(self._PEM_SUFFIX, body_start)
            if body_end >= 0:
                text = text[body_start:body_end]
        return ''.join(text.split())

    def verify_data(self, data):
        """Return True if ``data`` is a non-empty ``str`` of base64 chars.

        Only the character set is checked; the value is not decoded.
        """
        if not data or not isinstance(data, str):
            return False
        return all(char in self._B64_CHARS for char in data)

    def can_read(self, user):
        """Return whether the current rights may read ``user``'s certificates.

        True when ``user.id`` equals the rights' ``user_id`` or when
        ``self.rights.issuperuser()`` is truthy.
        """
        assert isinstance(user, User)
        return user.id == self.rights.user_id or self.rights.issuperuser()

    def can_create(self, user):
        """Return whether the current rights may create a certificate for ``user``.

        Uses the same rule as ``can_read``.
        """
        assert isinstance(user, User)
        return user.id == self.rights.user_id or self.rights.issuperuser()

    def create(self, user, data):
        """Insert a new certificate for ``user`` and return its item.

        Args:
            user: the ``User`` who will own the certificate.
            data: certificate body; must pass ``verify_data``.

        Returns:
            The result of ``get_by_id`` for the newly inserted row.

        Raises:
            exception.ModelDeny: if ``can_create(user)`` is false.
            exception.ModelWrongData: if ``data`` fails ``verify_data`` or
                a row with the same ``data`` already exists.
        """
        if not self.can_create(user):
            raise exception.ModelDeny()
        if not self.verify_data(data):
            raise exception.ModelWrongData(self.t('Bad SSL certificate data'))
        # Check the matching rows directly: get_by_data() returns None when
        # several rows match, which would let an existing duplicate through.
        if self.connection.cache.select(self.table(), {'data': data}):
            raise exception.ModelWrongData(self.t('SSL Certificate is not unique'))

        # NOTE(review): %T / %d / %s look like placeholders expanded by
        # connection.execute -- confirm there.
        self.connection.execute(
            'INSERT INTO %T SET `user_id`=%d, `data`=%s',
            self.table(), user.id, data)
        new_id = self.connection.insert_id()
        self.reset_cache(new_id, data)
        return self.get_by_id(new_id, user)

    def get_by_id(self, id, user=None):
        """Return the certificate with the given id, or None.

        Args:
            id: integer row id.
            user: optional pre-loaded owner; when falsy the owner is looked
                up through ``self.model.users`` using the row's ``user_id``.

        Returns:
            An ``Sslcert``, or None when the row is missing, the owner
            cannot be loaded, or ``can_read`` denies access.
        """
        assert isinstance(id, int)
        row = self.connection.cache.row(self.table(), id)
        if not row:
            return None
        if not user:
            # Coerce like Sslcert.__init__ does, in case the row holds the
            # id in a non-int form.
            user = self.model.users.get_by_id(int(row['user_id']))
            if not user:
                return None
        if not self.can_read(user):
            return None
        return Sslcert(self, row, user)

    def get_by_data(self, data):
        """Return the single certificate whose ``data`` matches, or None.

        None is returned both when nothing matches and when more than one
        row matches.  No ``can_read`` check is performed here.
        """
        assert isinstance(data, str)
        rows = self.connection.cache.select(self.table(), {'data': data})
        if not rows or len(rows) > 1:
            return None
        return Sslcert(self, rows[0])

    def get_list(self, user):
        """Return ``user``'s certificates ordered by ``data``.

        An empty list is returned when ``can_read(user)`` is false.
        """
        assert isinstance(user, User)
        if not self.can_read(user):
            return []
        rows = self.connection.query_dict(
            'SELECT * FROM %T WHERE `user_id`=%d ORDER BY `data`',
            self.table(), user.id)
        return [Sslcert(self, row, user) for row in rows]