Blame model/rights.py

b838e2
b838e2
b838e2
class Right:
  """A single granted right, built from one row of the rights table.

  Attributes (all read from ``row`` and coerced on construction):
    user_id     -- int, taken from ``row['user_id']``
    target_type -- str, taken from ``row['target_type']``
    target_id   -- int, taken from ``row['target_id']``
    mode        -- str, taken from ``row['mode']``
  """

  def __init__(self, row):
    """Copy the four columns out of ``row``.

    ``row`` must support ``row[key]`` for the keys 'user_id',
    'target_type', 'target_id' and 'mode'.  A missing key raises
    whatever ``row[key]`` raises (KeyError for a dict); a value that
    ``int()`` cannot convert raises ValueError/TypeError.
    """
    # Coerce explicitly so the attributes have a fixed type regardless of
    # how the row values were delivered (e.g. as strings).
    self.user_id = int(row['user_id'])
    self.target_type = str(row['target_type'])
    self.target_id = int(row['target_id'])
    self.mode = str(row['mode'])
b838e2
b838e2
b838e2
class MyRights:
  """Thin wrapper exposing only the permission *checks* of a rights object.

  Every method forwards to the wrapped ``internal`` object; the wrapper
  itself offers no way to list, grant or revoke rights.
  """

  def __init__(self, internal):
    """Wrap ``internal``.

    ``internal`` must provide a ``user_id`` attribute and the methods
    ``issuperuser()``, ``isallowed(target_type, target_id, mode)`` and
    ``isallowed_root(mode)``.
    """
    self.internal = internal
    # Snapshot taken at construction time; later changes to
    # internal.user_id are not reflected here.
    self.user_id = self.internal.user_id

  def issuperuser(self):
    """Return ``internal.issuperuser()``."""
    return self.internal.issuperuser()

  def isallowed(self, target_type, target_id, mode):
    """Return ``internal.isallowed(target_type, target_id, mode)``."""
    return self.internal.isallowed(target_type, target_id, mode)

  def isallowed_root(self, mode):
    """Return ``internal.isallowed_root(mode)``."""
    return self.internal.isallowed_root(mode)
b838e2
b838e2
b838e2
class InternalRights:
  """Read and modify rows of the rights table for permission checks.

  A right is the tuple (user_id, target_type, target_id, mode).  Single
  rights are looked up through ``connection.cache``; lists are read and
  deleted with direct queries on ``connection``.

  "Root" rights are rights whose target is (ROOT, 0).  A user holding the
  root right with mode ROOT is treated as a superuser.

  The SQL strings use the placeholders ``%T``, ``%d`` and ``%s``; they are
  passed unchanged to ``connection``, which is expected to substitute them.
  """

  TABLE = 'rights'
  ROOT = 'root'

  def __init__(self, connection, user_id = 0, superuser = False):
    """Bind to ``connection`` on behalf of ``user_id``.

    connection -- object providing ``query_dict``, ``execute`` and a
                  ``cache`` with ``select`` and ``reset``
    user_id    -- int; 0 (the default) is falsy and therefore never passes
                  a per-user check
    superuser  -- bool; when True every check succeeds without a lookup
    """
    assert(type(user_id) is int)
    assert(type(superuser) is bool)
    self.connection = connection
    self.user_id = user_id
    self.superuser = superuser

  @staticmethod
  def _check_key(user_id, target_type, target_id, mode):
    """Assert that the four parts of a right have the exact expected types."""
    assert(type(user_id) is int)
    assert(type(target_type) is str)
    assert(type(target_id) is int)
    assert(type(mode) is str)

  def issuperuser(self):
    """Return True if this object acts as a superuser, else False.

    True when the ``superuser`` flag was set at construction, or when
    ``user_id`` is non-zero and the (ROOT, 0, ROOT) right exists for it.
    """
    if self.superuser:
      return True
    if self.user_id and self.get(self.user_id, self.ROOT, 0, self.ROOT):
      return True
    # Explicit False instead of falling off the end and returning None.
    return False

  def isallowed(self, target_type, target_id, mode):
    """Return True if the bound user may use ``mode`` on the target.

    Superusers are always allowed; otherwise a matching right must exist
    for a non-zero ``user_id``.
    """
    if self.issuperuser():
      return True
    if self.user_id and self.get(self.user_id, target_type, target_id, mode):
      return True
    return False

  def build_where(self, user_id = None, target_type = None, target_id = None, mode = None):
    """Build a WHERE clause from the filters that are not None.

    Returns ``(clause, args)``: ``clause`` is '' when no filter is given,
    otherwise ' WHERE ...' with the conditions joined by AND; ``args`` is
    the list of values in placeholder order.

    Filters are tested with ``is not None`` so that 0 is a real filter
    value: root rights use target_id 0, and silently dropping that
    condition would widen a SELECT or DELETE beyond what was asked for.
    """
    where = list()
    args = list()
    if user_id is not None:
      assert(type(user_id) is int)
      where.append("`user_id`=%d")
      args.append(user_id)
    if target_type is not None:
      assert(type(target_type) is str)
      where.append("`target_type`=%s")
      args.append(target_type)
    if target_id is not None:
      assert(type(target_id) is int)
      where.append("`target_id`=%d")
      args.append(target_id)
    if mode is not None:
      assert(type(mode) is str)
      where.append("`mode`=%s")
      args.append(mode)
    if not where:
      return '', list()
    return ' WHERE ' + ' AND '.join(where), args

  def get_list(self, user_id = None, target_type = None, target_id = None, mode = None):
    """Return a list of Right objects for all rows matching the filters.

    With no filters the query has no WHERE clause and selects every row.
    """
    where, args = self.build_where(user_id, target_type, target_id, mode)
    # Must go through self.connection; there is no module-level connection.
    rows = self.connection.query_dict('SELECT * FROM %T' + where, self.TABLE, *args)
    return [Right(row) for row in rows]

  def delete_list(self, user_id = None, target_type = None, target_id = None, mode = None):
    """Delete all rows matching the filters, then reset the table's cache.

    NOTE: with no filters the statement has no WHERE clause, i.e. it
    deletes every row of the table.
    """
    where, args = self.build_where(user_id, target_type, target_id, mode)
    self.connection.execute('DELETE FROM %T' + where, self.TABLE, *args)
    self.connection.cache.reset(self.TABLE)

  def reset_cache(self, user_id, target_type, target_id, mode):
    """Reset the cache entry for exactly one right."""
    self._check_key(user_id, target_type, target_id, mode)
    self.connection.cache.reset(
      self.TABLE,
      { 'user_id': user_id,
        'target_type': target_type,
        'target_id': target_id,
        'mode': mode })

  def get(self, user_id, target_type, target_id, mode):
    """Return True if the given right exists (looked up via the cache)."""
    self._check_key(user_id, target_type, target_id, mode)
    rows = self.connection.cache.select(
      self.TABLE,
      { 'user_id': user_id,
        'target_type': target_type,
        'target_id': target_id,
        'mode': mode })
    return bool(rows)

  def set(self, user_id, target_type, target_id, mode, allowed):
    """Grant (``allowed`` truthy) or revoke (``allowed`` falsy) one right.

    Revoking always issues the DELETE.  Granting inserts only when the
    right does not exist yet, so repeated grants do not insert again.
    After either statement the matching cache entry is reset.
    """
    self._check_key(user_id, target_type, target_id, mode)
    if not allowed:
      self.connection.execute(
        '''DELETE FROM %T WHERE
                `user_id`=%d
            AND `target_type`=%s
            AND `target_id`=%d
            AND `mode`=%s''',
        self.TABLE, user_id, target_type, target_id, mode )
      self.reset_cache(user_id, target_type, target_id, mode)
    elif not self.get(user_id, target_type, target_id, mode):
      self.connection.execute(
        '''INSERT INTO %T SET
                `user_id`=%d,
                `target_type`=%s,
                `target_id`=%d,
                `mode`=%s''',
        self.TABLE, user_id, target_type, target_id, mode )
      self.reset_cache(user_id, target_type, target_id, mode)

  # --- Convenience wrappers for the root target (ROOT, 0) ---

  def isallowed_root(self, mode):
    """Return ``isallowed`` for the root target."""
    return self.isallowed(self.ROOT, 0, mode)

  def get_list_root(self, user_id = None, mode = None):
    """Return ``get_list`` restricted to the root target."""
    return self.get_list(user_id, self.ROOT, 0, mode)

  def delete_list_root(self, user_id = None, mode = None):
    """Run ``delete_list`` restricted to the root target."""
    # Delegates to delete_list; calling delete_list_root here would pass
    # four arguments to a two-parameter method and raise TypeError.
    return self.delete_list(user_id, self.ROOT, 0, mode)

  def get_root(self, user_id, mode):
    """Return ``get`` for the root target."""
    return self.get(user_id, self.ROOT, 0, mode)

  def set_root(self, user_id, mode, allowed):
    """Run ``set`` for the root target."""
    return self.set(user_id, self.ROOT, 0, mode, allowed)

  # --- Superuser flag: the root right with mode ROOT ---

  def get_superuser(self, user_id):
    """Return True if ``user_id`` holds the (ROOT, 0, ROOT) right."""
    return self.get_root(user_id, self.ROOT)

  def set_superuser(self, user_id, allowed):
    """Grant or revoke the (ROOT, 0, ROOT) right for ``user_id``."""
    return self.set_root(user_id, self.ROOT, allowed)
b838e2