# -*- coding: utf-8 -*-
"""
(c) 2017 - Copyright Red Hat Inc
Authors:
Pierre-Yves Chibon <pingou@pingoured.fr>
"""
from __future__ import unicode_literals
__requires__ = ['SQLAlchemy >= 0.8']
import pkg_resources
import datetime
import os
import shutil
import sys
import tempfile
import time
import unittest
from io import open
import pygit2
from mock import patch, MagicMock
sys.path.insert(0, os.path.join(os.path.dirname(
os.path.abspath(__file__)), '..'))
import pagure.lib.git
import pagure.lib.query
import tests
from pagure.lib.repo import PagureRepo
CORE_CONFIG = """repo test
R = @all
RW+ = pingou
repo docs/test
R = @all
RW+ = pingou
repo tickets/test
RW+ = pingou
repo requests/test
RW+ = pingou
repo test2
R = @all
RW+ = pingou
repo docs/test2
R = @all
RW+ = pingou
repo tickets/test2
RW+ = pingou
repo requests/test2
RW+ = pingou
repo somenamespace/test3
R = @all
RW+ = pingou
repo docs/somenamespace/test3
R = @all
RW+ = pingou
repo tickets/somenamespace/test3
RW+ = pingou
repo requests/somenamespace/test3
RW+ = pingou"""
class PagureLibGitoliteConfigtests(tests.Modeltests):
""" Tests for pagure.lib.git """
maxDiff = None
def setUp(self):
""" Set up the environnment, ran before every tests. """
super(PagureLibGitoliteConfigtests, self).setUp()
tests.create_projects(self.session)
self.outputconf = os.path.join(self.path, 'test_gitolite.conf')
self.preconf = os.path.join(self.path, 'header_gitolite')
with open(self.preconf, 'w') as stream:
stream.write('# this is a header that is manually added\n')
stream.write('\n')
stream.write('@group1 = foo bar baz\n')
stream.write('@group2 = threebean puiterwijk kevin pingou\n')
self.postconf = os.path.join(self.path, 'footer_gitolite')
with open(self.postconf, 'w', encoding="utf-8") as stream:
stream.write('# end of generated configuration\n')
stream.write(r'# \ó/')
stream.write('\n# end of footer\n')
def tearDown(self):
""" Tearn down the environnment, ran before every tests. """
super(PagureLibGitoliteConfigtests, self).tearDown()
if os.path.exists(self.outputconf):
os.unlink(self.outputconf)
self.assertFalse(os.path.exists(self.outputconf))
def test_write_gitolite_pre_post_projectNone(self):
""" Test the write_gitolite_acls function of pagure.lib.git with
a postconf set """
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=None,
preconf=self.preconf,
postconf=self.postconf
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf, 'r') as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
%s
# end of body
# end of generated configuration
# \ó/
# end of footer
""" % CORE_CONFIG
#print data
self.assertEqual(data, exp)
def test_write_gitolite_pre_post_projectNone_to_existing_file(self):
""" Test the write_gitolite_acls function of pagure.lib.git with
a postconf set with existing output file """
with open(self.outputconf, 'w') as stream:
pass
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=None,
preconf=self.preconf,
postconf=self.postconf
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
self.assertEqual(data, '')
def test_write_gitolite_pre_post_project_1(self):
""" Test the write_gitolite_acls function of pagure.lib.git with
a postconf set """
with open(self.outputconf, 'w') as stream:
pass
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=-1,
preconf=self.preconf,
postconf=self.postconf
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
%s
# end of body
# end of generated configuration
# \ó/
# end of footer
""" % CORE_CONFIG
#print data
self.assertEqual(data, exp)
def test_write_gitolite_pre_post_project_test(self):
""" Test the write_gitolite_acls function of pagure.lib.git with
a postconf set """
with open(self.outputconf, 'w') as stream:
pass
project = pagure.lib.query._get_project(self.session, 'test')
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=project,
preconf=self.preconf,
postconf=self.postconf
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
repo test
R = @all
RW+ = pingou
repo docs/test
R = @all
RW+ = pingou
repo tickets/test
RW+ = pingou
repo requests/test
RW+ = pingou
# end of body
# end of generated configuration
# \ó/
# end of footer
"""
#print data
self.assertEqual(data, exp)
def test_write_gitolite_pre_post_project_test_full_file(self):
""" Test the write_gitolite_acls function of pagure.lib.git with
a postconf set """
# Re-generate the gitolite config for all the projects
self.test_write_gitolite_pre_post_project_1()
self.assertTrue(os.path.exists(self.outputconf))
project = pagure.lib.query._get_project(self.session, 'test')
project.user_id = 2
self.session.add(project)
self.session.commit()
project = pagure.lib.query._get_project(self.session, 'test')
msg = pagure.lib.query.add_user_to_project(
self.session,
project=project,
new_user='pingou',
user='foo',
access='commit'
)
self.assertEqual(msg, 'User added')
self.session.commit()
project = pagure.lib.query._get_project(self.session, 'test')
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=project,
preconf=self.preconf,
postconf=self.postconf
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
repo test2
R = @all
RW+ = pingou
repo docs/test2
R = @all
RW+ = pingou
repo tickets/test2
RW+ = pingou
repo requests/test2
RW+ = pingou
repo somenamespace/test3
R = @all
RW+ = pingou
repo docs/somenamespace/test3
R = @all
RW+ = pingou
repo tickets/somenamespace/test3
RW+ = pingou
repo requests/somenamespace/test3
RW+ = pingou
repo test
R = @all
RW+ = foo
RW+ = pingou
repo docs/test
R = @all
RW+ = foo
RW+ = pingou
repo tickets/test
RW+ = foo
RW+ = pingou
repo requests/test
RW+ = foo
RW+ = pingou
# end of body
# end of generated configuration
# \ó/
# end of footer
"""
#print data
self.assertEqual(data, exp)
@patch.dict('pagure.config.config',
{'ENABLE_DOCS': False, 'ENABLE_TICKETS': False})
def test_write_gitolite_disabled_docs_tickets(self):
""" Test the write_gitolite_acls function when docs and tickets
are disabled """
# Re-generate the gitolite config for all the projects
project = pagure.lib.query._get_project(self.session, 'test')
project.user_id = 2
self.session.add(project)
self.session.commit()
project = pagure.lib.query._get_project(self.session, 'test')
msg = pagure.lib.query.add_user_to_project(
self.session,
project=project,
new_user='pingou',
user='foo',
access='commit'
)
self.assertEqual(msg, 'User added')
self.session.commit()
project = pagure.lib.query._get_project(self.session, 'test')
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=project,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = """repo test
R = @all
RW+ = foo
RW+ = pingou
repo requests/test
RW+ = foo
RW+ = pingou
repo test2
R = @all
RW+ = pingou
repo requests/test2
RW+ = pingou
repo somenamespace/test3
R = @all
RW+ = pingou
repo requests/somenamespace/test3
RW+ = pingou
# end of body
"""
self.assertEqual(data, exp)
class PagureLibGitoliteGroupConfigtests(tests.Modeltests):
""" Tests for generating the gitolite configuration file for a group
change
"""
maxDiff = None
def setUp(self):
""" Set up the environnment, ran before every tests. """
super(PagureLibGitoliteGroupConfigtests, self).setUp()
tests.create_projects(self.session)
pagure.lib.query.add_group(
self.session,
group_name='grp',
display_name='grp group',
description=None,
group_type='user',
user='pingou',
is_admin=False,
blacklist=[],
)
pagure.lib.query.add_group(
self.session,
group_name='grp2',
display_name='grp2 group',
description=None,
group_type='user',
user='foo',
is_admin=False,
blacklist=[],
)
self.session.commit()
self.outputconf = os.path.join(self.path, 'test_gitolite.conf')
self.preconf = os.path.join(self.path, 'header_gitolite')
with open(self.preconf, 'w') as stream:
stream.write('# this is a header that is manually added\n')
stream.write('\n')
stream.write('@group1 = foo bar baz\n')
stream.write('@group2 = threebean puiterwijk kevin pingou\n')
self.postconf = os.path.join(self.path, 'footer_gitolite')
with open(self.postconf, 'w') as stream:
stream.write('# end of generated configuration\n')
stream.write(r'# \ó/')
stream.write('\n# end of footer\n')
def tearDown(self):
""" Tearn down the environnment, ran before every tests. """
super(PagureLibGitoliteGroupConfigtests, self).tearDown()
if os.path.exists(self.outputconf):
os.unlink(self.outputconf)
self.assertFalse(os.path.exists(self.outputconf))
def test_write_gitolite_project_test_group(self):
""" Test the write_gitolite_acls when updating a single group. """
with open(self.outputconf, 'w') as stream:
pass
project = pagure.lib.query._get_project(self.session, 'test')
group = pagure.lib.query.search_groups(self.session, group_name='grp')
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=project,
preconf=self.preconf,
postconf=self.postconf,
group=group,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
@grp = pingou
repo test
R = @all
RW+ = pingou
repo docs/test
R = @all
RW+ = pingou
repo tickets/test
RW+ = pingou
repo requests/test
RW+ = pingou
# end of body
# end of generated configuration
# \ó/
# end of footer
"""
#print data
self.assertEqual(data, exp)
def test_write_gitolite_project_test_all_groups(self):
""" Test the write_gitolite_acls when updating all groups. """
with open(self.outputconf, 'w') as stream:
pass
project = pagure.lib.query._get_project(self.session, 'test')
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=project,
preconf=self.preconf,
postconf=self.postconf,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
@grp = pingou
@grp2 = foo
# end of groups
repo test
R = @all
RW+ = pingou
repo docs/test
R = @all
RW+ = pingou
repo tickets/test
RW+ = pingou
repo requests/test
RW+ = pingou
# end of body
# end of generated configuration
# \ó/
# end of footer
"""
#print data
self.assertEqual(data, exp)
def test_write_gitolite_project_all_projects_groups(self):
""" Test the generating the entire gitolite config. """
with open(self.outputconf, 'w') as stream:
pass
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=-1,
preconf=self.preconf,
postconf=self.postconf,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
@grp = pingou
@grp2 = foo
# end of groups
%s
# end of body
# end of generated configuration
# \ó/
# end of footer
""" % CORE_CONFIG
#print data
self.assertEqual(data, exp)
def test_write_gitolite_project_all_projects_one_group(self):
""" Test the generating the entire gitolite config. """
# Generate the full gitolite config that we will update
self.test_write_gitolite_project_all_projects_groups()
project = pagure.lib.query._get_project(self.session, 'test')
group = pagure.lib.query.search_groups(self.session, group_name='grp')
# Let's add `foo` to `grp` so something changes
msg = pagure.lib.query.add_user_to_group(
self.session,
username='foo',
group=group,
user='pingou',
is_admin=False,
)
self.session.commit()
self.assertEqual(msg, 'User `foo` added to the group `grp`.')
# Let's add `foo` to `test` so the project changes as well
msg = pagure.lib.query.add_user_to_project(
self.session,
project=project,
new_user='foo',
user='pingou',
access='commit'
)
self.assertEqual(msg, 'User added')
self.session.commit()
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=project,
group=group,
preconf=self.preconf,
postconf=self.postconf,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
@grp = foo pingou
@grp2 = foo
# end of groups
repo test2
R = @all
RW+ = pingou
repo docs/test2
R = @all
RW+ = pingou
repo tickets/test2
RW+ = pingou
repo requests/test2
RW+ = pingou
repo somenamespace/test3
R = @all
RW+ = pingou
repo docs/somenamespace/test3
R = @all
RW+ = pingou
repo tickets/somenamespace/test3
RW+ = pingou
repo requests/somenamespace/test3
RW+ = pingou
repo test
R = @all
RW+ = pingou
RW+ = foo
repo docs/test
R = @all
RW+ = pingou
RW+ = foo
repo tickets/test
RW+ = pingou
RW+ = foo
repo requests/test
RW+ = pingou
RW+ = foo
# end of body
# end of generated configuration
# \ó/
# end of footer
"""
#print data
self.assertEqual(data, exp)
def test_write_gitolite_delete_group(self):
""" Test the updating the gitolite config after having
deleted a group.
"""
# Generate the full gitolite config that we will update
self.test_write_gitolite_project_all_projects_groups()
# Delete the group `grp`
self.assertEqual(len(pagure.lib.query.search_groups(self.session)), 2)
group = pagure.lib.query.search_groups(self.session, group_name='grp')
self.session.delete(group)
self.session.commit()
self.assertEqual(len(pagure.lib.query.search_groups(self.session)), 1)
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=None,
preconf=self.preconf,
postconf=self.postconf,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = r"""# this is a header that is manually added
@group1 = foo bar baz
@group2 = threebean puiterwijk kevin pingou
# end of header
@grp2 = foo
# end of groups
%s
# end of body
# end of generated configuration
# \ó/
# end of footer
""" % CORE_CONFIG
#print data
self.assertEqual(data, exp)
@patch('pagure.lib.git_auth.get_git_auth_helper')
def test_task_generate_gitolite_acls_one_group(self, get_helper):
""" Test the generate_gitolite_acls task to ensure if group is None
then None is passed to the helper. """
helper = MagicMock()
get_helper.return_value = helper
pagure.lib.query.SESSIONMAKER = self.session.session_factory
pagure.lib.tasks.generate_gitolite_acls(
namespace=None, name='test', user=None, group=None)
get_helper.assert_called_with()
args = helper.generate_acls.call_args
self.assertIsNone(args[1].get('group'))
self.assertIsNotNone(args[1].get('project'))
def test_write_gitolite_project_test_private(self):
""" Test the write_gitolite_acls function of pagure.lib.git with
a postconf set """
# Make the test project private
project = pagure.lib.query._get_project(self.session, 'test')
project.private = True
self.session.add(project)
self.session.commit()
# Re-generate the gitolite config just for this project
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=None,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = """@grp = pingou
@grp2 = foo
# end of groups
repo test
RW+ = pingou
repo docs/test
RW+ = pingou
repo tickets/test
RW+ = pingou
repo requests/test
RW+ = pingou
repo test2
R = @all
RW+ = pingou
repo docs/test2
R = @all
RW+ = pingou
repo tickets/test2
RW+ = pingou
repo requests/test2
RW+ = pingou
repo somenamespace/test3
R = @all
RW+ = pingou
repo docs/somenamespace/test3
R = @all
RW+ = pingou
repo tickets/somenamespace/test3
RW+ = pingou
repo requests/somenamespace/test3
RW+ = pingou
# end of body
"""
#print data
self.assertEqual(data, exp)
def test_remove_acls(self):
""" Test the remove_acls function of pagure.lib.git when deleting
a project """
pagure.config.config['GITOLITE_CONFIG'] = self.outputconf
with open(self.outputconf, 'w') as stream:
pass
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=-1,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = """@grp = pingou
@grp2 = foo
# end of groups
%s
# end of body
""" % CORE_CONFIG
#print data
self.assertEqual(data, exp)
# Test removing a project from the existing config
project = pagure.lib.query.get_authorized_project(
self.session, project_name='test')
helper.remove_acls(self.session, project=project)
with open(self.outputconf) as stream:
data = stream.read()
exp = """@grp = pingou
@grp2 = foo
# end of groups
repo test2
R = @all
RW+ = pingou
repo docs/test2
R = @all
RW+ = pingou
repo tickets/test2
RW+ = pingou
repo requests/test2
RW+ = pingou
repo somenamespace/test3
R = @all
RW+ = pingou
repo docs/somenamespace/test3
R = @all
RW+ = pingou
repo tickets/somenamespace/test3
RW+ = pingou
repo requests/somenamespace/test3
RW+ = pingou
# end of body
"""
#print data
self.assertEqual(data, exp)
def test_remove_acls_no_project(self):
""" Test the remove_acls function of pagure.lib.git when no project
is specified """
pagure.config.config['GITOLITE_CONFIG'] = self.outputconf
with open(self.outputconf, 'w') as stream:
pass
helper = pagure.lib.git_auth.get_git_auth_helper('gitolite3')
helper.write_gitolite_acls(
self.session,
self.outputconf,
project=-1,
)
self.assertTrue(os.path.exists(self.outputconf))
with open(self.outputconf) as stream:
data = stream.read()
exp = """@grp = pingou
@grp2 = foo
# end of groups
%s
# end of body
""" % CORE_CONFIG
#print data
self.assertEqual(data, exp)
# Test nothing changes if no project is specified
self.assertRaises(
RuntimeError,
helper.remove_acls,
self.session,
project=None
)
with open(self.outputconf) as stream:
data = stream.read()
self.assertEqual(data, exp)
if __name__ == '__main__':
unittest.main(verbosity=2)