import exception
from model.base import ModelItemBase, ModelManagerBase
from model.users import User
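class Sslcert(ModelItemBase):
    """A single stored SSL certificate together with its owning user id.

    ``rights``, ``model``, ``manager``, ``connection`` and ``table()`` are
    not defined in this class; they are presumably provided by
    ModelItemBase (not visible here).
    """

    def __init__(self, sslcerts, row, user=None):
        """Build the item from a database row.

        Args:
            sslcerts: the owning manager, forwarded to ModelItemBase.
            row: mapping with the keys 'id', 'user_id' and 'data'.
            user: optional, already loaded owner. When given it must be
                exactly a User whose id equals the row's user_id.

        Raises:
            AssertionError: if ``user`` is given but is not the row's owner.
        """
        super().__init__(sslcerts, row)

        self.id = int(row['id'])
        self.user_id = int(row['user_id'])
        self.data = str(row['data'])

        # This ownership check was an ``assert``. Assertions are removed
        # when Python runs with -O, and the manager relies on this check to
        # keep a certificate from being attached to a foreign user, so it
        # is enforced explicitly. The exception type is unchanged.
        if user and not (type(user) is User and user.id == self.user_id):
            raise AssertionError('user is not the owner of this certificate')
        self.user = user

    def get_user(self):
        """Return the owning User, loading it on first use.

        Raises:
            AssertionError: if no user can be loaded for ``user_id`` or the
                loaded/attached user has a different id.
        """
        if self.user is None:
            self.user = self.model.users.get_by_id(self.user_id)

        # Explicit raises instead of ``assert`` so the consistency check
        # also runs under -O. It is evaluated on every call, which also
        # covers a ``user`` attribute that was replaced after construction.
        if not self.user:
            raise AssertionError('certificate owner could not be loaded')
        if self.user.id != self.user_id:
            raise AssertionError('certificate owner id mismatch')
        return self.user

    def reset_cache(self):
        """Ask the manager to drop cached entries for this id and data."""
        self.manager.reset_cache(self.id, self.data)

    def can_delete(self):
        """Return True for the certificate's owner or a superuser."""
        if self.user_id == self.rights.user_id:
            return True
        return self.rights.issuperuser()

    def delete(self):
        """Delete this certificate row and invalidate its cache entries.

        Raises:
            exception.ModelDeny: if can_delete() is false for the current
                rights.
        """
        # Authorization first; nothing is executed for a denied caller.
        if not self.can_delete():
            raise exception.ModelDeny()

        # %T / %d are filled in by connection.execute(); presumably that
        # layer quotes the table name and coerces the id -- confirm there.
        self.connection.execute(
            'DELETE FROM %T WHERE `id`=%d',
            self.table(), self.id)
        self.reset_cache()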
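class Sslcerts(ModelManagerBase):
    """Manager for stored SSL certificates: validation, access and lookup.

    ``rights``, ``model``, ``connection`` and ``t()`` are not defined in
    this class; they are presumably provided by ModelManagerBase (not
    visible here).
    """

    # Characters accepted by verify_data(): base64 alphabet plus padding.
    _B64_CHARS = frozenset(
        'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')

    def table(self):
        """Return the name of the database table handled by this manager."""
        return 'sslcerts'

    def itemtype(self):
        """Return the item class produced by this manager."""
        return Sslcert

    def reset_cache(self, id, data):
        """Invalidate cache entries for a certificate.

        Drops the by-id entry through the base class and the by-data
        entry that get_by_data() reads.
        """
        super().reset_cache(id)
        self.connection.cache.reset(self.table(), {'data': data})

    def extract_data(self, data):
        """Normalise certificate input to its bare base64 body.

        If the text contains a BEGIN CERTIFICATE marker followed by an END
        CERTIFICATE marker, only the text between the first occurrence of
        each is kept. All whitespace is then removed. Input without a
        usable marker pair is returned whitespace-stripped, so the result
        must still be passed through verify_data().
        """
        prefix = '-----BEGIN CERTIFICATE-----'
        suffix = '-----END CERTIFICATE-----'
        text = str(data)
        begin = text.find(prefix)
        end = text.find(suffix)
        body_start = begin + len(prefix)
        if begin >= 0 and end >= 0 and body_start <= end:
            text = text[body_start:end]
        return ''.join(text.split())

    def verify_data(self, data):
        """Return True if ``data`` is a non-empty str of base64 characters.

        NOTE(review): this is an alphabet check only. It does not verify
        padding, that the text decodes, or that the decoded bytes are an
        X.509 certificate; anything that trusts the stored value as a
        certificate needs its own parsing step.
        """
        if not data or type(data) is not str:
            return False
        return all(ch in self._B64_CHARS for ch in data)

    def can_read(self, user):
        """Return True if the current rights may read ``user``'s certificates.

        Raises:
            AssertionError: if ``user`` is not exactly a User.
        """
        # Explicit raise instead of ``assert`` so the type check on an
        # access decision is not removed under -O.
        if type(user) is not User:
            raise AssertionError('user must be a User')
        if user.id == self.rights.user_id:
            return True
        return self.rights.issuperuser()

    def can_create(self, user):
        """Return True if the current rights may add a certificate for ``user``.

        Raises:
            AssertionError: if ``user`` is not exactly a User.
        """
        if type(user) is not User:
            raise AssertionError('user must be a User')
        if user.id == self.rights.user_id:
            return True
        return self.rights.issuperuser()

    def create(self, user, data):
        """Store a new certificate for ``user`` and return the new item.

        Args:
            user: the User who will own the certificate.
            data: base64 certificate body, as produced by extract_data().

        Raises:
            exception.ModelDeny: if can_create(user) is false.
            exception.ModelWrongData: if the data fails verify_data() or a
                row with the same data already exists.
        """
        if not self.can_create(user):
            raise exception.ModelDeny()
        if not self.verify_data(data):
            raise exception.ModelWrongData(self.t('Bad SSL certificate data'))

        # Uniqueness is checked against the raw rows. get_by_data() returns
        # None when MORE than one row matches, so using it here let a
        # further duplicate through once duplicates already existed.
        # NOTE(review): check-then-insert is not atomic; whether the table
        # has a UNIQUE index on `data` is not visible here -- confirm.
        if self.connection.cache.select(self.table(), {'data': data}):
            raise exception.ModelWrongData(self.t('SSL Certificate is not unique'))

        self.connection.execute(
            'INSERT INTO %T SET `user_id`=%d, `data`=%s',
            self.table(), user.id, data)
        new_id = self.connection.insert_id()
        self.reset_cache(new_id, data)

        return self.get_by_id(new_id, user)

    def get_by_id(self, id, user=None):
        """Return the certificate with this id, or None.

        None is returned when the row does not exist, its owner cannot be
        loaded, the current rights may not read it, or the supplied
        ``user`` is not the row's owner.

        Args:
            id: integer primary key.
            user: optional owner; when omitted the owner is loaded from
                the row's user_id.

        Raises:
            AssertionError: if ``id`` is not exactly an int.
        """
        if type(id) is not int:
            raise AssertionError('id must be an int')

        row = self.connection.cache.row(self.table(), id)
        if not row:
            return None

        if not user:
            user = self.model.users.get_by_id(row['user_id'])
        if not user:
            return None
        if not self.can_read(user):
            return None

        # can_read() only compares ``user`` with the current rights; it
        # says nothing about who owns ``row``. Previously the mismatch was
        # caught only by an ``assert`` in Sslcert.__init__, i.e. not at all
        # under -O, and otherwise as an AssertionError that distinguishes
        # "exists but foreign" from "missing". Deny it like the other
        # cases.
        if int(row['user_id']) != user.id:
            return None

        return Sslcert(self, row, user)

    def get_by_data(self, data):
        """Return the single certificate stored with exactly this data.

        Returns None when no row or more than one row matches.

        NOTE(review): no can_read() check is applied here, so the returned
        item may belong to a different user than the current one; callers
        must not hand it to a client without their own access check.

        Raises:
            AssertionError: if ``data`` is not exactly a str.
        """
        if type(data) is not str:
            raise AssertionError('data must be a str')

        rows = self.connection.cache.select(self.table(), {'data': data})
        # Ambiguous matches are treated like "not found".
        if not rows or len(rows) > 1:
            return None
        return Sslcert(self, rows[0])

    def get_list(self, user):
        """Return ``user``'s certificates ordered by data.

        An empty list is returned when the current rights may not read
        them.

        Raises:
            AssertionError: if ``user`` is not exactly a User.
        """
        if type(user) is not User:
            raise AssertionError('user must be a User')
        if not self.can_read(user):
            return []

        rows = self.connection.query_dict(
            'SELECT * FROM %T WHERE `user_id`=%d ORDER BY `data`',
            self.table(), user.id)
        return [Sslcert(self, row, user) for row in rows]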