Blame ev-server/pagure-stream-server.py

Pierre-Yves Chibon 1dd40f
#!/usr/bin/env python
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
"""
Pierre-Yves Chibon 0d7c61
 (c) 2015 - Copyright Red Hat Inc
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
 Authors:
Pierre-Yves Chibon 1dd40f
   Pierre-Yves Chibon <pingou@pingoured.fr>
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Streaming server for pagure's eventsource feature
Pierre-Yves Chibon 1dd40f
This server takes messages sent to redis and publishes them at the specified
Pierre-Yves Chibon 1dd40f
endpoint
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
To test, run this script and in another terminal
Pierre-Yves Chibon 1dd40f
nc localhost 8080
Pierre-Yves Chibon 1dd40f
  HELLO
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
  GET /test/issue/26?foo=bar HTTP/1.1
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
"""
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
import datetime
Pierre-Yves Chibon 1dd40f
import logging
Pierre-Yves Chibon 1dd40f
import os
Pierre-Yves Chibon 1dd40f
import urlparse
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
import trollius
Pierre-Yves Chibon 1dd40f
import trollius_redis
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# When no configuration file was selected through the environment, fall back
# to the system-wide one if it is present on disk.
# NOTE(review): presumably consumed by the `pagure` import that follows this
# block -- confirm before moving this code.
if 'PAGURE_CONFIG' not in os.environ:
    if os.path.exists('/etc/pagure/pagure.cfg'):
        print('Using configuration file `/etc/pagure/pagure.cfg`')
        os.environ['PAGURE_CONFIG'] = '/etc/pagure/pagure.cfg'
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
import pagure
Pierre-Yves Chibon 1dd40f
import pagure.lib
Pierre-Yves Chibon 116ab2
from pagure.exceptions import PagureEvException
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
# NOTE(review): module-level registry that nothing in the visible code reads
# or writes -- presumably meant to track connected clients; confirm whether
# it is still needed.
clients = {}
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 116ab2
def get_obj_from_path(path):
    """ Return the Ticket or Request object based on the path provided.

    The path is expected to have one of the following two shapes, any
    additional trailing element being ignored:

        /<repo>/<issue|pull-request>/<id>
        /fork/<username>/<repo>/<issue|pull-request>/<id>

    :arg path: the path component of the URL requested by the client.
    :return: the object returned by ``pagure.lib.search_issues`` or
        ``pagure.lib.search_pull_requests`` for the requested identifier.
    :raises PagureEvException: if the path is malformed, if the project,
        issue or pull-request cannot be found, if the corresponding tracker
        is disabled in the project settings, if the issue is private or if
        the kind of object requested is neither ``issue`` nor
        ``pull-request``.
    """
    # Skip the empty string found before the leading '/'.
    parts = path.split('/')[1:]

    username = None
    # Compare the whole first element so that a project whose name merely
    # starts with "fork" is not mistaken for a fork URL.
    if parts and parts[0] == 'fork':
        parts = parts[1:5]
        if len(parts) != 4 or not all(parts):
            raise PagureEvException("Invalid path provided: '%s'" % path)
        username, repo_name, obj, objid = parts
    else:
        parts = parts[:3]
        if len(parts) != 3 or not all(parts):
            raise PagureEvException("Invalid path provided: '%s'" % path)
        repo_name, obj, objid = parts

    # Keep the name apart from the project object so that the error message
    # below reports what was asked for rather than `None`.
    repo = pagure.lib.get_project(pagure.SESSION, repo_name, user=username)

    if repo is None:
        raise PagureEvException("Project '%s' not found" % repo_name)

    output = None
    if obj == 'issue':
        if not repo.settings.get('issue_tracker', True):
            raise PagureEvException("No issue tracker found for this project")

        output = pagure.lib.search_issues(
            pagure.SESSION, repo, issueid=objid)

        if output is None or output.project != repo:
            raise PagureEvException("Issue '%s' not found" % objid)

        if output.private:
            # TODO: find a way to do auth
            raise PagureEvException(
                "This issue is private and you are not allowed to view it")
    elif obj == 'pull-request':
        if not repo.settings.get('pull_requests', True):
            raise PagureEvException(
                "No pull-request tracker found for this project")

        output = pagure.lib.search_pull_requests(
            pagure.SESSION, project_id=repo.id, requestid=objid)

        if output is None or output.project != repo:
            raise PagureEvException("Pull-Request '%s' not found" % objid)

    else:
        raise PagureEvException("Invalid object provided: '%s'" % obj)

    return output
Pierre-Yves Chibon 116ab2
Pierre-Yves Chibon 116ab2
Pierre-Yves Chibon 1dd40f
@trollius.coroutine
def handle_client(client_reader, client_writer):
    """ Stream the events of one issue or pull-request to one client.

    Reads the first line sent by the client (expected to be an HTTP request
    line such as ``GET /test/issue/26?foo=bar HTTP/1.1``), resolves the
    requested path via ``get_obj_from_path``, subscribes to the redis
    channel named after the ``uid`` of that object and forwards every
    message published there to the client as a ``data: ...`` event.

    The client connection is closed when this coroutine ends, however it
    ends; so is the redis connection if one was opened.

    :arg client_reader: stream from which the client request is read.
    :arg client_writer: stream to which the response and events are written.
    """
    # Bound up-front so the `finally` clause can tell whether the redis
    # connection was ever established.
    connection = None
    try:
        # give client a chance to respond, timeout after 10 seconds
        try:
            data = yield trollius.From(trollius.wait_for(
                client_reader.readline(),
                timeout=10.0))
        except trollius.TimeoutError:
            log.warning("No request received from the client in time")
            return

        if data is None:
            log.warning("Expected ticket uid, received None")
            return

        try:
            data = data.decode().rstrip().split()
        except UnicodeDecodeError:
            log.warning("Could not decode the request received")
            return

        log.info("Received %s", data)
        # The URL is the second token of the request line, a lone word
        # (or an empty line) carries no URL at all.
        if len(data) < 2:
            log.warning("No URL provided: %s", data)
            return

        if '/' not in data[1]:
            log.warning("Invalid URL provided: %s", data[1])
            return

        url = urlparse.urlsplit(data[1])

        # NOTE(review): the headers are sent before the object is looked up,
        # so a failed lookup results in a 200 response whose stream is
        # closed right away.
        client_writer.write((
            "HTTP/1.0 200 OK\n"
            "Content-Type: text/event-stream\n"
            "Cache: nocache\n"
            "Connection: keep-alive\n"
            "Access-Control-Allow-Origin: *\n\n"
        ).encode())

        try:
            obj = get_obj_from_path(url.path)
        except PagureEvException as err:
            log.warning("%s", err)
            return

        connection = yield trollius.From(trollius_redis.Connection.create(
            host=pagure.APP.config['REDIS_HOST'],
            port=pagure.APP.config['REDIS_PORT'],
            db=pagure.APP.config['REDIS_DB']))

        # Create subscriber.
        subscriber = yield trollius.From(connection.start_subscribe())

        # Subscribe to channel.
        yield trollius.From(subscriber.subscribe([obj.uid]))

        # Inside a while loop, wait for incoming events.
        while True:
            reply = yield trollius.From(subscriber.next_published())
            log.info(reply)
            log.info("Sending %s", reply.value)
            client_writer.write(('data: %s\n\n' % reply.value).encode())
            yield trollius.From(client_writer.drain())

    except trollius.ConnectionResetError:
        # The client went away, there is nobody left to stream to.
        pass
    finally:
        # Whatever happens, close the connections.
        if connection is not None:
            connection.close()
        client_writer.close()
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
def main():
    """ Start the streaming server and serve clients until interrupted.

    The server listens on port 8080 of all the interfaces and hands every
    incoming connection to ``handle_client``. A ``KeyboardInterrupt`` or a
    ``trollius.ConnectionResetError`` ends the serving loop silently; any
    other exception propagates to the caller. In every case the server (if
    it was started) and the event loop are closed before returning.
    """
    loop = trollius.get_event_loop()
    # Bound up-front: an interruption during start-up must not turn into a
    # NameError when cleaning up.
    server = None
    try:
        coro = trollius.start_server(
            handle_client, host=None, port=8080, loop=loop)
        server = loop.run_until_complete(coro)
        print('Serving on {}'.format(server.sockets[0].getsockname()))
        loop.run_forever()
    except KeyboardInterrupt:
        pass
    except trollius.ConnectionResetError:
        pass
    finally:
        # Close the server
        if server is not None:
            server.close()
            log.info("End Connection")
            loop.run_until_complete(server.wait_closed())
        loop.close()
        log.info("End")
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
if __name__ == '__main__':
    # When run as a script, log through the root logger at DEBUG level.
    log = logging.getLogger("")
    log.setLevel(logging.DEBUG)

    logging.getLogger("asyncio").setLevel(logging.DEBUG)

    # setup console logging
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter(
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"))
    log.addHandler(console)

    main()