Blob Blame Raw


class Right:
  """One row of the rights table, with every column coerced to a plain type."""

  def __init__(self, row):
    """Copy the four rights columns out of *row*.

    row -- mapping indexable by 'user_id', 'target_type', 'target_id'
           and 'mode'; the id columns are passed through int(), the
           other two through str().
    """
    # Columns are read and converted in this fixed order.
    columns = (
      ('user_id', int),
      ('target_type', str),
      ('target_id', int),
      ('mode', str),
    )
    for column, convert in columns:
      setattr(self, column, convert(row[column]))


class MyRights:
  """Rights of the current user, plus lazy access to the user object.

  Permission queries are forwarded unchanged to *internal*; the user
  object is fetched from *model* on first request and then cached.
  """

  def __init__(self, internal, model):
    """Remember the collaborators and the current user id.

    internal -- object exposing `user_id`, `issuperuser()`,
                `isallowed(target_type, target_id, mode)` and
                `isallowed_root(mode)`.
    model    -- object whose `users.get_by_id(user_id)` returns the user.
    """
    self.internal = internal
    self.model = model
    self.user_id = self.internal.user_id
    self.user = None

  def get_user(self):
    """Return the current user object, or None when no user id is set.

    The user is loaded through `model.users.get_by_id` on the first
    call and cached in `self.user` for later calls.

    Raises AssertionError if a user id is set but the model returns
    nothing truthy for it.
    """
    if not self.user_id:
      # Falsy user id (e.g. 0): there is no user to load.
      assert not self.user
      return None
    if not self.user:
      self.user = self.model.users.get_by_id(self.user_id)
      assert self.user
    return self.user

  def issuperuser(self):
    """Forward to `internal.issuperuser()`."""
    return self.internal.issuperuser()

  def isallowed(self, target_type, target_id, mode):
    """Forward to `internal.isallowed(target_type, target_id, mode)`."""
    return self.internal.isallowed(target_type, target_id, mode)

  def isallowed_root(self, mode):
    """Forward to `internal.isallowed_root(mode)`."""
    return self.internal.isallowed_root(mode)


class InternalRights:
  """Permission checks and storage backed by the rights table.

  A right is the tuple (user_id, target_type, target_id, mode); a right
  is granted when a matching row exists.  Single-row lookups go through
  `connection.cache`, bulk reads and writes go through
  `connection.query_dict` / `connection.execute`.

  The `*_root` helpers address the pseudo target (ROOT, 0); the
  superuser flag is stored as mode ROOT on that target.
  """
  TABLE = 'rights'
  ROOT = 'root'

  def __init__(self, connection, user_id=0, superuser=False):
    """Bind the checker to a connection and to the acting user.

    connection -- object providing `execute`, `query_dict` and `cache`.
    user_id    -- id of the acting user; a falsy id skips the per-user
                  lookups in `issuperuser` and `isallowed`.
    superuser  -- when True, every permission check succeeds without
                  consulting the table.
    """
    assert type(user_id) is int
    assert type(superuser) is bool
    self.connection = connection
    self.user_id = user_id
    self.superuser = superuser

  def issuperuser(self):
    """Return True if the acting user is a superuser, else False."""
    if self.superuser:
      return True
    if self.user_id and self.get(self.user_id, self.ROOT, 0, self.ROOT):
      return True
    # Explicit False, matching isallowed(), rather than an implicit None.
    return False

  def isallowed(self, target_type, target_id, mode):
    """Return True if the acting user holds *mode* on the given target.

    Granted when the user is a superuser, when a row exists for the
    user, or when a row exists for user id 0 (checked for every caller).
    """
    if self.issuperuser():
      return True
    if self.user_id and self.get(self.user_id, target_type, target_id, mode):
      return True
    if self.get(0, target_type, target_id, mode):
      return True
    return False

  def build_where(self, user_id=None, target_type=None, target_id=None, mode=None):
    """Build a WHERE clause from the filters that are not None.

    Returns (clause, args): clause is '' when no filter is given,
    otherwise ' WHERE ...' with one placeholder per filter; args holds
    the matching values in the same order.
    """
    where = []
    args = []
    if user_id is not None:
      assert type(user_id) is int
      where.append("`user_id`=%d")
      args.append(user_id)
    if target_type is not None:
      assert type(target_type) is str
      where.append("`target_type`=%s")
      args.append(target_type)
    if target_id is not None:
      assert type(target_id) is int
      where.append("`target_id`=%d")
      args.append(target_id)
    if mode is not None:
      assert type(mode) is str
      where.append("`mode`=%s")
      args.append(mode)
    if not where:
      return '', []
    return ' WHERE ' + ' AND '.join(where), args

  def get_list(self, user_id=None, target_type=None, target_id=None, mode=None):
    """Return a list of Right objects for all rows matching the filters."""
    where, args = self.build_where(user_id, target_type, target_id, mode)
    rows = self.connection.query_dict('SELECT * FROM %T' + where, self.TABLE, *args)
    return [Right(row) for row in rows]

  def delete_list(self, user_id=None, target_type=None, target_id=None, mode=None):
    """Delete all rows matching the filters and reset the table's cache.

    With no filters the WHERE clause is empty, so every row is deleted.
    """
    where, args = self.build_where(user_id, target_type, target_id, mode)
    self.connection.execute('DELETE FROM %T' + where, self.TABLE, *args)
    self.connection.cache.reset(self.TABLE)

  def reset_cache(self, user_id, target_type, target_id, mode):
    """Reset the cache entry for exactly one right."""
    assert type(user_id) is int
    assert type(target_type) is str
    assert type(target_id) is int
    assert type(mode) is str
    self.connection.cache.reset(
      self.TABLE,
      { 'user_id': user_id,
        'target_type': target_type,
        'target_id': target_id,
        'mode': mode })

  def get(self, user_id, target_type, target_id, mode):
    """Return True if a row for exactly this right exists (via the cache)."""
    assert type(user_id) is int
    assert type(target_type) is str
    assert type(target_id) is int
    assert type(mode) is str
    rows = self.connection.cache.select(
      self.TABLE,
      { 'user_id': user_id,
        'target_type': target_type,
        'target_id': target_id,
        'mode': mode })
    return bool(rows)

  def set(self, user_id, target_type, target_id, mode, allowed):
    """Grant (*allowed* truthy) or revoke (*allowed* falsy) one right.

    Revoking always issues the DELETE; granting inserts only when the
    right is not already present.  The cache entry for the right is
    reset after every statement that is executed.
    """
    assert type(user_id) is int
    assert type(target_type) is str
    assert type(target_id) is int
    assert type(mode) is str
    if not allowed:
      self.connection.execute(
        '''DELETE FROM %T WHERE
                `user_id`=%d
            AND `target_type`=%s
            AND `target_id`=%d
            AND `mode`=%s''',
        self.TABLE, user_id, target_type, target_id, mode )
      self.reset_cache(user_id, target_type, target_id, mode)
    elif not self.get(user_id, target_type, target_id, mode):
      self.connection.execute(
        '''INSERT INTO %T SET
                `user_id`=%d,
                `target_type`=%s,
                `target_id`=%d,
                `mode`=%s''',
        self.TABLE, user_id, target_type, target_id, mode )
      self.reset_cache(user_id, target_type, target_id, mode)

  # Shortcuts for the pseudo target (ROOT, 0).

  def isallowed_root(self, mode):
    """Return True if the acting user holds *mode* on the root target."""
    return self.isallowed(self.ROOT, 0, mode)

  def get_list_root(self, user_id=None, mode=None):
    """Return the Right objects on the root target matching the filters."""
    return self.get_list(user_id, self.ROOT, 0, mode)

  def delete_list_root(self, user_id=None, mode=None):
    """Delete the rights on the root target matching the filters."""
    # Delegates to delete_list; calling delete_list_root here would
    # recurse with the wrong number of arguments.
    return self.delete_list(user_id, self.ROOT, 0, mode)

  def get_root(self, user_id, mode):
    """Return True if *user_id* holds *mode* on the root target."""
    return self.get(user_id, self.ROOT, 0, mode)

  def set_root(self, user_id, mode, allowed):
    """Grant or revoke *mode* on the root target for *user_id*."""
    return self.set(user_id, self.ROOT, 0, mode, allowed)

  def get_superuser(self, user_id):
    """Return True if *user_id* has the stored superuser right."""
    return self.get_root(user_id, self.ROOT)

  def set_superuser(self, user_id, allowed):
    """Grant or revoke the stored superuser right for *user_id*."""
    return self.set_root(user_id, self.ROOT, allowed)