Blame model/users.py

b838e2
b838e2
b838e2
import hashlib
b838e2
b838e2
import exception
b838e2
e146e6
from model.base import ModelItemBase, ModelManagerBase
b838e2
b838e2
e146e6
class User(ModelItemBase):
  """One user record, evaluated against the rights of the current session.

  The attributes used below but not defined here (rights, manager,
  connection, server, model, t and table()) are presumably supplied by
  ModelItemBase -- confirm against model/base.py.
  """

  def __init__(self, users, row):
    """Initialise the item from a database row.

    users -- the owning manager, forwarded to ModelItemBase.
    row   -- mapping providing 'id', 'login' and 'name'; 'password' is
             read only when the row describes the current user.
    """
    super().__init__(users, row)

    self.id = int(row['id'])
    self.login = str(row['login'])
    self.name = str(row['name'])
    # Superuser information is fetched lazily by get_superuser().
    self.superuser_ready = False
    self.superuser = None
    # The stored hash is kept only for the current user; change_password()
    # and delete() read it exclusively in that situation.
    if self.id == self.rights.user_id:
      self._password = str(row['password'])

  def get_superuser(self):
    """Return the cached result of rights.get_superuser() for this user.

    The lookup happens at most once per object and only when the caller is
    this user or a superuser; otherwise None is cached and returned.
    """
    if not self.superuser_ready:
      self.superuser = None
      if self.id == self.rights.user_id or self.rights.issuperuser():
        self.superuser = self.rights.get_superuser(self.id)
      self.superuser_ready = True
    return self.superuser

  def reset_cache(self):
    """Ask the manager to drop cached data for this user's id and login."""
    self.manager.reset_cache(self.id, self.login)

  def change_password(self, newpassword, oldpassword = None):
    """Store a new password hash and drop the user's sessions.

    A user changing their own password must supply the matching
    oldpassword; changing somebody else's password requires superuser
    rights.

    Raises exception.ModelDeny when the caller is not permitted (or omitted
    oldpassword for their own account) and exception.ModelWrongData when
    oldpassword does not match the stored hash.
    """
    if self.id == self.rights.user_id:
      if oldpassword is None:
        raise exception.ModelDeny()
      old_hash = self.manager.gen_password_hash(self.id, oldpassword)
      if old_hash != self._password:
        raise exception.ModelWrongData(self.t('Password incorrect'))
    elif not self.rights.issuperuser():
      raise exception.ModelDeny()

    new_hash = self.manager.gen_password_hash(self.id, newpassword)
    self.connection.execute('UPDATE %T SET `password`=%s WHERE `id`=%d', self.table(), new_hash, self.id)
    self.reset_cache()
    self.server.sessions.delete_by_user_id(self.id)

  def can_update(self):
    """Return True when the caller is this user or a superuser."""
    return self.id == self.rights.user_id or self.rights.issuperuser()

  def update(self, name):
    """Write a new display name; raise exception.ModelDeny if not allowed."""
    if not self.can_update():
      raise exception.ModelDeny()
    self.connection.execute('UPDATE %T SET `name`=%s WHERE `id`=%d', self.table(), name, self.id)
    self.reset_cache()

  def can_delete(self):
    """Tell whether the caller may delete this user.

    For the caller's own account the answer is the 'selfdelete' setting of
    the 'users' config section; for any other account it is whether the
    caller is a superuser.
    """
    if self.id == self.rights.user_id:
      return self.server.config['users']['selfdelete']
    return self.rights.issuperuser()

  def delete(self, password = None):
    """Delete the user together with their repositories, rights and sessions.

    When users delete their own account they must confirm with password.

    Raises exception.ModelDeny when deletion is not permitted (or password
    is missing for a self-delete) and exception.ModelWrongData when the
    password does not match the stored hash.
    """
    if not self.can_delete():
      raise exception.ModelDeny()

    if self.id == self.rights.user_id:
      if password is None:
        raise exception.ModelDeny()
      confirm_hash = self.manager.gen_password_hash(self.id, str(password))
      if confirm_hash != self._password:
        raise exception.ModelWrongData(self.t('Password incorrect'))

    # Repositories returned for this user are removed before the user row.
    for repo in self.model.repositories.get_list(self):
      repo.delete()

    # Remove rights held by the user, then rights that target the user.
    self.rights.delete_list(user_id = self.id)
    self.rights.delete_list(target_type = self.table(), target_id = self.id)
    self.connection.execute('DELETE FROM %T WHERE `id`=%d', self.table(), self.id)
    self.reset_cache()
    self.server.sessions.delete_by_user_id(self.id)

  def set_superuser(self, allowed):
    """Set the superuser flag of this user via rights.set_superuser().

    Only a superuser acting on a different account may do this; otherwise
    exception.ModelDeny is raised.
    """
    # Compare this user's id: the bare name `id` is the builtin function
    # and never equals a user id, which made the self-check always pass.
    if self.id != self.rights.user_id and self.rights.issuperuser():
      self.rights.set_superuser(self.id, allowed)
    else:
      raise exception.ModelDeny()
b838e2
b838e2
e146e6
class Users(ModelManagerBase):
  """Manager for the 'users' table: lookup, creation and password checks.

  The attributes used below but not defined here (rights, server,
  connection, model and t) are presumably supplied by ModelManagerBase --
  confirm against model/base.py.
  """

  def __init__(self, model):
    """Initialise the manager for the given model."""
    super().__init__(model)

  def table(self):
    """Return the name of the table this manager works on."""
    return 'users'

  def itemtype(self):
    """Return the class used to wrap rows of this table."""
    return User

  def reset_cache(self, id, login):
    """Drop cached data for a user, both by id and by login."""
    super().reset_cache(id)
    self.connection.cache.reset(self.table(), {'login': login})

  def gen_password_hash(self, id, password):
    """Return the hex SHA-512 digest of '<id>|<salt>|<password>'.

    The salt is read from the 'users' section of the server config.
    password must be a str; anything else raises TypeError.
    """
    # NOTE(review): a single SHA-512 pass with a configuration-wide salt is
    # cheap to brute-force. Moving to a key-derivation function needs a
    # migration of the stored hashes, so the format is left untouched here.
    salt = self.server.config['users']['salt']
    material = '|'.join((str(id), str(salt), password))
    return hashlib.sha512(material.encode('utf8')).hexdigest()

  def verify_login(self, login):
    """Return the result of model.verify_identifier() for login."""
    return self.model.verify_identifier(login)

  def can_create(self):
    """Tell whether the caller may create users.

    Truthy for superusers, otherwise the 'selfcreate' setting of the
    'users' config section.
    """
    return self.rights.issuperuser() or self.server.config['users']['selfcreate']

  def create(self, login, password, name):
    """Insert a new user and return it as seen by the caller.

    The return value comes from get_by_id(), so it is None when the caller
    is not allowed to view the new profile.

    Raises exception.ModelDeny when creation is not permitted and
    exception.ModelWrongData when login is invalid or already taken.
    """
    if not self.can_create():
      raise exception.ModelDeny()
    if not self.verify_login(login):
      raise exception.ModelWrongData(self.t('Login incorrect'))
    # Check for existing rows directly. get_by_login() hides users the
    # caller may not view, which would let a self-registering visitor
    # insert a duplicate login; check_password() rejects logins with more
    # than one row, so the original account would be locked out.
    if self.connection.cache.select(self.table(), {'login': login}):
      raise exception.ModelWrongData(self.t('Login already exists'))

    # The row is inserted with an empty password because the hash depends
    # on the id, which is known only after the insert.
    self.connection.execute("INSERT INTO %T SET `login`=%s, `password`='', `name`=%s", self.table(), login, name)
    # Normalise once: get_by_id() asserts that it receives a real int.
    user_id = int(self.connection.insert_id())
    password_hash = self.gen_password_hash(user_id, password)
    self.connection.execute('UPDATE %T SET `password`=%s WHERE `id`=%d', self.table(), password_hash, user_id)
    self.reset_cache(user_id, login)
    return self.get_by_id(user_id)

  def check_password(self, login, password):
    """Return the user id when login and password match, else a falsy value.

    False is returned when the login matches no row or more than one row,
    0 when the password hash differs.
    """
    assert(type(login) is str)
    assert(type(password) is str)
    rows = self.connection.cache.select(self.table(), {'login': login})
    if not rows or len(rows) > 1:
      return False
    row = rows[0]
    user_id = int(row['id'])
    if str(row['password']) == self.gen_password_hash(user_id, password):
      return user_id
    return 0

  def _can_view(self, user_id):
    """Tell whether the caller may see the profile with the given id."""
    return user_id == self.rights.user_id or self.rights.issuperuser() or self.server.config['users']['showprofile']

  def get_by_id(self, id):
    """Return the user with this id, or None if absent or not viewable."""
    assert(type(id) is int)
    if not self._can_view(id):
      return None
    row = self.connection.cache.row(self.table(), id)
    if not row:
      return None
    return self.itemtype()(self, row)

  def get_by_login(self, login):
    """Return the user with this login, or None.

    None is returned when the login matches no row or more than one row,
    or when the caller may not view the profile.
    """
    assert(type(login) is str)
    rows = self.connection.cache.select(self.table(), {'login': login})
    if not rows or len(rows) > 1:
      return None
    row = rows[0]
    if not self._can_view(int(row['id'])):
      return None
    return self.itemtype()(self, row)

  def can_list(self):
    """Tell whether the caller may list all users.

    Truthy for superusers, otherwise requires both the 'showlist' and
    'showprofile' settings of the 'users' config section.
    """
    return self.rights.issuperuser() or (self.server.config['users']['showlist'] and self.server.config['users']['showprofile'])

  def get_list(self):
    """Return the users visible to the caller as a list.

    All users ordered by login when listing is allowed; otherwise only the
    current user (if there is one and it can be loaded); otherwise empty.
    """
    if self.can_list():
      item_cls = self.itemtype()
      rows = self.connection.query_dict('SELECT * FROM %T ORDER BY `login`', self.table())
      return [item_cls(self, row) for row in rows]

    visible = []
    if self.rights.user_id:
      current_user = self.get_by_id(self.rights.user_id)
      if current_user:
        visible.append(current_user)
    return visible
b838e2