Blob Blame Raw
# -*- coding: utf-8 -*-

"""
 (c) 2015-2018 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon <pingou@pingoured.fr>

"""

from __future__ import unicode_literals, absolute_import

import imp
import json
import logging
import os
import re
import resource
import shutil
import subprocess
import sys
import tempfile
import time
import unittest
from io import open, StringIO

logging.basicConfig(stream=sys.stderr)

from bs4 import BeautifulSoup
from contextlib import contextmanager
from datetime import date
from datetime import datetime
from datetime import timedelta
from functools import wraps
from six.moves.urllib.parse import urlparse, parse_qs

import mock
import pygit2
import redis
import six

from bs4 import BeautifulSoup
from celery.app.task import EagerResult
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.orm import scoped_session

if six.PY2:
    # Always enable performance counting for tests
    os.environ["PAGURE_PERFREPO"] = "true"

sys.path.insert(
    0, os.path.join(os.path.dirname(os.path.abspath(__file__)), "..")
)

import pagure
import pagure.api
from pagure.api.ci import jenkins
import pagure.flask_app
import pagure.lib.git
import pagure.lib.model
import pagure.lib.query
import pagure.lib.tasks_mirror
import pagure.perfrepo as perfrepo
from pagure.config import config as pagure_config, reload_config
from pagure.lib.repo import PagureRepo

HERE = os.path.join(os.path.dirname(os.path.abspath(__file__)))
LOG = logging.getLogger(__name__)
LOG.setLevel(logging.INFO)

PAGLOG = logging.getLogger("pagure")
PAGLOG.setLevel(logging.CRITICAL)
PAGLOG.handlers = []

if "PYTHONPATH" not in os.environ:
    os.environ["PYTHONPATH"] = os.path.normpath(os.path.join(HERE, "../"))

CONFIG_TEMPLATE = """
GIT_FOLDER = '%(path)s/repos'
ENABLE_DOCS = %(enable_docs)s
ENABLE_TICKETS = %(enable_tickets)s
REMOTE_GIT_FOLDER = '%(path)s/remotes'
DB_URL = '%(dburl)s'
ALLOW_PROJECT_DOWAIT = True
PAGURE_CI_SERVICES = ['jenkins']
EMAIL_SEND = False
TESTING = True
GIT_FOLDER = '%(path)s/repos'
REQUESTS_FOLDER = '%(path)s/repos/requests'
TICKETS_FOLDER = %(tickets_folder)r
DOCS_FOLDER = %(docs_folder)r
REPOSPANNER_PSEUDO_FOLDER = '%(path)s/repos/pseudo'
ATTACHMENTS_FOLDER = '%(path)s/attachments'
BROKER_URL = 'redis+socket://%(global_path)s/broker'
CELERY_CONFIG = {
    "task_always_eager": True,
    #"task_eager_propagates": True,
}
GIT_AUTH_BACKEND = '%(authbackend)s'
TEST_AUTH_STATUS = '%(path)s/testauth_status.json'
REPOBRIDGE_BINARY = '%(repobridge_binary)s'
REPOSPANNER_NEW_REPO = %(repospanner_new_repo)s
REPOSPANNER_NEW_REPO_ADMIN_OVERRIDE = %(repospanner_admin_override)s
REPOSPANNER_NEW_FORK = %(repospanner_new_fork)s
REPOSPANNER_ADMIN_MIGRATION = %(repospanner_admin_migration)s
REPOSPANNER_REGIONS = {
    'default': {'url': 'https://repospanner.localhost.localdomain:%(repospanner_gitport)s',
                'repo_prefix': 'pagure/',
                'hook': None,
                'ca': '%(path)s/repospanner/pki/ca.crt',
                'admin_cert': {'cert': '%(path)s/repospanner/pki/admin.crt',
                               'key': '%(path)s/repospanner/pki/admin.key'},
                'push_cert': {'cert': '%(path)s/repospanner/pki/pagure.crt',
                              'key': '%(path)s/repospanner/pki/pagure.key'}}
}
"""
# The Celery docs warn against using task_always_eager:
# http://docs.celeryproject.org/en/latest/userguide/testing.html
# but that warning is only valid when testing the async nature of the task, not
# what the task actually does.


LOG.info("BUILD_ID: %s", os.environ.get("BUILD_ID"))


WAIT_REGEX = re.compile(r"""var _url = '(\/wait\/[a-z0-9-]+\??.*)'""")


def get_wait_target(html):
    """ This parses the window.location out of the HTML for the wait page. """
    found = WAIT_REGEX.findall(html)
    if len(found) == 0:
        raise Exception("Not able to get wait target in %s" % html)
    return found[-1]


def get_post_target(html):
    """ This parses the wait page form to get the POST url. """
    soup = BeautifulSoup(html, "html.parser")
    form = soup.find(id="waitform")
    if not form:
        raise Exception("Not able to get the POST url in %s" % html)
    return form.get("action")


def get_post_args(html):
    """ This parses the wait page for the hidden arguments of the form. """
    soup = BeautifulSoup(html, "html.parser")
    output = {}
    inputs = soup.find_all("input")
    if not inputs:
        raise Exception("Not able to get the POST arguments in %s" % html)
    for inp in inputs:
        if inp.get("type") == "hidden":
            output[inp.get("name")] = inp.get("value")
    return output


def create_maybe_waiter(method, getter):
    def maybe_waiter(*args, **kwargs):
        """ A wrapper for self.app.get()/.post() that will resolve wait's """
        result = method(*args, **kwargs)

        # Handle the POST wait case
        form_url = None
        form_args = None
        try:
            result_text = result.get_data(as_text=True)
        except UnicodeDecodeError:
            return result
        if 'id="waitform"' in result_text:
            form_url = get_post_target(result_text)
            form_args = get_post_args(result_text)
            form_args["csrf_token"] = result_text.split(
                'name="csrf_token" type="hidden" value="'
            )[1].split('">')[0]

        count = 0
        while "We are waiting for your task to finish." in result_text:
            # Resolve wait page
            target_url = get_wait_target(result_text)
            if count > 10:
                time.sleep(0.5)
            else:
                time.sleep(0.1)
            result = getter(target_url, follow_redirects=True)
            try:
                result_text = result.get_data(as_text=True)
            except UnicodeDecodeError:
                return result
            if count > 50:
                raise Exception("Had to wait too long")
        else:
            if form_url and form_args:
                return method(form_url, data=form_args, follow_redirects=True)
            return result

    return maybe_waiter


@contextmanager
def user_set(APP, user, keep_get_user=False):
    """ Set the provided user as fas_user in the provided application."""

    # Hack used to remove the before_request function set by
    # flask.ext.fas_openid.FAS which otherwise kills our effort to set a
    # flask.g.fas_user.
    from flask import appcontext_pushed, g

    keep = []
    for meth in APP.before_request_funcs[None]:
        if "flask_fas_openid.FAS" in str(meth):
            continue
        keep.append(meth)
    APP.before_request_funcs[None] = keep

    def handler(sender, **kwargs):
        g.fas_user = user
        g.fas_session_id = b"123"
        g.authenticated = True

    old_get_user = pagure.flask_app._get_user
    if not keep_get_user:
        pagure.flask_app._get_user = mock.MagicMock(
            return_value=pagure.lib.model.User()
        )

    with appcontext_pushed.connected_to(handler, APP):
        yield

    pagure.flask_app._get_user = old_get_user


tests_state = {
    "path": tempfile.mkdtemp(prefix="pagure-tests-"),
    "broker": None,
    "broker_client": None,
    "results": {},
}


def _populate_db(session):
    # Create a couple of users
    item = pagure.lib.model.User(
        user="pingou",
        fullname="PY C",
        password=b"foo",
        default_email="bar@pingou.com",
    )
    session.add(item)
    item = pagure.lib.model.UserEmail(user_id=1, email="bar@pingou.com")
    session.add(item)
    item = pagure.lib.model.UserEmail(user_id=1, email="foo@pingou.com")
    session.add(item)

    item = pagure.lib.model.User(
        user="foo",
        fullname="foo bar",
        password=b"foo",
        default_email="foo@bar.com",
    )
    session.add(item)
    item = pagure.lib.model.UserEmail(user_id=2, email="foo@bar.com")
    session.add(item)

    session.commit()


def store_eager_results(*args, **kwargs):
    """A wrapper for EagerResult that stores the instance."""
    result = EagerResult(*args, **kwargs)
    tests_state["results"][result.id] = result
    return result


def setUp():
    # In order to save time during local test execution, we create sqlite DB
    # file only once and then we populate it and empty it for every test case
    # (as opposed to creating DB file for every test case).
    session = pagure.lib.model.create_tables(
        "sqlite:///%s/db.sqlite" % tests_state["path"],
        acls=pagure_config.get("ACLS", {}),
    )
    tests_state["db_session"] = session

    # Create a broker
    broker_url = os.path.join(tests_state["path"], "broker")

    tests_state["broker"] = broker = subprocess.Popen(
        [
            "/usr/bin/redis-server",
            "--unixsocket",
            broker_url,
            "--port",
            "0",
            "--loglevel",
            "warning",
            "--logfile",
            "/dev/null",
        ],
        stdout=None,
        stderr=None,
    )
    broker.poll()
    if broker.returncode is not None:
        raise Exception("Broker failed to start")
    tests_state["broker_client"] = redis.Redis(unix_socket_path=broker_url)

    # Store the EagerResults to be able to retrieve them later
    tests_state["eg_patcher"] = mock.patch("celery.app.task.EagerResult")
    eg_mock = tests_state["eg_patcher"].start()
    eg_mock.side_effect = store_eager_results


def tearDown():
    tests_state["db_session"].close()
    tests_state["eg_patcher"].stop()
    broker = tests_state["broker"]
    broker.kill()
    broker.wait()
    shutil.rmtree(tests_state["path"])


class SimplePagureTest(unittest.TestCase):
    """
    Simple Test class that does not set a broker/worker
    """

    populate_db = True
    config_values = {}

    @mock.patch("pagure.lib.notify.fedmsg_publish", mock.MagicMock())
    def __init__(self, method_name="runTest"):
        """ Constructor. """
        unittest.TestCase.__init__(self, method_name)
        self.session = None
        self.path = None
        self.gitrepo = None
        self.gitrepos = None

    def perfMaxWalks(self, max_walks, max_steps):
        """ Check that we have not performed too many walks/steps. """
        num_walks = 0
        num_steps = 0
        for reqstat in perfrepo.REQUESTS:
            for walk in reqstat["walks"].values():
                num_walks += 1
                num_steps += walk["steps"]
        self.assertLessEqual(
            num_walks,
            max_walks,
            "%s git repo walks performed, at most %s allowed"
            % (num_walks, max_walks),
        )
        self.assertLessEqual(
            num_steps,
            max_steps,
            "%s git repo steps performed, at most %s allowed"
            % (num_steps, max_steps),
        )

    def perfReset(self):
        """ Reset perfrepo stats. """
        perfrepo.reset_stats()
        perfrepo.REQUESTS = []

    def setUp(self):
        if self.path:
            # This prevents test state leakage.
            # This should be None if the previous runs' tearDown didn't finish,
            # leaving behind a self.path.
            # If we continue in this case, not only did the previous worker and
            # redis instances not exit, we also might accidentally use the
            # old database connection.
            # @pingou, don't delete this again... :)
            raise Exception("Previous test failed!")

        self.perfReset()

        self.path = tempfile.mkdtemp(prefix="pagure-tests-path-")

        LOG.debug("Testdir: %s", self.path)
        for folder in ["repos", "forks", "releases", "remotes", "attachments"]:
            os.mkdir(os.path.join(self.path, folder))

        if hasattr(pagure.lib.query, "REDIS") and pagure.lib.query.REDIS:
            pagure.lib.query.REDIS.connection_pool.disconnect()
            pagure.lib.query.REDIS = None

        # Database
        self._prepare_db()

        # Write a config file
        config_values = {
            "path": self.path,
            "dburl": self.dbpath,
            "enable_docs": True,
            "docs_folder": "%s/repos/docs" % self.path,
            "enable_tickets": True,
            "tickets_folder": "%s/repos/tickets" % self.path,
            "global_path": tests_state["path"],
            "authbackend": "gitolite3",
            "repobridge_binary": "/usr/libexec/repobridge",
            "repospanner_gitport": str(8443 + sys.version_info.major),
            "repospanner_new_repo": "None",
            "repospanner_admin_override": "False",
            "repospanner_new_fork": "True",
            "repospanner_admin_migration": "False",
        }
        config_values.update(self.config_values)
        self.config_values = config_values
        config_path = os.path.join(self.path, "config")
        if not os.path.exists(config_path):
            with open(config_path, "w") as f:
                f.write(CONFIG_TEMPLATE % self.config_values)
        os.environ["PAGURE_CONFIG"] = config_path
        pagure_config.update(reload_config())

        imp.reload(pagure.lib.tasks)
        imp.reload(pagure.lib.tasks_mirror)
        imp.reload(pagure.lib.tasks_services)

        self._app = pagure.flask_app.create_app({"DB_URL": self.dbpath})

        self.app = self._app.test_client()
        self.gr_patcher = mock.patch("pagure.lib.tasks.get_result")
        gr_mock = self.gr_patcher.start()
        gr_mock.side_effect = lambda tid: tests_state["results"][tid]

        # Refresh the DB session
        self.session = pagure.lib.query.create_session(self.dbpath)

    def tearDown(self):
        self.gr_patcher.stop()
        self.session.rollback()
        self._clear_database()

        # Remove testdir
        try:
            shutil.rmtree(self.path)
        except:
            # Sometimes there is a race condition that makes deleting the folder
            # fail during the first attempt. So just try a second time if that's
            # the case.
            shutil.rmtree(self.path)
        self.path = None

        del self.app
        del self._app

    def shortDescription(self):
        doc = self.__str__() + ": " + self._testMethodDoc
        return doc or None

    def _prepare_db(self):
        self.dbpath = "sqlite:///%s" % os.path.join(
            tests_state["path"], "db.sqlite"
        )
        self.session = tests_state["db_session"]
        pagure.lib.model.create_default_status(
            self.session, acls=pagure_config.get("ACLS", {})
        )
        if self.populate_db:
            _populate_db(self.session)

    def _clear_database(self):
        tables = reversed(pagure.lib.model_base.BASE.metadata.sorted_tables)
        if self.dbpath.startswith("postgresql"):
            self.session.execute(
                "TRUNCATE %s CASCADE" % ", ".join([t.name for t in tables])
            )
        elif self.dbpath.startswith("sqlite"):
            for table in tables:
                self.session.execute("DELETE FROM %s" % table.name)
        elif self.dbpath.startswith("mysql"):
            self.session.execute("SET FOREIGN_KEY_CHECKS = 0")
            for table in tables:
                self.session.execute("TRUNCATE %s" % table.name)
            self.session.execute("SET FOREIGN_KEY_CHECKS = 1")
        self.session.commit()

    def set_auth_status(self, value):
        """ Set the return value for the test auth """
        with open(
            os.path.join(self.path, "testauth_status.json"), "w"
        ) as statusfile:
            statusfile.write(six.u(json.dumps(value)))

    def get_csrf(self, url="/new", output=None):
        """Retrieve a CSRF token from given URL."""
        if output is None:
            output = self.app.get(url)
            self.assertEqual(output.status_code, 200)

        return (
            output.get_data(as_text=True)
            .split('name="csrf_token" type="hidden" value="')[1]
            .split('">')[0]
        )

    def get_wtforms_version(self):
        """Returns the wtforms version as a tuple."""
        import wtforms

        wtforms_v = wtforms.__version__.split(".")
        for idx, val in enumerate(wtforms_v):
            try:
                val = int(val)
            except ValueError:
                pass
            wtforms_v[idx] = val
        return tuple(wtforms_v)

    def assertURLEqual(self, url_1, url_2):
        url_parsed_1 = list(urlparse(url_1))
        url_parsed_1[4] = parse_qs(url_parsed_1[4])
        url_parsed_2 = list(urlparse(url_2))
        url_parsed_2[4] = parse_qs(url_parsed_2[4])
        return self.assertListEqual(url_parsed_1, url_parsed_2)

    def assertJSONEqual(self, json_1, json_2):
        return self.assertEqual(json.loads(json_1), json.loads(json_2))


class Modeltests(SimplePagureTest):
    """ Model tests. """

    def setUp(self):  # pylint: disable=invalid-name
        """ Set up the environnment, ran before every tests. """
        # Clean up test performance info
        super(Modeltests, self).setUp()
        self.app.get = create_maybe_waiter(self.app.get, self.app.get)
        self.app.post = create_maybe_waiter(self.app.post, self.app.get)

        # Refresh the DB session
        self.session = pagure.lib.query.create_session(self.dbpath)

    def tearDown(self):  # pylint: disable=invalid-name
        """ Remove the test.db database if there is one. """
        tests_state["broker_client"].flushall()
        super(Modeltests, self).tearDown()

    def create_project_full(self, projectname, extra=None):
        """ Create a project via the API.

        This makes sure that the repo is fully setup the way a normal new
        project would be, with hooks and all setup.
        """

        headers = {"Authorization": "token aaabbbcccddd"}
        data = {"name": projectname, "description": "A test repo"}
        if extra:
            data.update(extra)

        # Valid request
        output = self.app.post("/api/0/new/", data=data, headers=headers)
        self.assertEqual(output.status_code, 200)
        data = json.loads(output.get_data(as_text=True))
        self.assertDictEqual(
            data, {"message": 'Project "%s" created' % projectname}
        )


class FakeGroup(object):  # pylint: disable=too-few-public-methods
    """ Fake object used to make the FakeUser object closer to the
    expectations.
    """

    def __init__(self, name):
        """ Constructor.
        :arg name: the name given to the name attribute of this object.
        """
        self.name = name
        self.group_type = "cla"


class FakeUser(object):  # pylint: disable=too-few-public-methods
    """ Fake user used to test the fedocallib library. """

    def __init__(
        self, groups=None, username="username", cla_done=True, id=None
    ):
        """ Constructor.
        :arg groups: list of the groups in which this fake user is
            supposed to be.
        """
        if isinstance(groups, six.string_types):
            groups = [groups]
        self.id = id
        self.groups = groups or []
        self.user = username
        self.username = username
        self.name = username
        self.email = "foo@bar.com"
        self.default_email = "foo@bar.com"

        self.approved_memberships = [
            FakeGroup("packager"),
            FakeGroup("design-team"),
        ]
        self.dic = {}
        self.dic["timezone"] = "Europe/Paris"
        self.login_time = datetime.utcnow()
        self.cla_done = cla_done

    def __getitem__(self, key):
        return self.dic[key]


def create_locks(session, project):
    for ltype in ("WORKER", "WORKER_TICKET", "WORKER_REQUEST"):
        lock = pagure.lib.model.ProjectLock(
            project_id=project.id, lock_type=ltype
        )
        session.add(lock)


def create_projects(session, is_fork=False, user_id=1, hook_token_suffix=""):
    """ Create some projects in the database. """
    item = pagure.lib.model.Project(
        user_id=user_id,  # pingou
        name="test",
        is_fork=is_fork,
        parent_id=1 if is_fork else None,
        description="test project #1",
        hook_token="aaabbbccc" + hook_token_suffix,
    )
    item.close_status = ["Invalid", "Insufficient data", "Fixed", "Duplicate"]
    session.add(item)
    session.flush()
    create_locks(session, item)

    item = pagure.lib.model.Project(
        user_id=user_id,  # pingou
        name="test2",
        is_fork=is_fork,
        parent_id=2 if is_fork else None,
        description="test project #2",
        hook_token="aaabbbddd" + hook_token_suffix,
    )
    item.close_status = ["Invalid", "Insufficient data", "Fixed", "Duplicate"]
    session.add(item)
    session.flush()
    create_locks(session, item)

    item = pagure.lib.model.Project(
        user_id=user_id,  # pingou
        name="test3",
        is_fork=is_fork,
        parent_id=3 if is_fork else None,
        description="namespaced test project",
        hook_token="aaabbbeee" + hook_token_suffix,
        namespace="somenamespace",
    )
    item.close_status = ["Invalid", "Insufficient data", "Fixed", "Duplicate"]
    session.add(item)
    session.flush()
    create_locks(session, item)
    session.commit()


def create_projects_git(folder, bare=False):
    """ Create some projects in the database. """
    repos = []
    for project in [
        "test.git",
        "test2.git",
        os.path.join("somenamespace", "test3.git"),
    ]:
        repo_path = os.path.join(folder, project)
        repos.append(repo_path)
        if not os.path.exists(repo_path):
            os.makedirs(repo_path)
        pygit2.init_repository(repo_path, bare=bare)
    return repos


def create_tokens(session, user_id=1, project_id=1):
    """ Create some tokens for the project in the database. """
    item = pagure.lib.model.Token(
        id="aaabbbcccddd",
        user_id=user_id,
        project_id=project_id,
        expiration=datetime.utcnow() + timedelta(days=30),
    )
    session.add(item)

    item = pagure.lib.model.Token(
        id="foo_token",
        user_id=user_id,
        project_id=project_id,
        expiration=datetime.utcnow() + timedelta(days=30),
    )
    session.add(item)

    item = pagure.lib.model.Token(
        id="expired_token",
        user_id=user_id,
        project_id=project_id,
        expiration=datetime.utcnow() - timedelta(days=1),
    )
    session.add(item)
    session.commit()


def create_tokens_acl(session, token_id="aaabbbcccddd", acl_name=None):
    """ Create some ACLs for the token. If acl_name is not set, the token will
    have all the ACLs enabled.
    """
    if acl_name is None:
        for aclid in range(len(pagure_config["ACLS"])):
            token_acl = pagure.lib.model.TokenAcl(
                token_id=token_id, acl_id=aclid + 1
            )
            session.add(token_acl)
    else:
        acl = (
            session.query(pagure.lib.model.ACL).filter_by(name=acl_name).one()
        )
        token_acl = pagure.lib.model.TokenAcl(token_id=token_id, acl_id=acl.id)
        session.add(token_acl)

    session.commit()


def _clone_and_top_commits(folder, branch, branch_ref=False):
    """ Clone the repository, checkout the specified branch and return
    the top commit of that branch if there is one.
    Returns the repo, the path to the clone and the top commit(s) in a tuple
    or the repo, the path to the clone and the reference to the branch
    object if branch_ref is True.
    """
    if not os.path.exists(folder):
        os.makedirs(folder)
    brepo = pygit2.init_repository(folder, bare=True)

    newfolder = tempfile.mkdtemp(prefix="pagure-tests")
    repo = pygit2.clone_repository(folder, newfolder)

    branch_ref_obj = None
    if "origin/%s" % branch in repo.listall_branches(pygit2.GIT_BRANCH_ALL):
        branch_ref_obj = pagure.lib.git.get_branch_ref(repo, branch)
        repo.checkout(branch_ref_obj)

    if branch_ref:
        return (repo, newfolder, branch_ref_obj)

    parents = []
    commit = None
    try:
        if branch_ref_obj:
            commit = repo[branch_ref_obj.peel().hex]
        else:
            commit = repo.revparse_single("HEAD")
    except KeyError:
        pass
    if commit:
        parents = [commit.oid.hex]

    return (repo, newfolder, parents)


def add_content_git_repo(folder, branch="master", append=None):
    """ Create some content for the specified git repo. """
    repo, newfolder, parents = _clone_and_top_commits(folder, branch)

    # Create a file in that git repo
    filename = os.path.join(newfolder, "sources")
    content = "foo\n bar"
    if os.path.exists(filename):
        content = "foo\n bar\nbaz"
    if append:
        content += append
    with open(filename, "w") as stream:
        stream.write(content)
    repo.index.add("sources")
    repo.index.write()

    # Commits the files added
    tree = repo.index.write_tree()
    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    commit = repo.create_commit(
        "refs/heads/%s" % branch,  # the name of the reference to update
        author,
        committer,
        "Add sources file for testing",
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        parents,
    )

    if commit:
        parents = [commit.hex]

    subfolder = os.path.join("folder1", "folder2")
    if not os.path.exists(os.path.join(newfolder, subfolder)):
        os.makedirs(os.path.join(newfolder, subfolder))
    # Create a file in that git repo
    with open(os.path.join(newfolder, subfolder, "file"), "w") as stream:
        stream.write("foo\n bar\nbaz")
    repo.index.add(os.path.join(subfolder, "file"))
    with open(os.path.join(newfolder, subfolder, "fileŠ"), "w") as stream:
        stream.write("foo\n bar\nbaz")
    repo.index.add(os.path.join(subfolder, "fileŠ"))
    repo.index.write()

    # Commits the files added
    tree = repo.index.write_tree()
    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    commit = repo.create_commit(
        "refs/heads/%s" % branch,  # the name of the reference to update
        author,
        committer,
        "Add some directory and a file for more testing",
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        parents,
    )

    # Push to origin
    ori_remote = repo.remotes[0]
    master_ref = repo.lookup_reference(
        "HEAD" if branch == "master" else "refs/heads/%s" % branch
    ).resolve()
    refname = "%s:%s" % (master_ref.name, master_ref.name)

    PagureRepo.push(ori_remote, refname)

    shutil.rmtree(newfolder)


def add_readme_git_repo(folder, readme_name="README.rst", branch="master"):
    """ Create a README file for the specified git repo. """
    repo, newfolder, parents = _clone_and_top_commits(folder, branch)

    if readme_name == "README.rst":
        content = """Pagure
======

:Author: Pierre-Yves Chibon <pingou@pingoured.fr>


Pagure is a light-weight git-centered forge based on pygit2.

Currently, Pagure offers a web-interface for git repositories, a ticket
system and possibilities to create new projects, fork existing ones and
create/merge pull-requests across or within projects.


Homepage: https://github.com/pypingou/pagure

Dev instance: http://209.132.184.222/ (/!\\ May change unexpectedly, it's a dev instance ;-))
"""
    else:
        content = (
            """Pagure
======

This is a placeholder """
            + readme_name
            + """
that should never get displayed on the website if there is a README.rst in the repo.
"""
        )

    # Create a file in that git repo
    with open(os.path.join(newfolder, readme_name), "w") as stream:
        stream.write(content)
    repo.index.add(readme_name)
    repo.index.write()

    # Commits the files added
    tree = repo.index.write_tree()
    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    branch_ref = "refs/heads/%s" % branch
    repo.create_commit(
        branch_ref,  # the name of the reference to update
        author,
        committer,
        "Add a README file",
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        parents,
    )

    # Push to origin
    ori_remote = repo.remotes[0]

    PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))

    shutil.rmtree(newfolder)


def add_commit_git_repo(
    folder, ncommits=10, filename="sources", branch="master", symlink_to=None
):
    """ Create some more commits for the specified git repo. """
    repo, newfolder, branch_ref_obj = _clone_and_top_commits(
        folder, branch, branch_ref=True
    )

    for index in range(ncommits):
        # Create a file in that git repo
        if symlink_to:
            os.symlink(symlink_to, os.path.join(newfolder, filename))
        else:
            with open(os.path.join(newfolder, filename), "a") as stream:
                stream.write("Row %s\n" % index)
        repo.index.add(filename)
        repo.index.write()

        parents = []
        commit = None
        try:
            if branch_ref_obj:
                commit = repo[branch_ref_obj.peel().hex]
            else:
                commit = repo.revparse_single("HEAD")
        except (KeyError, AttributeError):
            pass
        if commit:
            parents = [commit.oid.hex]

        # Commits the files added
        tree = repo.index.write_tree()
        author = pygit2.Signature("Alice Author", "alice@authors.tld")
        committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
        branch_ref = "refs/heads/%s" % branch
        repo.create_commit(
            branch_ref,
            author,
            committer,
            "Add row %s to %s file" % (index, filename),
            # binary string representing the tree object ID
            tree,
            # list of binary strings representing parents of the new commit
            parents,
        )
        branch_ref_obj = pagure.lib.git.get_branch_ref(repo, branch)

    # Push to origin
    ori_remote = repo.remotes[0]
    PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))

    shutil.rmtree(newfolder)


def add_tag_git_repo(folder, tagname, obj_hash, message):
    """ Add a tag to the given object of the given repo annotated by given message. """
    repo, newfolder, branch_ref_obj = _clone_and_top_commits(
        folder, "master", branch_ref=True
    )

    tag_sha = repo.create_tag(
        tagname,
        obj_hash,
        repo.get(obj_hash).type,
        pygit2.Signature("Alice Author", "alice@authors.tld"),
        message,
    )

    # Push to origin
    ori_remote = repo.remotes[0]
    PagureRepo.push(
        ori_remote, "refs/tags/%s:refs/tags/%s" % (tagname, tagname)
    )

    shutil.rmtree(newfolder)
    return tag_sha


def add_content_to_git(
    folder,
    branch="master",
    filename="sources",
    content="foo",
    message=None,
    author=("Alice Author", "alice@authors.tld"),
    commiter=("Cecil Committer", "cecil@committers.tld"),
):
    """ Create some more commits for the specified git repo. """
    repo, newfolder, branch_ref_obj = _clone_and_top_commits(
        folder, branch, branch_ref=True
    )

    # Create a file in that git repo
    with open(
        os.path.join(newfolder, filename), "a", encoding="utf-8"
    ) as stream:
        stream.write("%s\n" % content)
    repo.index.add(filename)
    repo.index.write()

    parents = []
    commit = None
    try:
        if branch_ref_obj:
            commit = repo[branch_ref_obj.peel().hex]
        else:
            commit = repo.revparse_single("HEAD")
    except (KeyError, AttributeError):
        pass
    if commit:
        parents = [commit.oid.hex]

    # Commits the files added
    tree = repo.index.write_tree()
    author = pygit2.Signature(*author)
    committer = pygit2.Signature(*commiter)
    branch_ref = "refs/heads/%s" % branch
    message = message or "Add content to file %s" % (filename)
    repo.create_commit(
        branch_ref,  # the name of the reference to update
        author,
        committer,
        message,
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        parents,
    )

    # Push to origin
    ori_remote = repo.remotes[0]
    PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))

    shutil.rmtree(newfolder)


def add_binary_git_repo(folder, filename):
    """ Create a fake image file for the specified git repo. """
    repo, newfolder, parents = _clone_and_top_commits(folder, "master")

    content = b"""\x00\x00\x01\x00\x01\x00\x18\x18\x00\x00\x01\x00 \x00\x88
\t\x00\x00\x16\x00\x00\x00(\x00\x00\x00\x18\x00x00\x00\x01\x00 \x00\x00\x00
\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00
00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\xa7lM\x01\xa6kM\t\xa6kM\x01
\xa4fF\x04\xa2dE\x95\xa2cD8\xa1a
"""

    # Create a file in that git repo
    with open(os.path.join(newfolder, filename), "wb") as stream:
        stream.write(content)
    repo.index.add(filename)
    repo.index.write()

    # Commits the files added
    tree = repo.index.write_tree()
    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    repo.create_commit(
        "refs/heads/master",  # the name of the reference to update
        author,
        committer,
        "Add a fake image file",
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        parents,
    )

    # Push to origin
    ori_remote = repo.remotes[0]
    master_ref = repo.lookup_reference("HEAD").resolve()
    refname = "%s:%s" % (master_ref.name, master_ref.name)

    PagureRepo.push(ori_remote, refname)

    shutil.rmtree(newfolder)


def remove_file_git_repo(folder, filename, branch="master"):
    """ Delete the specified file on the give git repo and branch. """
    repo, newfolder, parents = _clone_and_top_commits(folder, branch)

    # Remove file
    repo.index.remove(filename)

    # Write the change and commit it
    tree = repo.index.write_tree()

    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    branch_ref = "refs/heads/%s" % branch
    repo.create_commit(
        branch_ref,  # the name of the reference to update
        author,
        committer,
        "Remove file %s" % filename,
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        parents,
    )

    # Push to origin
    ori_remote = repo.remotes[0]

    PagureRepo.push(ori_remote, "%s:%s" % (branch_ref, branch_ref))

    shutil.rmtree(newfolder)


def add_pull_request_git_repo(
    folder,
    session,
    repo,
    fork,
    branch_from="feature",
    user="pingou",
    allow_rebase=False,
):
    """ Set up the git repo and create the corresponding PullRequest
        object.
        """

    # Clone the main repo
    gitrepo = os.path.join(folder, "repos", repo.path)
    newpath = tempfile.mkdtemp(prefix="pagure-fork-test")
    repopath = os.path.join(newpath, "test")
    clone_repo = pygit2.clone_repository(gitrepo, repopath)

    # Create a file in that git repo
    with open(os.path.join(repopath, "sources"), "w") as stream:
        stream.write("foo\n bar")
    clone_repo.index.add("sources")
    clone_repo.index.write()

    # Commits the files added
    tree = clone_repo.index.write_tree()
    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    clone_repo.create_commit(
        "refs/heads/master",  # the name of the reference to update
        author,
        committer,
        "Add sources file for testing",
        # binary string representing the tree object ID
        tree,
        # list of binary strings representing parents of the new commit
        [],
    )
    refname = "refs/heads/master:refs/heads/master"
    ori_remote = clone_repo.remotes[0]
    PagureRepo.push(ori_remote, refname)

    first_commit = clone_repo.revparse_single("HEAD")

    # Set the second repo
    repopath = os.path.join(folder, "repos", fork.path)
    new_gitrepo = os.path.join(newpath, "fork_test")
    clone_repo = pygit2.clone_repository(repopath, new_gitrepo)

    # Add the main project as remote repo
    upstream_path = os.path.join(folder, "repos", repo.path)
    remote = clone_repo.create_remote("upstream", upstream_path)
    remote.fetch()

    # Edit the sources file again
    with open(os.path.join(new_gitrepo, "sources"), "w") as stream:
        stream.write("foo\n bar\nbaz\n boose")
    clone_repo.index.add("sources")
    clone_repo.index.write()

    # Commits the files added
    tree = clone_repo.index.write_tree()
    author = pygit2.Signature("Alice Author", "alice@authors.tld")
    committer = pygit2.Signature("Cecil Committer", "cecil@committers.tld")
    clone_repo.create_commit(
        "refs/heads/%s" % branch_from,
        author,
        committer,
        "A commit on branch %s" % branch_from,
        tree,
        [first_commit.oid.hex],
    )
    refname = "refs/heads/%s" % (branch_from)
    ori_remote = clone_repo.remotes[0]
    PagureRepo.push(ori_remote, refname)

    # Create a PR for these changes
    project = pagure.lib.query.get_authorized_project(session, "test")
    req = pagure.lib.query.new_pull_request(
        session=session,
        repo_from=fork,
        branch_from=branch_from,
        repo_to=project,
        branch_to="master",
        title="PR from the %s branch" % branch_from,
        allow_rebase=allow_rebase,
        user=user,
    )
    session.commit()

    return req


def clean_pull_requests_path():
    newpath = tempfile.mkdtemp(prefix="pagure-fork-test")
    shutil.rmtree(newpath)


@contextmanager
def capture_output(merge_stderr=True):
    oldout, olderr = sys.stdout, sys.stderr
    try:
        out = StringIO()
        err = StringIO()
        if merge_stderr:
            sys.stdout = sys.stderr = out
            yield out
        else:
            sys.stdout, sys.stderr = out, err
            yield out, err
    finally:
        sys.stdout, sys.stderr = oldout, olderr


def get_alerts(html):
    soup = BeautifulSoup(html, "html.parser")
    alerts = []
    for element in soup.find_all("div", class_="alert"):
        severity = None
        for class_ in element["class"]:
            if not class_.startswith("alert-"):
                continue
            if class_ == "alert-dismissible":
                continue
            severity = class_[len("alert-") :]
            break
        element.find("button").decompose()  # close button
        alerts.append(
            dict(severity=severity, text="".join(element.stripped_strings))
        )
    return alerts


def definitely_wait(result):
    """ Helper function for definitely waiting in _maybe_wait. """
    result.wait()


if __name__ == "__main__":
    SUITE = unittest.TestLoader().loadTestsFromTestCase(Modeltests)
    unittest.TextTestRunner(verbosity=2).run(SUITE)