Blob Blame Raw

import exception

from model.base import ModelItemBase, ModelManagerBase
from model.users import User



class Sslcert(ModelItemBase):
  """One stored SSL certificate record, built from a database row.

  Attributes set here:
    id:      integer taken from ``row['id']``.
    user_id: integer taken from ``row['user_id']`` (the owning user's id).
    data:    string taken from ``row['data']``.
    user:    the owning ``User`` if it was supplied to the constructor,
             otherwise ``None`` until ``get_user()`` loads it.

  NOTE(review): ``self.model``, ``self.manager``, ``self.rights``,
  ``self.connection`` and ``self.table()`` are not defined in this class;
  presumably they come from ``ModelItemBase`` -- confirm there.
  """

  def __init__(self, sslcerts, row, user = None):
    """Initialise the item from ``row``.

    Args:
      sslcerts: manager object, forwarded to the base-class constructor.
      row: mapping providing the keys ``id``, ``user_id`` and ``data``.
      user: optional, already-loaded owner. When given it must be a ``User``
        whose ``id`` equals ``row['user_id']``; this is enforced by an
        ``assert`` only.
    """
    super().__init__(sslcerts, row)

    self.id = int(row['id'])
    self.user_id = int(row['user_id'])
    self.data = str(row['data'])

    # isinstance() rather than an exact type comparison so that subclasses
    # of User are accepted as well.
    assert(not user or (isinstance(user, User) and user.id == self.user_id))
    self.user = user

  def get_user(self):
    """Return the owning user, loading and memoising it on first use.

    The lookup goes through ``self.model.users.get_by_id(self.user_id)``.
    Asserts that a user was found and that its id matches ``user_id``.
    """
    # Identity test: "== None" would be routed through User.__eq__.
    if self.user is None:
      self.user = self.model.users.get_by_id(self.user_id)
    assert(self.user)
    assert(self.user.id == self.user_id)
    return self.user

  def reset_cache(self):
    """Ask the manager to reset its cache for this item's id and data."""
    self.manager.reset_cache(self.id, self.data)

  def can_delete(self):
    """Return True when the current rights belong to the owner or a superuser."""
    return self.user_id == self.rights.user_id or self.rights.issuperuser()

  def delete(self):
    """Delete this certificate's row and reset the related cache.

    Raises:
      exception.ModelDeny: when ``can_delete()`` is false.
    """
    if not self.can_delete():
      raise exception.ModelDeny()

    self.connection.execute(
      'DELETE FROM %T WHERE `id`=%d',
      self.table(), self.id )
    # Reset the cache only after the row has been removed.
    self.reset_cache()


class Sslcerts(ModelManagerBase):
  """Manager for ``Sslcert`` items stored in the ``sslcerts`` table.

  NOTE(review): ``self.connection``, ``self.model``, ``self.rights`` and
  ``self.t()`` are not defined in this class; presumably they come from
  ``ModelManagerBase`` -- confirm there.
  """

  # Characters accepted by verify_data(): the base64 alphabet plus '='.
  _B64CHARS = frozenset(
    'ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=')

  def table(self):
    """Return the name of the table this manager works on."""
    return 'sslcerts'

  def itemtype(self):
    """Return the item class produced by this manager."""
    return Sslcert

  def reset_cache(self, id, data):
    """Reset cached entries for one certificate.

    Resets the base-class cache for ``id`` and the connection cache entry
    keyed by ``{'data': data}`` (the key ``get_by_data`` selects on).
    """
    super().reset_cache(id)
    self.connection.cache.reset(self.table(), {'data': data})

  def extract_data(self, data):
    """Return the certificate body of ``data`` with all whitespace removed.

    When ``data`` contains a ``-----BEGIN CERTIFICATE-----`` marker followed
    by a ``-----END CERTIFICATE-----`` marker, only the text between them is
    kept; otherwise the whole input is used. ``data`` is converted with
    ``str()`` first.
    """
    prefix = '-----BEGIN CERTIFICATE-----'
    suffix = '-----END CERTIFICATE-----'
    text = str(data)
    begin = text.find(prefix)
    if begin >= 0:
      body_start = begin + len(prefix)
      # Search for the END marker only after the BEGIN marker, so that an
      # END marker occurring earlier in the text does not defeat extraction.
      body_end = text.find(suffix, body_start)
      if body_end >= 0:
        text = text[body_start:body_end]
    return ''.join(text.split())

  def verify_data(self, data):
    """Return True when ``data`` is a non-empty str of base64 characters.

    Only the character set is checked ('=' is accepted at any position);
    the string is not decoded.
    """
    if not data or not isinstance(data, str):
      return False
    return all(c in self._B64CHARS for c in data)

  def can_read(self, user):
    """Return True when ``user`` is the current rights holder, or the
    current rights are those of a superuser."""
    assert(isinstance(user, User))
    return user.id == self.rights.user_id \
        or self.rights.issuperuser()

  def can_create(self, user):
    """Return True when ``user`` is the current rights holder, or the
    current rights are those of a superuser."""
    assert(isinstance(user, User))
    return user.id == self.rights.user_id or self.rights.issuperuser()

  def create(self, user, data):
    """Insert a new certificate owned by ``user`` and return its item.

    Args:
      user: the ``User`` that will own the certificate.
      data: certificate body; it must already pass ``verify_data`` (this
        method does not call ``extract_data``).

    Raises:
      exception.ModelDeny: when ``can_create(user)`` is false.
      exception.ModelWrongData: when ``data`` fails ``verify_data`` or
        ``get_by_data(data)`` finds an existing certificate.
    """
    if not self.can_create(user):
      raise exception.ModelDeny()
    if not self.verify_data(data):
      raise exception.ModelWrongData(self.t('Bad SSL certificate data'))
    # NOTE(review): get_by_data() returns None when more than one row
    # matches, so this check does not fire if duplicates already exist.
    if self.get_by_data(data):
      raise exception.ModelWrongData(self.t('SSL Certificate is not unique'))

    self.connection.execute(
      'INSERT INTO %T SET `user_id`=%d, `data`=%s',
      self.table(), user.id, data )
    new_id = self.connection.insert_id()
    self.reset_cache(new_id, data)

    return self.get_by_id(new_id, user)

  def get_by_id(self, id, user = None):
    """Return the certificate with the given id, or None.

    Args:
      id: integer row id (asserted to be exactly ``int``).
      user: optional, already-loaded owner of the row, used to avoid a user
        lookup. If it is missing or its id does not match the row's
        ``user_id``, the owner is loaded through ``self.model.users``.

    Returns:
      An ``Sslcert``, or None when the row or its owner cannot be found or
      ``can_read`` denies access.
    """
    assert(type(id) is int)
    row = self.connection.cache.row(self.table(), id)
    if not row:
      return None
    # Coerce once, exactly as Sslcert.__init__ does for the same column.
    owner_id = int(row['user_id'])
    # A supplied user that does not own the row is ignored; passing it on
    # would only trip the ownership assert in Sslcert.__init__.
    if not user or user.id != owner_id:
      user = self.model.users.get_by_id(owner_id)
      if not user:
        return None
    if not self.can_read(user):
      return None
    return Sslcert(self, row, user)

  def get_by_data(self, data):
    """Return the certificate whose ``data`` column equals ``data``, or None.

    None is returned both when nothing matches and when more than one row
    matches. No ``can_read`` check is applied here.
    """
    assert(type(data) is str)
    rows = self.connection.cache.select(self.table(), {'data': data})
    if not rows or len(rows) > 1:
      return None
    return Sslcert(self, rows[0])

  def get_list(self, user):
    """Return all certificates owned by ``user``, ordered by ``data``.

    Returns an empty list when ``can_read(user)`` is false.
    """
    assert(isinstance(user, User))
    if not self.can_read(user):
      return []
    rows = self.connection.query_dict('SELECT * FROM %T WHERE `user_id`=%d ORDER BY `data`', self.table(), user.id)
    return [Sslcert(self, row, user) for row in rows]