|
|
e5b0ac |
|
|
|
e5b0ac |
import exception
|
|
|
e5b0ac |
|
|
|
e146e6 |
from model.base import ModelItemBase, ModelManagerBase
|
|
|
e146e6 |
from model.users import User
|
|
|
e5b0ac |
|
|
|
e5b0ac |
|
|
|
e146e6 |
class RepositoryRight:
    """Plain value object pairing an access mode with a user (or with everybody).

    Attributes set by the constructor:
        all_users: True when no user was supplied (``not user``).
        user: the ``User`` passed in, or the falsy value given instead.
        mode: the access mode string passed in.
    """

    def __init__(self, user, mode):
        """Store *user* and *mode* and derive ``all_users``.

        Args:
            user: a ``User`` instance, or a falsy value meaning "all users".
            mode: the access mode, as a string.

        Raises:
            AssertionError: if *user* is truthy but not a ``User``, or if
                *mode* is not a string (only while assertions are enabled).
        """
        # isinstance() instead of an exact type match, so that subclasses of
        # User / str are accepted as well.
        assert not user or isinstance(user, User)
        assert isinstance(mode, str)
        self.all_users = not user
        self.user = user
        self.mode = mode
|
|
|
e146e6 |
|
|
|
e146e6 |
|
|
|
e146e6 |
class Repository(ModelItemBase):
    """A single repository record together with the operations allowed on it.

    NOTE(review): ``self.model``, ``self.manager``, ``self.rights``,
    ``self.connection``, ``self.table()`` and ``self.t()`` are not defined in
    this class; presumably they are provided by ``ModelItemBase`` - confirm.
    """

    def __init__(self, repositories, row, user=None):
        """Build the item from a database row.

        Args:
            repositories: passed through to the base-class constructor.
            row: mapping with the keys ``id``, ``user_id``, ``name``,
                ``type``, ``title`` and ``description``.
            user: optional owner object; when given it must be a ``User``
                whose ``id`` equals ``row['user_id']``.

        Raises:
            AssertionError: if *user* is truthy and is not the owner.
            KeyError: if ``row['type']`` is not a key of
                ``self.manager.repotypes`` (assuming that is a dict -
                TODO confirm).
        """
        super().__init__(repositories, row)

        self.id = int(row['id'])
        self.user_id = int(row['user_id'])
        self.name = str(row['name'])
        self.type = str(row['type'])
        self.title = str(row['title'])
        self.description = str(row['description'])

        assert not user or (isinstance(user, User) and user.id == self.user_id)
        self.user = user
        self.repotype = self.manager.repotypes[self.type]
        # Filled lazily by get_user_rights().
        self.user_rights = None

    def get_user(self):
        """Return the owner, loading it via ``self.model.users`` on first use."""
        # Identity test: a User type could define its own __eq__.
        if self.user is None:
            self.user = self.model.users.get_by_id(self.user_id)
        assert self.user
        assert self.user.id == self.user_id
        return self.user

    def get_user_rights(self):
        """Return the rights entries of this repository visible to the caller.

        When ``can_update()`` is true the full list from
        ``self.rights.get_list`` is returned; otherwise only entries whose
        ``user_id`` is 0 or equals ``self.rights.user_id`` are kept.
        The result is cached on the instance.
        """
        if self.user_rights is None:
            # Build the result in a local and publish it only on success, so
            # an exception here cannot leave a bogus empty list in the cache.
            rights = self.rights.get_list(target_type=self.table(), target_id=self.id)
            if not self.can_update():
                current_id = self.rights.user_id
                rights = [
                    right for right in rights
                    if right.user_id == 0 or right.user_id == current_id
                ]
            self.user_rights = rights
        assert self.user_rights is not None
        return self.user_rights

    def reset_cache(self):
        """Ask the manager to reset its cache entries for this repository."""
        self.manager.reset_cache(self.id, self.user_id, self.name)

    def gen_subpath(self):
        """Return the manager-generated sub-path built from owner login and name."""
        return self.manager.gen_subpath(self.get_user().login, self.name)

    def gen_internalurl(self):
        """Return the repository type's internal URL for this sub-path."""
        return self.repotype.gen_internalurl(self.gen_subpath())

    def can_write(self):
        """Return a truthy value if updating is allowed or REPOWRITE is granted."""
        return self.can_update() or self.rights.isallowed(
            self.table(), self.id, self.manager.REPOWRITE)

    def can_update(self):
        """Return a truthy value for the owner or when ``issuperuser()`` is truthy."""
        return self.user_id == self.rights.user_id or self.rights.issuperuser()

    def update(self, title, description):
        """Store a new title and description, then reset the cache.

        Raises:
            exception.ModelDeny: if ``can_update()`` is false.
        """
        if not self.can_update():
            raise exception.ModelDeny()
        self.connection.execute(
            'UPDATE %T SET `title`=%s, `description`=%s WHERE `id`=%d',
            self.table(), title, description, self.id)
        self.reset_cache()

    def can_delete(self):
        """Return a truthy value for the owner or when ``issuperuser()`` is truthy."""
        return self.user_id == self.rights.user_id or self.rights.issuperuser()

    def delete(self):
        """Delete the rights entries and the row, and schedule storage removal.

        ``self.repotype.delete`` is registered through
        ``self.connection.call_on_commit`` with this repository's sub-path.

        Raises:
            exception.ModelDeny: if ``can_delete()`` is false.
        """
        if not self.can_delete():
            raise exception.ModelDeny()
        self.rights.delete_list(target_type=self.table(), target_id=self.id)
        self.connection.execute(
            'DELETE FROM %T WHERE `id`=%d',
            self.table(), self.id)
        self.reset_cache()
        self.connection.call_on_commit(self.repotype.delete, self.gen_subpath())

    def set_user_right(self, user_id, mode, allowed, all_users=False):
        """Set one right on this repository for a user (or for all users).

        Args:
            user_id: integer user id; 0 exactly when *all_users* is true.
            mode: ``self.manager.READ`` or ``self.manager.REPOWRITE``.
            allowed: forwarded unchanged to ``self.rights.set``.
            all_users: must equal ``user_id == 0``.

        Raises:
            exception.ModelDeny: if ``can_update()`` is false.
            exception.ModelWrongData: for an unknown mode, or for REPOWRITE
                combined with a zero *user_id*.
        """
        assert all_users == (user_id == 0)
        assert type(user_id) is int
        if not self.can_update():
            raise exception.ModelDeny()
        if mode not in (self.manager.READ, self.manager.REPOWRITE):
            raise exception.ModelWrongData(self.t('Wrong mode'))
        if not user_id and mode == self.manager.REPOWRITE:
            raise exception.ModelWrongData(self.t('Cannot grant write access for all users'))
        self.rights.set(user_id, self.table(), self.id, mode, allowed)
|
|
|
e5b0ac |
|
|
|
e5b0ac |
|
|
|
e146e6 |
class Repositories(ModelManagerBase):
    """Manager that creates and looks up ``Repository`` items.

    NOTE(review): ``self.server``, ``self.model``, ``self.rights``,
    ``self.connection`` and ``self.t()`` are not defined in this class;
    presumably they come from ``ModelManagerBase`` - confirm.
    """

    # Right mode names used with self.rights.
    READ = 'read'
    REPOWRITE = 'repowrite'

    def __init__(self, model):
        """Initialise the base manager and pick up the server's repotypes."""
        super().__init__(model)
        self.repotypes = self.server.repotypes

    def table(self):
        """Return the table name used for repositories."""
        return 'repositories'

    def itemtype(self):
        """Return the item class handled by this manager."""
        return Repository

    def reset_cache(self, id, user_id, name):
        """Reset the base-class cache for *id* and the (user_id, name) selection."""
        super().reset_cache(id)
        selector = {'user_id': user_id, 'name': name}
        self.connection.cache.reset(self.table(), selector)

    def verify_name(self, name):
        """Return the result of ``self.model.verify_identifier`` for *name*."""
        return self.model.verify_identifier(name)

    def gen_subpath(self, login, name):
        """Return ``login/name`` after asserting both are valid path entries."""
        verify = self.model.verify_path_entry
        assert verify(login)
        assert verify(name)
        return '/'.join((login, name))

    def can_read(self, user, id):
        """Return a truthy value if repository *id* of *user* may be read.

        True when *user* is the one recorded in ``self.rights``, when
        ``issuperuser()`` is truthy, or when the READ right is allowed.
        """
        assert type(user) is User
        assert type(id) is int
        return (
            user.id == self.rights.user_id
            or self.rights.issuperuser()
            or self.rights.isallowed(self.table(), id, self.READ)
        )

    def can_create(self, user):
        """Return a truthy value if a repository may be created for *user*."""
        assert type(user) is User
        return user.id == self.rights.user_id or self.rights.issuperuser()

    def create(self, user, name, repotype, title, description):
        """Insert a new repository row, create its storage and return the item.

        Raises:
            exception.ModelDeny: if ``can_create(user)`` is false.
            exception.ModelWrongData: for a bad name, an unknown repository
                type, or when ``get_by_name(user, name)`` already finds one.
        """
        if not self.can_create(user):
            raise exception.ModelDeny()
        if not self.verify_name(name):
            raise exception.ModelWrongData(self.t('Repository name is incorrect'))
        if type(repotype) is not str or repotype not in self.repotypes:
            raise exception.ModelWrongData(self.t('Repository type is incorrect'))
        if self.get_by_name(user, name):
            raise exception.ModelWrongData(self.t('Repository already exists'))

        self.connection.execute(
            'INSERT INTO %T SET `user_id`=%d, `name`=%s, `type`=%s, `title`=%s, `description`=%s',
            self.table(), user.id, name, repotype, title, description)
        new_id = self.connection.insert_id()
        self.reset_cache(new_id, user.id, name)

        subpath = self.gen_subpath(user.login, name)
        backend = self.repotypes[repotype]
        # The rollback hook is registered before the storage is created.
        self.connection.call_on_rollback(backend.delete, subpath)
        backend.create(subpath)

        return self.get_by_id(new_id, user)

    def get_by_id(self, id, user=None):
        """Return the readable ``Repository`` with this id, or ``None``.

        When *user* is falsy it is looked up from the row's ``user_id``.
        """
        assert type(id) is int
        row = self.connection.cache.row(self.table(), id)
        if not row:
            return None
        owner = user
        if not owner:
            owner = self.model.users.get_by_id(row['user_id'])
            if not owner:
                return None
        if not self.can_read(owner, row['id']):
            return None
        return Repository(self, row, owner)

    def get_by_name(self, user, name):
        """Return the readable ``Repository`` of *user* called *name*, or ``None``.

        ``None`` is also returned when the cache yields no row or several rows.
        """
        assert type(user) is User
        assert type(name) is str

        selector = {'user_id': user.id, 'name': name}
        rows = self.connection.cache.select(self.table(), selector)
        if not rows or len(rows) > 1:
            return None
        found = rows[0]
        if self.can_read(user, found['id']):
            return Repository(self, found, user)
        return None

    def get_list(self, user):
        """Return the readable repositories of *user*, ordered by name."""
        assert type(user) is User
        rows = self.connection.query_dict(
            'SELECT * FROM %T WHERE `user_id`=%d ORDER BY `name`',
            self.table(), user.id)
        return [
            Repository(self, row, user)
            for row in rows
            if self.can_read(user, row['id'])
        ]
|
|
|
e5b0ac |
|