diff --git a/pagure-logcom/README.rst b/pagure-logcom/README.rst deleted file mode 100644 index b60fb19..0000000 --- a/pagure-logcom/README.rst +++ /dev/null @@ -1,12 +0,0 @@ -Pagure LogCom -============= - -This is the service logging in the user's commits to be displayed in the -database. -This service is triggered by a git hook, sending a notification that a push -happened. This service receive the notification and goes over all the commit -that got pushed and logs the activity corresponding to that user. - - * Run:: - - PAGURE_CONFIG=/path/to/config PYTHONPATH=. python pagure-logcom/pagure_logcom_server.py diff --git a/pagure-logcom/pagure_logcom_server.py b/pagure-logcom/pagure_logcom_server.py deleted file mode 100644 index ce45ef6..0000000 --- a/pagure-logcom/pagure_logcom_server.py +++ /dev/null @@ -1,175 +0,0 @@ -#!/usr/bin/env python -# -*- coding: utf-8 -*- - -""" - (c) 2016-2017 - Copyright Red Hat Inc - - Authors: - Pierre-Yves Chibon - - -This server listens to message sent via redis post commits and log the -user's activity in the database. - -Using this mechanism, we no longer need to block the git push until all the -activity has been logged (which is you push the kernel tree for the first -time can be really time-consuming). - -""" - -from __future__ import print_function -import json -import logging -import os -from sqlalchemy.exc import SQLAlchemyError - -import trollius -import trollius_redis - -import pagure -import pagure.lib - - -if 'PAGURE_CONFIG' not in os.environ \ - and os.path.exists('/etc/pagure/pagure.cfg'): - print('Using configuration file `/etc/pagure/pagure.cfg`') - os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' - -_config = pagure.config.config.reload_config() -_log = logging.getLogger(__name__) - - -@trollius.coroutine -def handle_messages(): - ''' Handles connecting to redis and acting upon messages received. 
- In this case, it means logging into the DB the commits specified in the - message for the default repo or sending commit notification emails. - - The currently accepted message format looks like: - - :: - - { - "project": { - "name": "foo", - "namespace": null, - "parent": null, - "username": { - "name": "user" - } - }, - "abspath": "/srv/git/repositories/pagure.git", - "commits": [ - "b7b4059c44d692d7df3227ce58ce01191e5407bd", - "f8d0899bb6654590ffdef66b539fd3b8cf873b35", - "9b6fdc48d3edab82d3de28953271ea52b0a96117" - ], - "branch": "master", - "default_branch": "master" - } - - ''' - - host = _config.get('REDIS_HOST', '0.0.0.0') - port = _config.get('REDIS_PORT', 6379) - dbname = _config.get('REDIS_DB', 0) - connection = yield trollius.From(trollius_redis.Connection.create( - host=host, port=port, db=dbname)) - - # Create subscriber. - subscriber = yield trollius.From(connection.start_subscribe()) - - # Subscribe to channel. - yield trollius.From(subscriber.subscribe(['pagure.logcom'])) - - # Inside a while loop, wait for incoming events. 
- while True: - reply = yield trollius.From(subscriber.next_published()) - _log.info( - 'Received: %s on channel: %s', - repr(reply.value), reply.channel) - data = json.loads(reply.value) - - commits = data['commits'] - abspath = data['abspath'] - branch = data['branch'] - default_branch = data['default_branch'] - repo = data['project']['name'] - username = data['project']['user']['name'] \ - if data['project']['parent'] else None - namespace = data['project']['namespace'] - - session = pagure.lib.create_session(_config['DB_URL']) - - _log.info('Looking for project: %s%s of %s', - '%s/' % namespace if namespace else '', - repo, username) - project = pagure.lib._get_project( - pagure.SESSION, repo, user=username, namespace=namespace, - case=_config.get('CASE_SENSITIVE', False)) - - if not project: - _log.info('No project found') - continue - - _log.info('Found project: %s', project.fullname) - - _log.info('Processing %s commits in %s', len(commits), abspath) - - # Only log commits when the branch is the default branch - if branch == default_branch: - pagure.lib.git.log_commits_to_db( - session, project, commits, abspath) - - # Notify subscribed users that there are new commits - pagure.lib.notify.notify_new_commits( - abspath, project, branch, commits) - - try: - session.commit() - except SQLAlchemyError as err: # pragma: no cover - session.rollback() - finally: - session.close() - _log.info('Ready for another') - - -def main(): - ''' Start the main async loop. 
''' - - try: - loop = trollius.get_event_loop() - tasks = [ - trollius.async(handle_messages()), - ] - loop.run_until_complete(trollius.wait(tasks)) - loop.run_forever() - except KeyboardInterrupt: - pass - except trollius.ConnectionResetError: - pass - - _log.info("End Connection") - loop.close() - _log.info("End") - - -if __name__ == '__main__': - formatter = logging.Formatter( - "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") - - logging.basicConfig(level=logging.DEBUG) - - # setup console logging - _log.setLevel(logging.DEBUG) - shellhandler = logging.StreamHandler() - shellhandler.setLevel(logging.DEBUG) - - aslog = logging.getLogger("asyncio") - aslog.setLevel(logging.DEBUG) - aslog = logging.getLogger("trollius") - aslog.setLevel(logging.DEBUG) - - shellhandler.setFormatter(formatter) - _log.addHandler(shellhandler) - main() diff --git a/pagure/hooks/files/default_hook.py b/pagure/hooks/files/default_hook.py old mode 100755 new mode 100644 index 868e4aa..5e5647a --- a/pagure/hooks/files/default_hook.py +++ b/pagure/hooks/files/default_hook.py @@ -5,7 +5,6 @@ """ from __future__ import print_function -import json import os import sys @@ -17,15 +16,13 @@ import pagure.exceptions # noqa: E402 import pagure.lib.link # noqa: E402 import pagure.lib.tasks # noqa: E402 -from pagure.lib import REDIS # noqa: E402 - if 'PAGURE_CONFIG' not in os.environ \ and os.path.exists('/etc/pagure/pagure.cfg'): os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' -_config = pagure.config.config.reload_config() +_config = pagure.config.reload_config() abspath = os.path.abspath(os.environ['GIT_DIR']) @@ -39,8 +36,10 @@ def run_as_post_receive_hook(): print('user:', username) print('namespace:', namespace) + session = pagure.lib.create_session(_config['DB_URL']) + project = pagure.lib._get_project( - pagure.SESSION, repo, user=username, namespace=namespace, + session, repo, user=username, namespace=namespace, case=_config.get('CASE_SENSITIVE', False)) for line in 
sys.stdin: @@ -71,28 +70,21 @@ def run_as_post_receive_hook(): commits = pagure.lib.git.get_revs_between( oldrev, newrev, abspath, refname) - if REDIS: - if refname == default_branch: - print('Sending to redis to log activity and send commit ' - 'notification emails') - else: - print('Sending to redis to send commit notification emails') - # If REDIS is enabled, notify subscribed users that there are new - # commits to this project - REDIS.publish( - 'pagure.logcom', - json.dumps({ - 'project': project.to_json(public=True), - 'abspath': abspath, - 'branch': refname, - 'default_branch': default_branch, - 'commits': commits, - }) - ) + if refname == default_branch: + print('Sending to redis to log activity and send commit ' + 'notification emails') else: - print('Hook not configured to connect to pagure-logcom') - print('/!\ Commit notification emails will not be sent and ' - 'commits won\'t be logged') + print('Sending to redis to send commit notification emails') + + pagure.lib.tasks_services.log_commit_send_notifications.delay( + name=repo, + commits=commits, + abspath=abspath, + branch=refname, + default_branch=default_branch, + namespace=namespace, + username=username, + ) target_repo = project if project.is_fork: @@ -102,7 +94,7 @@ def run_as_post_receive_hook(): and target_repo.settings.get('pull_requests', True): print() prs = pagure.lib.search_pull_requests( - pagure.flask_app.SESSION, + session, project_id_from=project.id, status='Open', branch_from=refname, @@ -135,7 +127,7 @@ def run_as_post_receive_hook(): parent.user.user if parent.is_fork else None ) - pagure.SESSION.remove() + session.remove() def main(args): diff --git a/pagure/lib/tasks_services.py b/pagure/lib/tasks_services.py index df974ad..b79a2db 100644 --- a/pagure/lib/tasks_services.py +++ b/pagure/lib/tasks_services.py @@ -23,6 +23,7 @@ import six from celery import Celery from kitchen.text.converters import to_bytes +from sqlalchemy.exc import SQLAlchemyError import pagure.lib from 
pagure.config import config as pagure_config
@@ -137,3 +138,69 @@ def webhook_notification(
     _log.info('Got the project and urls, going to the webhooks')
     call_web_hooks(project, topic, msg, urls)
     session.close()
+
+
+@conn.task(queue=pagure_config.get('LOGCOM_CELERY_QUEUE', None), bind=True)
+@set_status
+def log_commit_send_notifications(
+        self, name, commits, abspath, branch, default_branch,
+        namespace=None, username=None):
+    """ Log the commits that were pushed in the database (default branch
+    only) and notify the subscribed users about them.
+
+    :arg name: the name of the project
+    :type name: str
+    :arg commits: the commits that were pushed
+    :type commits: list
+    :arg abspath: the absolute path to the git repository
+    :type abspath: str
+    :arg branch: the branch the commits were pushed to
+    :type branch: str
+    :arg default_branch: the default branch of the repository, commits
+        are only logged in the database when pushed to this branch
+    :type default_branch: str
+    :kwarg namespace: the namespace of the project
+    :type namespace: None or str
+    :kwarg username: the user of the project, only set if the project is
+        a fork
+    :type username: None or str
+
+    """
+    session = pagure.lib.create_session(pagure_config['DB_URL'])
+
+    # Close the session on every exit path: project not found, error
+    # raised while logging/notifying, or normal completion.
+    try:
+        _log.info(
+            'Looking for project: %s%s of %s',
+            '%s/' % namespace if namespace else '',
+            name,
+            username)
+        project = pagure.lib._get_project(
+            session, name, user=username, namespace=namespace,
+            case=pagure_config.get('CASE_SENSITIVE', False))
+
+        if not project:
+            _log.info('No project found')
+            return
+
+        _log.info('Found project: %s', project.fullname)
+
+        _log.info('Processing %s commits in %s', len(commits), abspath)
+
+        # Only log commits when the branch is the default branch
+        if branch == default_branch:
+            pagure.lib.git.log_commits_to_db(
+                session, project, commits, abspath)
+
+        # Notify subscribed users that there are new commits
+        pagure.lib.notify.notify_new_commits(
+            abspath, project, branch, commits)
+
+        try:
+            session.commit()
+        except SQLAlchemyError as err:  # pragma: no cover
+            _log.exception(err)
+            session.rollback()
+    finally:
+        session.close()