import hashlib
import exception
from model.base import ModelItemBase, ModelManagerBase
class User(ModelItemBase):
def __init__(self, users, row):
super().__init__(users, row)
self.id = int(row['id'])
self.login = str(row['login'])
self.name = str(row['name'])
self.superuser_ready = False
self.superuser = None
if self.id == self.rights.user_id:
self._password = str(row['password'])
def get_superuser(self):
if not self.superuser_ready:
self.superuser = None
if self.id == self.rights.user_id or self.rights.issuperuser():
self.superuser = self.rights.get_superuser(self.id)
self.superuser_ready = True
return self.superuser
def reset_cache(self):
self.manager.reset_cache(self.id, self.login)
def change_password(self, newpassword, oldpassword = None):
if self.id == self.rights.user_id:
if oldpassword is None:
raise exception.ModelDeny()
oldhash = self.manager.gen_password_hash(self.id, oldpassword)
if oldhash != self._password:
raise exception.ModelWrongData(self.t('Password incorrect'))
if self.id != self.rights.user_id and not self.rights.issuperuser():
raise exception.ModelDeny()
hash = self.manager.gen_password_hash(self.id, newpassword)
self.connection.execute('UPDATE %T SET `password`=%s WHERE `id`=%d', self.table(), hash, self.id)
self.reset_cache()
self.server.sessions.delete_by_user_id(self.id)
def can_update(self):
return self.id == self.rights.user_id or self.rights.issuperuser()
def update(self, name):
if self.can_update():
self.connection.execute('UPDATE %T SET `name`=%s WHERE `id`=%d', self.table(), name, self.id)
self.reset_cache()
else:
raise exception.ModelDeny()
def can_delete(self):
return self.server.config['users']['selfdelete'] if self.id == self.rights.user_id else self.rights.issuperuser()
def delete(self, password = None):
if self.can_delete():
if self.id == self.rights.user_id:
if password is None:
raise exception.ModelDeny()
hash = self.manager.gen_password_hash(self.id, str(password))
if hash != self._password:
raise exception.ModelWrongData(self.t('Password incorrect'))
repositories = self.model.repositories.get_list(self)
for repo in repositories:
repo.delete()
self.rights.delete_list(user_id = self.id)
self.rights.delete_list(target_type = self.table(), target_id = self.id)
self.connection.execute('DELETE FROM %T WHERE `id`=%d', self.table(), self.id)
self.reset_cache()
self.server.sessions.delete_by_user_id(self.id)
else:
raise exception.ModelDeny()
def set_superuser(self, allowed):
if id != self.rights.user_id and self.rights.issuperuser():
self.rights.set_superuser(self.id, allowed)
else:
raise exception.ModelDeny()
class Users(ModelManagerBase):
def __init__(self, model):
super().__init__(model)
def table(self):
return 'users'
def itemtype(self):
return User
def reset_cache(self, id, login):
super().reset_cache(id)
self.connection.cache.reset(self.table(), {'login': login})
def gen_password_hash(self, id, password):
salt = self.server.config['users']['salt']
return hashlib.sha512(bytes(str(id) + '|' + str(salt) + '|' + password, 'utf8')).hexdigest()
def verify_login(self, login):
return self.model.verify_identifier(login)
def can_create(self):
return self.rights.issuperuser() or self.server.config['users']['selfcreate']
def create(self, login, password, name):
if not self.can_create():
raise exception.ModelDeny()
if not self.verify_login(login):
raise exception.ModelWrongData(self.t('Login incorrect'))
if self.get_by_login(login):
raise exception.ModelWrongData(self.t('Login already exists'))
self.connection.execute("INSERT INTO %T SET `login`=%s, `password`='', `name`=%s", self.table(), login, name)
id = self.connection.insert_id()
hash = self.gen_password_hash(int(id), password)
self.connection.execute('UPDATE %T SET `password`=%s WHERE `id`=%d', self.table(), hash, id)
self.reset_cache(id, login)
return self.get_by_id(id)
def check_password(self, login, password):
assert(type(login) is str)
assert(type(password) is str)
rows = self.connection.cache.select(self.table(), {'login': login})
if not rows or len(rows) > 1:
return False
row = rows[0]
id = int(row['id'])
if str(row['password']) == self.gen_password_hash(id, password):
return id
return 0
def get_by_id(self, id):
assert(type(id) is int)
row = None
if id == self.rights.user_id or self.rights.issuperuser() or self.server.config['users']['showprofile']:
row = self.connection.cache.row(self.table(), id)
if not row:
return None
return self.itemtype()(self, row)
def get_by_login(self, login):
assert(type(login) is str)
rows = self.connection.cache.select(self.table(), {'login': login})
if not rows or len(rows) > 1:
return None
row = rows[0]
if int(row['id']) == self.rights.user_id or self.rights.issuperuser() or self.server.config['users']['showprofile']:
return User(self, row)
return None
def can_list(self):
return self.rights.issuperuser() or (self.server.config['users']['showlist'] and self.server.config['users']['showprofile'])
def get_list(self):
result = list()
if self.can_list():
rows = self.connection.query_dict('SELECT * FROM %T ORDER BY `login`', self.table())
for row in rows:
result.append(User(self, row))
elif self.rights.user_id:
current_user = self.get_by_id(self.rights.user_id)
if current_user:
result.append(current_user)
return result