# -*- coding: utf-8 -*-
"""
(c) 2017 - Copyright Red Hat Inc
Authors:
Pierre-Yves Chibon <pingou@pingoured.fr>
"""
from __future__ import print_function
import argparse
import datetime
import logging
import os
import sys
if 'PAGURE_CONFIG' not in os.environ \
and os.path.exists('/etc/pagure/pagure.cfg'):
print('Using configuration file `/etc/pagure/pagure.cfg`')
os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
import pagure.exceptions # noqa: E402
import pagure.lib # noqa: E402
import pagure.lib.git # noqa: E402
import pagure.lib.tasks # noqa: E402
from pagure import (SESSION, APP, generate_user_key_files) # noqa: E402
_log = logging.getLogger(__name__)
WATCH = {
'-1': 'reset the watch status to default',
'0': 'unwatch, don\'t notify the user of anything',
'1': 'watch issues and PRs',
'2': 'watch commits',
'3': 'watch issues, PRs and commits',
}
def _parser_refresh_gitolite(subparser):
""" Set up the CLI argument parser for the refresh-gitolite action. """
local_parser = subparser.add_parser(
'refresh-gitolite',
help='Re-generate the gitolite config file')
local_parser.add_argument(
'--user', help="User of the project (to use only on forks)")
local_parser.add_argument(
'--project', help="Project to update (as namespace/project if there "
"is a namespace)")
local_parser.add_argument(
'--group', help="Group to refresh")
local_parser.add_argument(
'--all', dest="all_", default=False, action='store_true',
help="Refresh all the projects")
local_parser.set_defaults(func=do_generate_acl)
def _parser_refresh_ssh(subparser):
""" Set up the CLI argument parser for the refresh-ssh action. """
local_parser = subparser.add_parser(
'refresh-ssh',
help="Re-write to disk every user's ssh key stored in the database")
local_parser.set_defaults(func=do_refresh_ssh)
def _parser_clear_hook_token(subparser):
""" Set up the CLI argument parser for the clear-hook-token action. """
local_parser = subparser.add_parser(
'clear-hook-token',
help='Generate a new hook token for every project in this instance')
local_parser.set_defaults(func=do_generate_hook_token)
def _parser_admin_token_list(subparser):
""" Set up the CLI argument parser for the admin-token list action. """
local_parser = subparser.add_parser(
'list', help="List the API admin token")
local_parser.add_argument(
'--user',
help="User to associate or associated with the token")
local_parser.add_argument(
'--token', help="API token")
local_parser.add_argument(
'--active', default=False, action='store_true',
help="Only list active API token")
local_parser.add_argument(
'--expired', default=False, action='store_true',
help="Only list expired API token")
local_parser.set_defaults(func=do_list_admin_token)
def _parser_admin_token_info(subparser):
""" Set up the CLI argument parser for the admin-token info action. """
local_parser = subparser.add_parser(
'info', help="Provide some information about a specific API token")
local_parser.add_argument(
'token', help="API token")
local_parser.set_defaults(func=do_info_admin_token)
def _parser_admin_token_expire(subparser):
""" Set up the CLI argument parser for the admin-token expire action. """
# Expire admin token
local_parser = subparser.add_parser(
'expire', help="Expire a specific API token")
local_parser.add_argument(
'token', help="API token")
local_parser.set_defaults(func=do_expire_admin_token)
def _parser_admin_token_create(subparser):
""" Set up the CLI argument parser for the admin-token create action. """
# Create admin token
local_parser = subparser.add_parser(
'create', help="Create a new API token")
local_parser.add_argument(
'user', help="User to associate with the token")
local_parser.set_defaults(func=do_create_admin_token)
def _parser_admin_token(subparser):
""" Set up the CLI argument parser for the admin-token action. """
local_parser = subparser.add_parser(
'admin-token',
help='Manages the admin tokens for this instance')
subsubparser = local_parser.add_subparsers(title='actions')
# list
_parser_admin_token_list(subsubparser)
# info
_parser_admin_token_info(subsubparser)
# expire
_parser_admin_token_expire(subsubparser)
# create
_parser_admin_token_create(subsubparser)
def _parser_get_watch(subparser):
""" Set up the CLI argument parser for the get-watch action. """
# Update watch status
local_parser = subparser.add_parser(
'get-watch', help="Get someone's watch status on a project")
local_parser.add_argument(
'project', help="Project (as namespace/project if there "
"is a namespace) -- Fork not supported")
local_parser.add_argument(
'user', help="User to get the watch status of")
local_parser.set_defaults(func=do_get_watch_status)
def _parser_update_watch(subparser):
""" Set up the CLI argument parser for the update-watch action. """
# Update watch status
local_parser = subparser.add_parser(
'update-watch', help="Update someone's watch status on a project")
local_parser.add_argument(
'project', help="Project to update (as namespace/project if there "
"is a namespace) -- Fork not supported")
local_parser.add_argument(
'user', help="User to update the watch status of")
local_parser.add_argument(
'-s', '--status', help="Watch status to update to")
local_parser.set_defaults(func=do_update_watch_status)
def _parser_read_only(subparser):
""" Set up the CLI argument parser for the refresh-gitolite action. """
local_parser = subparser.add_parser(
'read-only',
help='Get or set the read-only flag on a project')
local_parser.add_argument(
'--user', help="User of the project (to use only on forks)")
local_parser.add_argument(
'project', help="Project to update (as namespace/project if there "
"is a namespace)")
local_parser.add_argument(
'--ro',
help="Read-Only status to set (has to be: true or false), do not "
"specify to get the current status")
local_parser.set_defaults(func=do_read_only)
def parse_arguments():
""" Set-up the argument parsing. """
parser = argparse.ArgumentParser(
description='The admin CLI for this pagure instance')
parser.add_argument(
'--debug', default=False, action='store_true',
help='Increase the verbosity of the information displayed')
subparser = parser.add_subparsers(title='actions')
# refresh-gitolite
_parser_refresh_gitolite(subparser)
# refresh-ssh
_parser_refresh_ssh(subparser)
# clear-hook-token
_parser_clear_hook_token(subparser)
# Admin token actions
_parser_admin_token(subparser)
# get-watch
_parser_get_watch(subparser)
# update-watch
_parser_update_watch(subparser)
# read-only
_parser_read_only(subparser)
return parser.parse_args()
def _ask_confirmation():
''' Ask to confirm an action.
'''
action = raw_input('Do you want to continue? [y/N]')
return action.lower() in ['y', 'yes']
def _get_input(text):
''' Ask the user for input. '''
return raw_input(text)
def _get_project(arg_project, user=None):
''' From the project specified to the CLI, extract the actual project.
'''
namespace = None
if '/' in arg_project:
if arg_project.count('/') > 1:
raise pagure.exceptions.PagureException(
'Invalid project name, has more than one "/": %s' %
arg_project)
namespace, name = arg_project.split('/')
else:
name = arg_project
return pagure.lib._get_project(
SESSION, namespace=namespace, name=name, user=user)
def do_generate_acl(args):
""" Regenerate the gitolite ACL file.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('group: %s', args.group)
_log.debug('project: %s', args.project)
_log.debug('user: %s', args.user)
_log.debug('all: %s', args.all_)
title = None
project = None
if args.project:
project = _get_project(args.project, user=args.user)
title = project.fullname
if args.all_:
title = 'all'
project = -1
if not args.all_ and not args.project:
print(
'Please note that you have not selected a project or --all. '
'Do you want to recompile the existing config file?')
if not _ask_confirmation():
return
helper = pagure.lib.git_auth.get_git_auth_helper(
APP.config['GITOLITE_BACKEND'])
_log.debug('Got helper: %s', helper)
group_obj = None
if args.group:
group_obj = pagure.lib.search_groups(SESSION, group_name=args.group)
_log.debug(
'Calling helper: %s with arg: project=%s, group=%s',
helper, project, group_obj)
print(
'Do you want to re-generate the gitolite.conf file for group: %s '
'and project: %s?' % (group_obj, title))
if _ask_confirmation():
helper.generate_acls(project=project, group=group_obj)
pagure.lib.tasks.gc_clean()
print('Gitolite ACLs updated')
def do_refresh_ssh(_):
""" Regenerate the user key files.
:arg _: the argparse object returned by ``parse_arguments()``, which is
ignored as there are no argument to pass to this action.
"""
print(
'Do you want to re-generate all the ssh keys for every user in '
'the database? (Depending on your instance this may take a while '
'and result in an outage while it lasts)')
if _ask_confirmation():
generate_user_key_files()
print('User key files regenerated')
do_generate_acl()
def do_generate_hook_token(_):
""" Regenerate the hook_token for each projects in the DB.
:arg _: the argparse object returned by ``parse_arguments()``, which is
ignored as there are no argument to pass to this action.
"""
print(
'Do you want to re-generate all the hook token for every user in '
'the database? This will break every web-hook set-up on this '
'instance. You should only ever run this for a security issue')
if _ask_confirmation():
pagure.lib.generate_hook_token(SESSION)
print('Hook token all re-generated')
def do_list_admin_token(args):
""" List the admin token.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('user: %s', args.user)
_log.debug('token: %s', args.token)
_log.debug('active: %s', args.active)
_log.debug('expire: %s', args.expired)
acls = APP.config['ADMIN_API_ACLS']
tokens = pagure.lib.search_token(
SESSION, acls,
user=args.user,
active=args.active,
expired=args.expired)
for token in tokens:
print('%s -- %s -- %s' % (
token.id, token.user.user, token.expiration))
if not tokens:
print('No admin tokens found')
def do_info_admin_token(args):
""" Print out information about the specified API token.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('token: %s', args.token)
acls = APP.config['ADMIN_API_ACLS']
token = pagure.lib.search_token(SESSION, acls, token=args.token)
if not token:
raise pagure.exceptions.PagureException('No such admin token found')
print('%s -- %s -- %s' % (
token.id, token.user.user, token.expiration))
print('ACLs:')
for acl in token.acls:
print(' - %s' % acl.name)
def do_expire_admin_token(args):
""" Expire a specific admin token.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('token: %s', args.token)
acls = APP.config['ADMIN_API_ACLS']
token = pagure.lib.search_token(SESSION, acls, token=args.token)
if not token:
raise pagure.exceptions.PagureException('No such admin token found')
print('%s -- %s -- %s' % (
token.id, token.user.user, token.expiration))
print('ACLs:')
for acl in token.acls:
print(' - %s' % acl.name)
print('Do you really want to expire this API token?')
if _ask_confirmation():
token.expiration = datetime.datetime.utcnow()
SESSION.add(token)
SESSION.commit()
print('Token expired')
def do_create_admin_token(args):
""" Create a new admin token.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('user: %s', args.user)
# Validate user first
pagure.lib.get_user(SESSION, args.user)
acls_list = APP.config['ADMIN_API_ACLS']
for idx, acl in enumerate(acls_list):
print('%s. %s' % (idx, acl))
print('Which ACLs do you want to associated with this token?')
acls = _get_input('(Coma separated list): ')
acls_idx = [int(acl.strip()) for acl in acls.split(',')]
acls = [acls_list[acl] for acl in acls_idx]
print('ACLs selected:')
for idx, acl in enumerate(acls_idx):
print('%s. %s' % (acls_idx[idx], acls[idx]))
print('Do you want to create this API token?')
if _ask_confirmation():
print(pagure.lib.add_token_to_user(SESSION, None, acls, args.user))
def do_get_watch_status(args):
""" Get the watch status of an user on a project.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('user: %s', args.user)
_log.debug('project: %s', args.project)
# Validate user
pagure.lib.get_user(SESSION, args.user)
# Get the project
project = _get_project(args.project)
if project is None:
raise pagure.exceptions.PagureException(
'No project found with: %s' % args.project)
level = pagure.lib.get_watch_level_on_repo(
session=SESSION,
user=args.user,
repo=project.name,
repouser=None,
namespace=project.namespace) or []
# Specify that issues == 'issues & PRs'
if 'issues' in level:
level.append('pull-requests')
print('On %s user: %s is watching the following items: %s' % (
project.fullname, args.user, ', '.join(level) or None))
def do_update_watch_status(args):
""" Update the watch status of an user on a project.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('user: %s', args.user)
_log.debug('status: %s', args.status)
_log.debug('project: %s', args.project)
# Validate user
pagure.lib.get_user(SESSION, args.user)
# Ask the status if none were given
if args.status is None:
print('The watch status can be one of the following: ')
for lvl in WATCH:
print('%s: %s' % (lvl, WATCH[lvl]))
args.status = _get_input('Status:')
# Validate the status
if args.status not in WATCH:
raise pagure.exceptions.PagureException(
'Invalid status provided: %s not in %s' % (
args.status, ', '.join(sorted(WATCH.keys()))))
# Get the project
project = _get_project(args.project)
if project is None:
raise pagure.exceptions.PagureException(
'No project found with: %s' % args.project)
print('Updating watch status of %s to %s (%s) on %s' % (
args.user, args.status, WATCH[args.status], args.project))
pagure.lib.update_watch_status(
session=SESSION,
project=project,
user=args.user,
watch=args.status)
SESSION.commit()
def do_read_only(args):
""" Set or update the read-only status of a project.
:arg args: the argparse object returned by ``parse_arguments()``.
"""
_log.debug('project: %s', args.project)
_log.debug('user: %s', args.user)
_log.debug('read-only: %s', args.ro)
# Validate user
pagure.lib.get_user(SESSION, args.user)
# Get the project
project = _get_project(args.project)
if project is None:
raise pagure.exceptions.PagureException(
'No project found with: %s' % args.project)
# Validate ro flag
if args.ro and args.ro.lower() not in ['true', 'false']:
raise pagure.exceptions.PagureException(
'Invalid read-only status specified: %s is not in: '
'true, false' % args.ro.lower())
if not args.ro:
print(
'The current read-only flag of the project %s is set to %s' % (
project.fullname, project.read_only))
else:
pagure.lib.update_read_only_mode(
SESSION, project, read_only=(args.ro.lower() == 'true')
)
SESSION.commit()
print(
'The read-only flag of the project %s has been set to %s' % (
project.fullname, args.ro.lower() == 'true'))
def main():
""" Start of the application. """
# Parse the arguments
args = parse_arguments()
logging.basicConfig()
if args.debug:
_log.setLevel(logging.DEBUG)
# Act based on the arguments given
return_code = 0
try:
args.func(args)
except KeyboardInterrupt:
print("\nInterrupted by user.")
return_code = 1
except pagure.exceptions.PagureException as err:
print(err)
return_code = 3
except Exception as err:
print('Error: {0}'.format(err))
logging.exception("Generic error catched:")
return_code = 2
return return_code
if __name__ == '__main__':
sys.exit(main())