Blob Blame Raw

import exception

from model.base import ModelBase


class Repository(ModelBase):
  def __init__(self, repositories, row, user = None):
    self.repositories = repositories
    super().__init__(self.repositories.model)
    
    self.id = int(row['id'])
    self.user_id = int(row['user_id'])
    self.name = str(row['name'])
    self.type = str(row['type'])
    self.title = str(row['title'])
    self.description = str(row['description'])
    
    assert(not user or user.id == self.user_id)
    self.user = user
    self.repotype = self.repositories.repotypes[self.type]

  def get_user(self):
    if self.user == None:
      self.user = self.model.users.get_by_id(self.user_id)
    assert(self.user)
    assert(self.user.id == self.user_id)
    return self.user

  def reset_cache(self):
    self.repositories.reset_cache(self.id, self.user_id, self.name)
    
  def gen_subpath(self):
    return self.repositories.gen_subpath(self.get_user().login, self.name)

  def gen_internalurl(self):
    return self.repotype.gen_internalurl( self.gen_subpath() )
    
  def can_update(self):
    return self.user_id == self.rights.user_id or self.rights.issuperuser()
  
  def update(self, title, description):
    if self.can_update():
      self.connection.execute(
        'UPDATE %T SET `title`=%s, `description`=%s WHERE `id`=%d',
        self.repositories.TABLE, title, description, self.id )
      self.reset_cache()
    else:
      raise exception.ModelDeny()

  def can_delete(self):
    return self.user_id == self.rights.user_id or self.rights.issuperuser()

  def delete(self, password = None):
    if self.can_delete():
      self.connection.execute(
        'DELETE FROM %T WHERE `id`=%d',
        self.repositories.TABLE, self.id )
      self.reset_cache()
      self.connection.call_on_commit(self.repotype.delete, self.gen_subpath())
    else:
      raise exception.ModelDeny()


class Repositories(ModelBase):
  TABLE = 'repositories'
  
  def __init__(self, model):
    super().__init__(model)
    self.repotypes = self.server.repotypes
    
  def reset_cache(self, id, user_id, name):
    self.connection.cache.reset_row(self.TABLE, id)
    self.connection.cache.reset(self.TABLE, {'user_id': user_id, 'name': name})

  def verify_name(self, name):
    return self.model.verify_identifier(name)

  def gen_subpath(self, login, name):
    assert( self.model.verify_path_entry(login) )
    assert( self.model.verify_path_entry(name) )
    return login + '/' + name

  def can_create(self, user_id):
    assert(type(user_id) is int and user_id)
    return user_id == self.rights.user_id or self.rights.issuperuser()

  def create(self, user_id, name, repotype, title, description, user = None):
    assert(not user or user.id == user_id)
    if not self.can_create(user_id):
      raise exception.ModelDeny()
    if not self.verify_name(name):
      raise exception.ModelWrongData(self.t('Repository name is incorrect'))
    if not type(repotype) is str or not repotype in self.repotypes:
      raise exception.ModelWrongData(self.t('Repository type is incorrect'))
    if self.get_by_name(user_id, name):
      raise exception.ModelWrongData(self.t('Repository already exists'))
    
    if not user:
      user = self.model.users.get_by_id(user_id)
      if not user:
        raise exception.ModelWrongData(self.t('User not found'))
    
    self.connection.execute(
      'INSERT INTO %T SET `user_id`=%d, `name`=%s, `type`=%s, `title`=%s, `description`=%s',
      self.TABLE, user_id, name, repotype, title, description )
    id = self.connection.insert_id()
    self.reset_cache(id, user_id, name)
    
    subpath = self.gen_subpath(user.login, name)
    repotype_instance = self.repotypes[repotype]
    self.connection.call_on_rollback(repotype_instance.delete, subpath)
    repotype_instance.create(subpath)

    return self.get_by_id(id, user)

  def get_by_id(self, id, user = None):
    assert(type(id) is int)
    row = self.connection.cache.row(self.TABLE, id)
    if not row:
      return None
    if not user:
      user = self.model.users.get_by_id(row['user_id'])
      if not user:
        return None
    return Repository(self, row, user)
    
  def get_by_name(self, user_id, name, user = None):
    assert(not user or user.id == user_id)
    assert(type(user_id) is int)
    assert(type(name) is str)
    
    rows = self.connection.cache.select(self.TABLE, {'user_id': user_id, 'name': name})
    if not rows or len(rows) > 1:
      return None
    row = rows[0]
    if not user:
      user = self.model.users.get_by_id(row['user_id'])
      if not user:
        return None
    return Repository(self, row, user)
  
  def get_list(self, user):
    assert(user)
    result = list()
    rows = self.connection.query_dict('SELECT * FROM %T WHERE `user_id`=%d ORDER BY `name`', self.TABLE, user.id)
    for row in rows:
      result.append(Repository(self, row, user))
    return result