#!/usr/bin/env python
"""
(c) 2015-2017 - Copyright Red Hat Inc
Authors:
Pierre-Yves Chibon <pingou@pingoured.fr>
This server listens to message sent via redis and send the corresponding
web-hook request.
Using this mechanism, we no longer block the main application if the
receiving end is offline or so.
"""
from __future__ import print_function
import datetime
import hashlib
import hmac
import json
import logging
import os
import requests
import time
import uuid
import six
import trollius
import trollius_redis
from kitchen.text.converters import to_bytes
import pagure
import pagure.lib
from pagure.exceptions import PagureEvException
if 'PAGURE_CONFIG' not in os.environ \
and os.path.exists('/etc/pagure/pagure.cfg'):
print('Using configuration file `/etc/pagure/pagure.cfg`')
os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
_config = pagure.config.config.reload_config()
log = logging.getLogger(__name__)
_i = 0
def call_web_hooks(project, topic, msg, urls):
''' Sends the web-hook notification. '''
log.info(
"Processing project: %s - topic: %s", project.fullname, topic)
log.debug('msg: %s', msg)
# Send web-hooks notification
global _i
_i += 1
year = datetime.datetime.now().year
if isinstance(topic, six.text_type):
topic = to_bytes(topic, encoding='utf8', nonstring="passthru")
msg['pagure_instance'] = _config['APP_URL']
msg['project_fullname'] = project.fullname
msg = dict(
topic=topic.decode('utf-8'),
msg=msg,
timestamp=int(time.time()),
msg_id=str(year) + '-' + str(uuid.uuid4()),
i=_i,
)
content = json.dumps(msg)
hashhex = hmac.new(
str(project.hook_token), content, hashlib.sha1).hexdigest()
hashhex256 = hmac.new(
str(project.hook_token), content, hashlib.sha256).hexdigest()
headers = {
'X-Pagure': _config['APP_URL'],
'X-Pagure-project': project.fullname,
'X-Pagure-Signature': hashhex,
'X-Pagure-Signature-256': hashhex256,
'X-Pagure-Topic': topic,
'Content-Type': 'application/json',
}
for url in urls:
url = url.strip()
log.info('Calling url %s' % url)
try:
req = requests.post(
url,
headers=headers,
data=content,
timeout=60,
)
if not req:
log.info(
'An error occured while querying: %s - '
'Error code: %s' % (url, req.status_code))
except (requests.exceptions.RequestException, Exception) as err:
log.info(
'An error occured while querying: %s - Error: %s' % (
url, err))
@trollius.coroutine
def handle_messages():
host = _config.get('REDIS_HOST', '0.0.0.0')
port = _config.get('REDIS_PORT', 6379)
dbname = _config.get('REDIS_DB', 0)
connection = yield trollius.From(trollius_redis.Connection.create(
host=host, port=port, db=dbname))
# Create subscriber.
subscriber = yield trollius.From(connection.start_subscribe())
# Subscribe to channel.
yield trollius.From(subscriber.subscribe(['pagure.hook']))
# Inside a while loop, wait for incoming events.
while True:
reply = yield trollius.From(subscriber.next_published())
log.info(
'Received: %s on channel: %s',
repr(reply.value), reply.channel)
data = json.loads(reply.value)
username = None
if data['project'].startswith('forks'):
username, projectname = data['project'].split('/', 2)[1:]
else:
projectname = data['project']
namespace = None
if '/' in projectname:
namespace, projectname = projectname.split('/', 1)
log.info(
'Searching %s/%s/%s' % (username, namespace, projectname))
session = pagure.lib.create_session(_config['DB_URL'])
project = pagure.lib._get_project(
session=session, name=projectname, user=username,
namespace=namespace,
case=_config.get('CASE_SENSITIVE', False))
if not project:
log.info('No project found with these criteria')
session.close()
continue
urls = project.settings.get('Web-hooks')
session.close()
if not urls:
log.info('No URLs set: %s' % urls)
continue
urls = urls.split('\n')
log.info('Got the project, going to the webhooks')
call_web_hooks(project, data['topic'], data['msg'], urls)
def main():
server = None
try:
loop = trollius.get_event_loop()
tasks = [
trollius.async(handle_messages()),
]
loop.run_until_complete(trollius.wait(tasks))
loop.run_forever()
except KeyboardInterrupt:
pass
except trollius.ConnectionResetError:
pass
log.info("End Connection")
loop.close()
log.info("End")
if __name__ == '__main__':
log = logging.getLogger("")
formatter = logging.Formatter(
"%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s")
# setup console logging
log.setLevel(logging.DEBUG)
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
aslog = logging.getLogger("asyncio")
aslog.setLevel(logging.DEBUG)
ch.setFormatter(formatter)
log.addHandler(ch)
main()