Blob Blame Raw


import hashlib

import exception

from model.base import ModelItemBase, ModelManagerBase


class User(ModelItemBase):
  def __init__(self, users, row):
    super().__init__(users, row)
    
    self.id = int(row['id'])
    self.login = str(row['login'])
    self.name = str(row['name'])
    self.superuser_ready = False
    self.superuser = None
    if self.id == self.rights.user_id:
      self._password = str(row['password'])

  def get_superuser(self):
    if not self.superuser_ready:
      self.superuser = None
      if self.id == self.rights.user_id or self.rights.issuperuser():
        self.superuser = self.rights.get_superuser(self.id)
      self.superuser_ready = True
    return self.superuser

  def reset_cache(self):
    self.manager.reset_cache(self.id, self.login)
    
  def change_password(self, newpassword, oldpassword = None):
    if self.id == self.rights.user_id:
      if oldpassword is None:
        raise exception.ModelDeny()
      oldhash = self.manager.gen_password_hash(self.id, oldpassword)
      if oldhash != self._password:
        raise exception.ModelWrongData(self.t('Password incorrect'))
    if self.id != self.rights.user_id and not self.rights.issuperuser():
      raise exception.ModelDeny()
    hash = self.manager.gen_password_hash(self.id, newpassword)
    self.connection.execute('UPDATE %T SET `password`=%s WHERE `id`=%d', self.table(), hash, self.id)
    self.reset_cache()
    self.server.sessions.delete_by_user_id(self.id)

  def can_update(self):
    return self.id == self.rights.user_id or self.rights.issuperuser()
  
  def update(self, name):
    if self.can_update():
      self.connection.execute('UPDATE %T SET `name`=%s WHERE `id`=%d', self.table(), name, self.id)
      self.reset_cache()
    else:
      raise exception.ModelDeny()

  def can_delete(self):
    return self.server.config['users']['selfdelete'] if self.id == self.rights.user_id else self.rights.issuperuser()

  def delete(self, password = None):
    if self.can_delete():
      if self.id == self.rights.user_id:
        if password is None:
          raise exception.ModelDeny()
        hash = self.manager.gen_password_hash(self.id, str(password))
        if hash != self._password:
          raise exception.ModelWrongData(self.t('Password incorrect'))

      repositories = self.model.repositories.get_list(self)
      for repo in repositories:
        repo.delete()

      self.rights.delete_list(user_id = self.id)
      self.rights.delete_list(target_type = self.table(), target_id = self.id)
      self.connection.execute('DELETE FROM %T WHERE `id`=%d', self.table(), self.id)
      self.reset_cache()
      self.server.sessions.delete_by_user_id(self.id)
    else:
      raise exception.ModelDeny()
    
  def set_superuser(self, allowed):
    if id != self.rights.user_id and self.rights.issuperuser():
      self.rights.set_superuser(self.id, allowed)
    else:
      raise exception.ModelDeny()


class Users(ModelManagerBase):
  def __init__(self, model):
    super().__init__(model)

  def table(self):
    return 'users'
    
  def itemtype(self):
    return User
    
  def reset_cache(self, id, login):
    super().reset_cache(id)
    self.connection.cache.reset(self.table(), {'login': login})

  def gen_password_hash(self, id, password):
    salt = self.server.config['users']['salt']
    return hashlib.sha512(bytes(str(id) + '|' + str(salt) + '|' + password, 'utf8')).hexdigest()

  def verify_login(self, login):
    return self.model.verify_identifier(login)

  def can_create(self):
    return self.rights.issuperuser() or self.server.config['users']['selfcreate']

  def create(self, login, password, name):
    if not self.can_create():
      raise exception.ModelDeny()
    if not self.verify_login(login):
      raise exception.ModelWrongData(self.t('Login incorrect'))
    if self.get_by_login(login):
      raise exception.ModelWrongData(self.t('Login already exists'))
    self.connection.execute("INSERT INTO %T SET `login`=%s, `password`='', `name`=%s", self.table(), login, name)
    id = self.connection.insert_id()
    hash = self.gen_password_hash(int(id), password)
    self.connection.execute('UPDATE %T SET `password`=%s WHERE `id`=%d', self.table(), hash, id)
    self.reset_cache(id, login)
    return self.get_by_id(id)

  def check_password(self, login, password):
    assert(type(login) is str)
    assert(type(password) is str)
    rows = self.connection.cache.select(self.table(), {'login': login})
    if not rows or len(rows) > 1:
      return False
    row = rows[0]
    id = int(row['id'])
    if str(row['password']) == self.gen_password_hash(id, password):
      return id
    return 0

  def get_by_id(self, id):
    assert(type(id) is int)
    row = None
    if id == self.rights.user_id or self.rights.issuperuser() or self.server.config['users']['showprofile']:
      row = self.connection.cache.row(self.table(), id)
    if not row:
      return None
    return self.itemtype()(self, row)
    
  def get_by_login(self, login):
    assert(type(login) is str)
    rows = self.connection.cache.select(self.table(), {'login': login})
    if not rows or len(rows) > 1:
      return None
    row = rows[0]
    if int(row['id']) == self.rights.user_id or self.rights.issuperuser() or self.server.config['users']['showprofile']:
      return User(self, row)
    return None
  
  def can_list(self):
    return self.rights.issuperuser() or (self.server.config['users']['showlist'] and self.server.config['users']['showprofile'])
  
  def get_list(self):
    result = list()
    if self.can_list():
      rows = self.connection.query_dict('SELECT * FROM %T ORDER BY `login`', self.table())
      for row in rows:
        result.append(User(self, row))
    elif self.rights.user_id:
      current_user = self.get_by_id(self.rights.user_id)
      if current_user:
        result.append(current_user)
    return result