Blob Blame Raw
# -*- coding: utf-8 -*-

"""
 (c) 2017 - Copyright Red Hat Inc

 Authors:
   Pierre-Yves Chibon <pingou@pingoured.fr>

"""
from __future__ import print_function

import argparse
import datetime
import logging
import os
import sys

if 'PAGURE_CONFIG' not in os.environ \
        and os.path.exists('/etc/pagure/pagure.cfg'):
    print('Using configuration file `/etc/pagure/pagure.cfg`')
    os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'

import pagure.exceptions  # noqa
import pagure.lib  # noqa
import pagure.lib.git  # noqa
from pagure import (SESSION, APP, generate_user_key_files)  # noqa


_log = logging.getLogger(__name__)


def _parser_refresh_gitolite(subparser):
    """ Set up the CLI argument parser for the refresh-gitolite action. """
    local_parser = subparser.add_parser(
        'refresh-gitolite',
        help='Re-generate the gitolite config file')
    local_parser.set_defaults(func=do_generate_acl)


def _parser_refresh_ssh(subparser):
    """ Set up the CLI argument parser for the refresh-ssh action. """
    local_parser = subparser.add_parser(
        'refresh-ssh',
        help="Re-write to disk every user's ssh key stored in the database")
    local_parser.set_defaults(func=do_refresh_ssh)


def _parser_clear_hook_token(subparser):
    """ Set up the CLI argument parser for the clear-hook-token action. """
    local_parser = subparser.add_parser(
        'clear-hook-token',
        help='Generate a new hook token for every project in this instance')
    local_parser.set_defaults(func=do_generate_hook_token)


def _parser_admin_token_list(subparser):
    """ Set up the CLI argument parser for the admin-token list action. """
    local_parser = subparser.add_parser(
        'list',  help="List the API admin token")
    local_parser.add_argument(
        '--user',
        help="User to associate or associated with the token")
    local_parser.add_argument(
        '--token', help="API token")
    local_parser.add_argument(
        '--active', default=False, action='store_true',
        help="Only list active API token")
    local_parser.add_argument(
        '--expired', default=False, action='store_true',
        help="Only list expired API token")
    local_parser.set_defaults(func=do_list_admin_token)


def _parser_admin_token_info(subparser):
    """ Set up the CLI argument parser for the admin-token info action. """
    local_parser = subparser.add_parser(
        'info',  help="Provide some information about a specific API token")
    local_parser.add_argument(
        'token', help="API token")
    local_parser.set_defaults(func=do_info_admin_token)


def _parser_admin_token_expire(subparser):
    """ Set up the CLI argument parser for the admin-token expire action. """
    # Expire admin token
    local_parser = subparser.add_parser(
        'expire',  help="Expire a specific API token")
    local_parser.add_argument(
        'token', help="API token")
    local_parser.set_defaults(func=do_expire_admin_token)


def _parser_admin_token_create(subparser):
    """ Set up the CLI argument parser for the admin-token create action. """
    # Create admin token
    local_parser = subparser.add_parser(
        'create',  help="Create a new API token")
    local_parser.add_argument(
        'user', help="User to associate with the token")
    local_parser.set_defaults(func=do_create_admin_token)


def _parser_admin_token(subparser):
    """ Set up the CLI argument parser for the admin-token action. """
    local_parser = subparser.add_parser(
        'admin-token',
        help='Manages the admin tokens for this instance')

    subsubparser = local_parser.add_subparsers(title='actions')

    # list
    _parser_admin_token_list(subsubparser)
    # info
    _parser_admin_token_info(subsubparser)
    # expire
    _parser_admin_token_expire(subsubparser)
    # create
    _parser_admin_token_create(subsubparser)


def parse_arguments():
    """ Set-up the argument parsing. """
    parser = argparse.ArgumentParser(
        description='The admin CLI for this pagure instance')

    parser.add_argument(
        '--debug', default=False, action='store_true',
        help='Increase the verbosity of the information displayed')
    parser.set_defaults(func=lambda a, k: print(parser.format_help()))

    subparser = parser.add_subparsers(title='actions')

    # refresh-gitolite
    _parser_refresh_gitolite(subparser)

    # refresh-ssh
    _parser_refresh_ssh(subparser)

    # clear-hook-token
    _parser_clear_hook_token(subparser)

    # Admin token actions
    _parser_admin_token(subparser)

    return parser.parse_args()


def _ask_confirmation():
    ''' Ask to confirm an action.
    '''
    action = raw_input('Do you want to continue? [y/N]')
    return action.lower() in ['y', 'yes']


def _get_input(text):
    ''' Ask the user for input. '''
    return raw_input(text)


def do_generate_acl(_):
    """ Regenerate the gitolite ACL file.


    :arg _: the argparse object returned by ``parse_arguments()``, which is
        ignored as there are no argument to pass to this action.

    """
    cmd = pagure.lib.git._get_gitolite_command()
    if not cmd:
        raise pagure.exceptions.PagureException(
            '/!\ un-able to generate the right gitolite command')
    print(
        'Do you want to re-generate the gitolite.conf file then '
        'calling: %s' % cmd)
    if _ask_confirmation():
        pagure.lib.git.generate_gitolite_acls()
        print('Gitolite ACLs updated')


def do_refresh_ssh(_):
    """ Regenerate the user key files.

    :arg _: the argparse object returned by ``parse_arguments()``, which is
        ignored as there are no argument to pass to this action.

    """
    print(
        'Do you want to re-generate all the ssh keys for every user in '
        'the database? (Depending on your instance this may take a while '
        'and result in an outage while it lasts)')
    if _ask_confirmation():
        generate_user_key_files()
        print('User key files regenerated')
        do_generate_acl()


def do_generate_hook_token(_):
    """ Regenerate the hook_token for each projects in the DB.

    :arg _: the argparse object returned by ``parse_arguments()``, which is
        ignored as there are no argument to pass to this action.

    """
    print(
        'Do you want to re-generate all the hook token for every user in '
        'the database? This will break every web-hook set-up on this '
        'instance. You should only ever run this for a security issue')
    if _ask_confirmation():
        pagure.lib.generate_hook_token(SESSION)
        print('Hook token all re-generated')


def do_list_admin_token(args):
    """ List the admin token.

    :arg args: the argparse object returned by ``parse_arguments()``.

    """
    _log.debug('user:           %s', args.user)
    _log.debug('token:          %s', args.token)
    _log.debug('active:         %s', args.active)
    _log.debug('expire:         %s', args.expired)

    acls = APP.config['ADMIN_API_ACLS']
    tokens = pagure.lib.search_token(
        SESSION, acls, active=args.active, expired=args.expired)
    for token in tokens:
        print('%s -- %s -- %s' % (
            token.id, token.user.user, token.expiration))
    if not tokens:
        print('No admin tokens found')


def do_info_admin_token(args):
    """ Print out information about the specified API token.

    :arg args: the argparse object returned by ``parse_arguments()``.

    """
    _log.debug('token:          %s', args.token)

    acls = APP.config['ADMIN_API_ACLS']
    token = pagure.lib.search_token(SESSION, acls, token=args.token)
    if not token:
        raise pagure.exceptions.PagureException('No such admin token found')

    print('%s -- %s -- %s' % (
        token.id, token.user.user, token.expiration))
    print('ACLs:')
    for acl in token.acls:
        print('  - %s' % acl.name)


def do_expire_admin_token(args):
    """ Expire a specific admin token.

    :arg args: the argparse object returned by ``parse_arguments()``.

    """
    _log.debug('token:          %s', args.token)

    acls = APP.config['ADMIN_API_ACLS']
    token = pagure.lib.search_token(SESSION, acls, token=args.token)
    if not token:
        raise pagure.exceptions.PagureException('No such admin token found')

    print('%s -- %s -- %s' % (
        token.id, token.user.user, token.expiration))
    print('ACLs:')
    for acl in token.acls:
        print('  - %s' % acl.name)

    print('Do you really want to expire this API token?')
    if _ask_confirmation():
        token.expiration = datetime.datetime.utcnow()
        SESSION.add(token)
        SESSION.commit()
        print('Token expired')


def do_create_admin_token(args):
    """ Create a new admin token.

    :arg args: the argparse object returned by ``parse_arguments()``.

    """
    _log.debug('user:          %s', args.user)
    # Validate user first
    pagure.lib.get_user(SESSION, args.user)

    acls_list = APP.config['ADMIN_API_ACLS']
    for idx, acl in enumerate(acls_list):
        print('%s.  %s' % (idx, acl))

    print('Which ACLs do you want to associated with this token?')
    acls = _get_input('(Coma separated list): ')
    acls_idx = [int(acl.strip()) for acl in acls.split(',')]
    acls = [acls_list[acl] for acl in acls_idx]

    print('ACLs selected:')
    for idx, acl in enumerate(acls_idx):
        print('%s.  %s' % (acls_idx[idx], acls[idx]))

    print('Do you want to create this API token?')
    if _ask_confirmation():
        print(pagure.lib.add_token_to_user(SESSION, None, acls, args.user))


def main():
    """ Start of the application. """

    # Parse the arguments
    args = parse_arguments()

    logging.basicConfig()
    if args.debug:
        _log.setLevel(logging.DEBUG)

    # Act based on the arguments given
    return_code = 0
    try:
        args.func(args)
    except KeyboardInterrupt:
        print("\nInterrupted by user.")
        return_code = 1
    except pagure.exceptions.PagureException as err:
        print(err)
        return_code = 3
    except Exception as err:
        print('Error: {0}'.format(err))
        logging.exception("Generic error catched:")
        return_code = 2

    return return_code


if __name__ == '__main__':
    sys.exit(main())