| |
| |
| """ |
| (c) 2015 - Copyright Red Hat Inc |
| |
| Authors: |
| Pierre-Yves Chibon <pingou@pingoured.fr> |
| |
| |
| This server listens to message sent via redis and send the corresponding |
| web-hook request. |
| |
| Using this mechanism, we no longer block the main application if the |
| receiving end is offline or so. |
| |
| """ |
| |
| import datetime |
| import hashlib |
| import hmac |
| import json |
| import logging |
| import os |
| import requests |
| import time |
| import uuid |
| |
| import six |
| import trollius |
| import trollius_redis |
| |
| from kitchen.text.converters import to_bytes |
| |
| |
| log = logging.getLogger(__name__) |
| |
| |
| if 'PAGURE_CONFIG' not in os.environ \ |
| and os.path.exists('/etc/pagure/pagure.cfg'): |
| print 'Using configuration file `/etc/pagure/pagure.cfg`' |
| os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg' |
| |
| |
| import pagure |
| import pagure.lib |
| from pagure.exceptions import PagureEvException |
| |
| _i = 0 |
| |
| |
| def call_web_hooks(project, topic, msg, urls): |
| ''' Sends the web-hook notification. ''' |
| log.info( |
| "Processing project: %s - topic: %s", project.fullname, topic) |
| log.debug('msg: %s', msg) |
| |
| |
| global _i |
| _i += 1 |
| year = datetime.datetime.now().year |
| if isinstance(topic, six.text_type): |
| topic = to_bytes(topic, encoding='utf8', nonstring="passthru") |
| msg['pagure_instance'] = pagure.APP.config['APP_URL'] |
| msg['project_fullname'] = project.fullname |
| msg = dict( |
| topic=topic.decode('utf-8'), |
| msg=msg, |
| timestamp=int(time.time()), |
| msg_id=str(year) + '-' + str(uuid.uuid4()), |
| i=_i, |
| ) |
| |
| content = json.dumps(msg) |
| hashhex = hmac.new( |
| str(project.hook_token), content, hashlib.sha1).hexdigest() |
| hashhex256 = hmac.new( |
| str(project.hook_token), content, hashlib.sha256).hexdigest() |
| headers = { |
| 'X-Pagure': pagure.APP.config['APP_URL'], |
| 'X-Pagure-project': project.fullname, |
| 'X-Pagure-Signature': hashhex, |
| 'X-Pagure-Signature-256': hashhex256, |
| 'X-Pagure-Topic': topic, |
| 'Content-Type': 'application/json', |
| } |
| for url in urls: |
| url = url.strip() |
| log.info('Calling url %s' % url) |
| try: |
| req = requests.post( |
| url, |
| headers=headers, |
| data=content, |
| timeout=60, |
| ) |
| if not req: |
| log.info( |
| 'An error occured while querying: %s - ' |
| 'Error code: %s' % (url, req.status_code)) |
| except (requests.exceptions.RequestException, Exception) as err: |
| log.info( |
| 'An error occured while querying: %s - Error: %s' % ( |
| url, err)) |
| |
| |
| @trollius.coroutine |
| def handle_messages(): |
| host = pagure.APP.config.get('REDIS_HOST', '0.0.0.0') |
| port = pagure.APP.config.get('REDIS_PORT', 6379) |
| dbname = pagure.APP.config.get('REDIS_DB', 0) |
| connection = yield trollius.From(trollius_redis.Connection.create( |
| host=host, port=port, db=dbname)) |
| |
| |
| subscriber = yield trollius.From(connection.start_subscribe()) |
| |
| |
| yield trollius.From(subscriber.subscribe(['pagure.hook'])) |
| |
| |
| while True: |
| reply = yield trollius.From(subscriber.next_published()) |
| log.info( |
| 'Received: %s on channel: %s', |
| repr(reply.value), reply.channel) |
| data = json.loads(reply.value) |
| username = None |
| if data['project'].startswith('forks'): |
| username, projectname = data['project'].split('/', 2)[1:] |
| else: |
| projectname = data['project'] |
| |
| namespace = None |
| if '/' in projectname: |
| namespace, projectname = projectname.split('/', 1) |
| |
| log.info( |
| 'Searching %s/%s/%s' % (username, namespace, projectname)) |
| session = pagure.lib.create_session(pagure.APP.config['DB_URL']) |
| project = pagure.lib._get_project( |
| session=session, name=projectname, user=username, |
| namespace=namespace) |
| if not project: |
| log.info('No project found with these criteria') |
| session.close() |
| continue |
| urls = project.settings.get('Web-hooks') |
| session.close() |
| if not urls: |
| log.info('No URLs set: %s' % urls) |
| continue |
| urls = urls.split('\n') |
| log.info('Got the project, going to the webhooks') |
| call_web_hooks(project, data['topic'], data['msg'], urls) |
| |
| |
| def main(): |
| server = None |
| try: |
| loop = trollius.get_event_loop() |
| tasks = [ |
| trollius.async(handle_messages()), |
| ] |
| loop.run_until_complete(trollius.wait(tasks)) |
| loop.run_forever() |
| except KeyboardInterrupt: |
| pass |
| except trollius.ConnectionResetError: |
| pass |
| |
| log.info("End Connection") |
| loop.close() |
| log.info("End") |
| |
| |
| if __name__ == '__main__': |
| log = logging.getLogger("") |
| formatter = logging.Formatter( |
| "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s") |
| |
| |
| log.setLevel(logging.DEBUG) |
| ch = logging.StreamHandler() |
| ch.setLevel(logging.DEBUG) |
| |
| aslog = logging.getLogger("asyncio") |
| aslog.setLevel(logging.DEBUG) |
| |
| ch.setFormatter(formatter) |
| log.addHandler(ch) |
| main() |