Blame model/sslcerts.py

9e1462
9e1462
import exception
9e1462
9e1462
from model.base import ModelItemBase, ModelManagerBase
9e1462
from model.users import User
9e1462
9e1462
9e1462
9e1462
class Sslcert(ModelItemBase):
  """A single SSL certificate record together with its owning user.

  Attributes assigned by this class:
    id       -- integer taken from row['id']
    user_id  -- integer taken from row['user_id']
    data     -- string taken from row['data']
    user     -- the owning User, or None until get_user() has loaded it

  NOTE(review): `model`, `manager`, `rights`, `connection` and `table()`
  are used below but not defined here; presumably ModelItemBase provides
  them -- confirm against model/base.py.
  """

  def __init__(self, sslcerts, row, user = None):
    """Build the item from a database row.

    sslcerts -- manager object, forwarded to ModelItemBase.__init__
    row      -- mapping that provides 'id', 'user_id' and 'data'
    user     -- optional owner that is already loaded; when given, its id
                must equal row['user_id']
    """
    super().__init__(sslcerts, row)

    self.id = int(row['id'])
    self.user_id = int(row['user_id'])
    self.data = str(row['data'])

    # isinstance() instead of an exact type comparison, so that subclasses
    # of User are accepted as well.
    assert not user or (isinstance(user, User) and user.id == self.user_id)
    self.user = user

  def get_user(self):
    """Return the owning User, loading it through the model on first use."""
    if self.user is None:
      self.user = self.model.users.get_by_id(self.user_id)
    assert self.user
    assert self.user.id == self.user_id
    return self.user

  def reset_cache(self):
    """Ask the manager to drop cached entries for this id and this data."""
    self.manager.reset_cache(self.id, self.data)

  def can_delete(self):
    """Return True when the acting user owns the record or is a superuser."""
    return self.user_id == self.rights.user_id or self.rights.issuperuser()

  def delete(self):
    """Delete the record from the database and reset its cache entries.

    Raises exception.ModelDeny when can_delete() is false.
    """
    if not self.can_delete():
      raise exception.ModelDeny()

    self.connection.execute(
      'DELETE FROM %T WHERE `id`=%d',
      self.table(), self.id )
    self.reset_cache()
9e1462
9e1462
9e1462
class Sslcerts(ModelManagerBase):
  """Manager for SSL certificate records stored in the 'sslcerts' table.

  NOTE(review): `connection`, `model`, `rights` and `t()` are used below
  but not defined here; presumably ModelManagerBase provides them --
  confirm against model/base.py.
  """

  # Characters accepted by verify_data(); a frozenset gives constant-time
  # membership tests.
  _B64CHARS = frozenset(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')

  def table(self):
    """Return the name of the table handled by this manager."""
    return 'sslcerts'

  def itemtype(self):
    """Return the item class produced by this manager."""
    return Sslcert

  def reset_cache(self, id, data):
    """Reset cached entries for the given id and for lookups by `data`."""
    super().reset_cache(id)
    self.connection.cache.reset(self.table(), {'data': data})

  def extract_data(self, data):
    """Return the certificate payload of `data` with all whitespace removed.

    When `data` contains a BEGIN CERTIFICATE marker followed by an
    END CERTIFICATE marker, only the text between the first such pair is
    kept; otherwise the whole input is used.
    """
    prefix = '-----BEGIN CERTIFICATE-----'
    suffix = '-----END CERTIFICATE-----'
    data = str(data)
    start = data.find(prefix)
    if start >= 0:
      start += len(prefix)
      # Look for the END marker only after the BEGIN marker, so that a stray
      # END marker earlier in the input does not prevent unwrapping.
      end = data.find(suffix, start)
      if end >= 0:
        data = data[start:end]
    return ''.join(data.split())

  def verify_data(self, data):
    """Return True when `data` is a non-empty str of base64 characters only.

    Only the alphabet is checked; length and padding position are not.
    """
    if not data or not isinstance(data, str):
      return False
    return all(c in self._B64CHARS for c in data)

  def can_read(self, user):
    """Return True when `user` is the acting user or the actor is a superuser."""
    assert isinstance(user, User)
    return user.id == self.rights.user_id \
        or self.rights.issuperuser()

  def can_create(self, user):
    """Return True when `user` is the acting user or the actor is a superuser."""
    assert isinstance(user, User)
    return user.id == self.rights.user_id or self.rights.issuperuser()

  def create(self, user, data):
    """Insert a certificate for `user` and return the new Sslcert.

    `data` is stored as given; it must already pass verify_data().

    Raises exception.ModelDeny when can_create(user) is false, and
    exception.ModelWrongData when `data` fails verify_data() or is already
    stored.
    """
    if not self.can_create(user):
      raise exception.ModelDeny()
    if not self.verify_data(data):
      raise exception.ModelWrongData(self.t('Bad SSL certificate data'))
    # Check for any matching row directly: get_by_data() returns None when
    # several rows match, which would let further duplicates through.
    if self.connection.cache.select(self.table(), {'data': data}):
      raise exception.ModelWrongData(self.t('SSL Certificate is not unique'))

    self.connection.execute(
      'INSERT INTO %T SET `user_id`=%d, `data`=%s',
      self.table(), user.id, data )
    new_id = self.connection.insert_id()
    self.reset_cache(new_id, data)

    return self.get_by_id(new_id, user)

  def get_by_id(self, id, user = None):
    """Return the Sslcert with the given id, or None.

    id   -- integer record id
    user -- optional owner that is already loaded; when omitted the owner
            is looked up from the row's 'user_id'

    None is returned when the row is missing, the owner cannot be loaded,
    the supplied `user` does not own the row, or can_read() denies access.
    """
    assert isinstance(id, int)
    row = self.connection.cache.row(self.table(), id)
    if not row:
      return None
    if not user:
      user = self.model.users.get_by_id(row['user_id'])
      if not user:
        return None
    elif user.id != int(row['user_id']):
      # Do not rely on the assert inside Sslcert.__init__ (asserts are
      # stripped under -O): a record owned by somebody else is reported
      # as not found for this user.
      return None
    if not self.can_read(user):
      return None
    return Sslcert(self, row, user)

  def get_by_data(self, data):
    """Return the Sslcert whose data equals `data`, or None.

    None is returned both when nothing matches and when more than one row
    matches. No can_read() check is performed here.
    """
    assert isinstance(data, str)
    rows = self.connection.cache.select(self.table(), {'data': data})
    if not rows or len(rows) > 1:
      return None
    return Sslcert(self, rows[0])

  def get_list(self, user):
    """Return the certificates owned by `user`, ordered by `data`.

    An empty list is returned when can_read(user) is false.
    """
    assert isinstance(user, User)
    if not self.can_read(user):
      return []
    rows = self.connection.query_dict('SELECT * FROM %T WHERE `user_id`=%d ORDER BY `data`', self.table(), user.id)
    return [Sslcert(self, row, user) for row in rows]
9e1462