Blame pagure-ev/pagure_stream_server.py

Pierre-Yves Chibon 1dd40f
#!/usr/bin/env python
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
"""
Abhijeet Kasurde a8d2ec
 (c) 2015-2017 - Copyright Red Hat Inc
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
 Authors:
Pierre-Yves Chibon 1dd40f
   Pierre-Yves Chibon <pingou@pingoured.fr></pingou@pingoured.fr>
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Streaming server for pagure's eventsource feature
Pierre-Yves Chibon 1dd40f
This server takes messages sent to redis and publish them at the specified
Pierre-Yves Chibon 1dd40f
endpoint
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
To test, run this script and in another terminal
Pierre-Yves Chibon 1dd40f
nc localhost 8080
Pierre-Yves Chibon 1dd40f
  HELLO
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
  GET /test/issue/26?foo=bar HTTP/1.1
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
"""
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 67d1cc
from __future__ import unicode_literals, absolute_import
Aurélien Bompard dcf6f6
Pierre-Yves Chibon 1dd40f
import logging
Pierre-Yves Chibon 1dd40f
import os
Aurélien Bompard 831553
Pierre-Yves Chibon 1dd40f
Patrick Uiterwijk c998ef
import redis
Julen Landa Alustiza 47a9ab
import trololio
Pierre-Yves Chibon 1dd40f
Aurélien Bompard 831553
from six.moves.urllib.parse import urlparse
Aurélien Bompard 831553
Pierre-Yves Chibon 1dd40f
log = logging.getLogger(__name__)
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 73d120
if "PAGURE_CONFIG" not in os.environ and os.path.exists(
Pierre-Yves Chibon 73d120
    "/etc/pagure/pagure.cfg"
Pierre-Yves Chibon 73d120
):
Pierre-Yves Chibon 73d120
    print("Using configuration file `/etc/pagure/pagure.cfg`")
Pierre-Yves Chibon 73d120
    os.environ["PAGURE_CONFIG"] = "/etc/pagure/pagure.cfg"
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon de6f12
import pagure  # noqa: E402
Pierre-Yves Chibon cf98be
import pagure.lib.model_base  # noqa: E402
Pierre-Yves Chibon 930073
import pagure.lib.query  # noqa: E402
Pierre-Yves Chibon 0c5199
from pagure.exceptions import PagureException, PagureEvException  # noqa: E402
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 39bf6b
SERVER = None
Pierre-Yves Chibon b130e5
SESSION = None
Patrick Uiterwijk c998ef
POOL = redis.ConnectionPool(
Pierre-Yves Chibon 73d120
    host=pagure.config.config["REDIS_HOST"],
Pierre-Yves Chibon 73d120
    port=pagure.config.config["REDIS_PORT"],
Pierre-Yves Chibon 73d120
    db=pagure.config.config["REDIS_DB"],
Pierre-Yves Chibon 73d120
)
Pierre-Yves Chibon b130e5
Pierre-Yves Chibon b130e5
Pierre-Yves Chibon b130e5
def _get_session():
Pierre-Yves Chibon b130e5
    global SESSION
Pierre-Yves Chibon b130e5
    if SESSION is None:
Pierre-Yves Chibon 73d120
        print(pagure.config.config["DB_URL"])
Pierre-Yves Chibon cf98be
        SESSION = pagure.lib.model_base.create_session(
Pierre-Yves Chibon 73d120
            pagure.config.config["DB_URL"]
Pierre-Yves Chibon 73d120
        )
Pierre-Yves Chibon b130e5
Pierre-Yves Chibon b130e5
    return SESSION
Pierre-Yves Chibon 1dd40f
Adam Williamson 01c1a4
Adam Williamson 01c1a4
def _get_issue(repo, objid):
Adam Williamson 01c1a4
    """Get a Ticket (issue) instance for a given repo (Project) and
Adam Williamson 01c1a4
    objid (issue number).
Pierre-Yves Chibon 116ab2
    """
Adam Williamson 01c1a4
    issue = None
Pierre-Yves Chibon 73d120
    if not repo.settings.get("issue_tracker", True):
Adam Williamson 01c1a4
        raise PagureEvException("No issue tracker found for this project")
Pierre-Yves Chibon 116ab2
Pierre-Yves Chibon b130e5
    session = _get_session()
Pierre-Yves Chibon 930073
    issue = pagure.lib.query.search_issues(session, repo, issueid=objid)
Pierre-Yves Chibon 116ab2
Adam Williamson 01c1a4
    if issue is None or issue.project != repo:
Adam Williamson 01c1a4
        raise PagureEvException("Issue '%s' not found" % objid)
Pierre-Yves Chibon 116ab2
Adam Williamson 01c1a4
    if issue.private:
Adam Williamson 01c1a4
        # TODO: find a way to do auth
Adam Williamson 01c1a4
        raise PagureEvException(
Pierre-Yves Chibon 73d120
            "This issue is private and you are not allowed to view it"
Pierre-Yves Chibon 73d120
        )
Pierre-Yves Chibon 116ab2
Adam Williamson 01c1a4
    return issue
Pierre-Yves Chibon 116ab2
Pierre-Yves Chibon 116ab2
Adam Williamson 01c1a4
def _get_pull_request(repo, objid):
Adam Williamson 01c1a4
    """Get a PullRequest instance for a given repo (Project) and objid
Adam Williamson 01c1a4
    (request number).
Adam Williamson 01c1a4
    """
Pierre-Yves Chibon 73d120
    if not repo.settings.get("pull_requests", True):
Adam Williamson 01c1a4
        raise PagureEvException(
Pierre-Yves Chibon 73d120
            "No pull-request tracker found for this project"
Pierre-Yves Chibon 73d120
        )
Adam Williamson 01c1a4
Pierre-Yves Chibon b130e5
    session = _get_session()
Pierre-Yves Chibon 930073
    request = pagure.lib.query.search_pull_requests(
Pierre-Yves Chibon 73d120
        session, project_id=repo.id, requestid=objid
Pierre-Yves Chibon 73d120
    )
Adam Williamson 01c1a4
Adam Williamson 01c1a4
    if request is None or request.project != repo:
Adam Williamson 01c1a4
        raise PagureEvException("Pull-Request '%s' not found" % objid)
Adam Williamson 01c1a4
Adam Williamson 01c1a4
    return request
Adam Williamson 01c1a4
Adam Williamson 01c1a4
Adam Williamson 01c1a4
# Dict representing known object types that we handle requests for,
Adam Williamson 01c1a4
# and the bound functions for getting an object instance from the
Adam Williamson 01c1a4
# parsed path data. Has to come after the functions it binds
Pierre-Yves Chibon 73d120
OBJECTS = {"issue": _get_issue, "pull-request": _get_pull_request}
Adam Williamson 01c1a4
Adam Williamson 01c1a4
Adam Williamson 01c1a4
def get_obj_from_path(path):
Adam Williamson 01c1a4
    """ Return the Ticket or Request object based on the path provided.
Adam Williamson 01c1a4
    """
Pierre-Yves Chibon c13fca
    (username, namespace, reponame, objtype, objid) = pagure.utils.parse_path(
Pierre-Yves Chibon 73d120
        path
Pierre-Yves Chibon 73d120
    )
Pierre-Yves Chibon b130e5
    session = _get_session()
Pierre-Yves Chibon 930073
    repo = pagure.lib.query.get_authorized_project(
Pierre-Yves Chibon 73d120
        session, reponame, user=username, namespace=namespace
Pierre-Yves Chibon 73d120
    )
Farhaan Bukhsh baa162
Adam Williamson 01c1a4
    if repo is None:
Adam Williamson 01c1a4
        raise PagureEvException("Project '%s' not found" % reponame)
Pierre-Yves Chibon 116ab2
Adam Williamson 01c1a4
    # find the appropriate object getter function from OBJECTS
Adam Williamson 01c1a4
    try:
Adam Williamson 01c1a4
        getfunc = OBJECTS[objtype]
Adam Williamson 01c1a4
    except KeyError:
Adam Williamson 01c1a4
        raise PagureEvException("Invalid object provided: '%s'" % objtype)
Pierre-Yves Chibon e07ca8
Adam Williamson 01c1a4
    return getfunc(repo, objid)
Pierre-Yves Chibon 116ab2
Pierre-Yves Chibon 116ab2
Neal Gompa 4b2e7f
@trololio.coroutine
Pierre-Yves Chibon 1dd40f
def handle_client(client_reader, client_writer):
Pierre-Yves Chibon ac3518
    data = None
Pierre-Yves Chibon ac3518
    while True:
Pierre-Yves Chibon ac3518
        # give client a chance to respond, timeout after 10 seconds
Pierre-Yves Chibon 73d120
        line = yield trololio.From(
Pierre-Yves Chibon 73d120
            trololio.asyncio.wait_for(client_reader.readline(), timeout=10.0)
Pierre-Yves Chibon 73d120
        )
Pierre-Yves Chibon ac3518
        if not line.decode().strip():
Pierre-Yves Chibon ac3518
            break
Pierre-Yves Chibon ac3518
        line = line.decode().rstrip()
Pierre-Yves Chibon ac3518
        if data is None:
Pierre-Yves Chibon ac3518
            data = line
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
    if data is None:
Pierre-Yves Chibon 1dd40f
        log.warning("Expected ticket uid, received None")
Pierre-Yves Chibon 1dd40f
        return
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
    data = data.decode().rstrip().split()
Pierre-Yves Chibon 1dd40f
    log.info("Received %s", data)
Pierre-Yves Chibon 1dd40f
    if not data:
Pierre-Yves Chibon 1dd40f
        log.warning("No URL provided: %s" % data)
Pierre-Yves Chibon 1dd40f
        return
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 73d120
    if "/" not in data[1]:
Pierre-Yves Chibon 1dd40f
        log.warning("Invalid URL provided: %s" % data[1])
Pierre-Yves Chibon 1dd40f
        return
Pierre-Yves Chibon 1dd40f
Aurélien Bompard 831553
    url = urlparse(data[1])
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 3d3084
    try:
Pierre-Yves Chibon 3d3084
        obj = get_obj_from_path(url.path)
Pierre-Yves Chibon c13fca
    except PagureException as err:
Pierre-Yves Chibon 3d3084
        log.warning(err.message)
Pierre-Yves Chibon 3d3084
        return
Pierre-Yves Chibon 3d3084
Pierre-Yves Chibon 73d120
    origin = pagure.config.config.get("APP_URL")
Pierre-Yves Chibon 73d120
    if origin.endswith("/"):
Pierre-Yves Chibon 734609
        origin = origin[:-1]
Pierre-Yves Chibon 734609
Pierre-Yves Chibon 73d120
    client_writer.write(
Pierre-Yves Chibon 73d120
        (
Pierre-Yves Chibon 73d120
            "HTTP/1.0 200 OK\n"
Pierre-Yves Chibon 73d120
            "Content-Type: text/event-stream\n"
Pierre-Yves Chibon 73d120
            "Cache: nocache\n"
Pierre-Yves Chibon 73d120
            "Connection: keep-alive\n"
Pierre-Yves Chibon 73d120
            "Access-Control-Allow-Origin: %s\n\n" % origin
Pierre-Yves Chibon 73d120
        ).encode()
Pierre-Yves Chibon 73d120
    )
Pierre-Yves Chibon 1f4ebf
Patrick Uiterwijk c998ef
    conn = redis.Redis(connection_pool=POOL)
Patrick Uiterwijk c998ef
    subscriber = conn.pubsub(ignore_subscribe_messages=True)
Pierre-Yves Chibon 1dd40f
Patrick Uiterwijk c998ef
    try:
Pierre-Yves Chibon 73d120
        subscriber.subscribe("pagure.%s" % obj.uid)
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
        # Inside a while loop, wait for incoming events.
Patrick Uiterwijk c998ef
        oncall = 0
Pierre-Yves Chibon 1dd40f
        while True:
Patrick Uiterwijk c998ef
            msg = subscriber.get_message()
Patrick Uiterwijk c998ef
            if msg is None:
Patrick Uiterwijk c998ef
                # Send a ping to see if the client is still alive
Patrick Uiterwijk c998ef
                if oncall >= 5:
Patrick Uiterwijk c998ef
                    # Only send a ping once every 5 seconds
Pierre-Yves Chibon 73d120
                    client_writer.write(("event: ping\n\n").encode())
Patrick Uiterwijk c998ef
                    oncall = 0
Patrick Uiterwijk c998ef
                oncall += 1
Neal Gompa 4b2e7f
                yield trololio.From(client_writer.drain())
Julen Landa Alustiza 47a9ab
                yield trololio.From(trololio.asyncio.sleep(1))
Patrick Uiterwijk c998ef
            else:
Pierre-Yves Chibon 73d120
                log.info("Sending %s", msg["data"])
Pierre-Yves Chibon 73d120
                client_writer.write(("data: %s\n\n" % msg["data"]).encode())
Neal Gompa 4b2e7f
                yield trololio.From(client_writer.drain())
Patrick Uiterwijk c998ef
Patrick Uiterwijk c998ef
    except OSError:
Patrick Uiterwijk c998ef
        log.info("Client closed connection")
Neal Gompa 4b2e7f
    except trololio.ConnectionResetError as err:
Pierre-Yves Chibon e89a19
        log.exception("ERROR: ConnectionResetError in handle_client")
Pierre-Yves Chibon 1f4ebf
    except Exception as err:
Pierre-Yves Chibon e89a19
        log.exception("ERROR: Exception in handle_client")
Patrick Uiterwijk c998ef
        log.info(type(err))
Pierre-Yves Chibon 1dd40f
    finally:
Pierre-Yves Chibon 1dd40f
        # Wathever happens, close the connection.
Patrick Uiterwijk c998ef
        log.info("Client left. Goodbye!")
Patrick Uiterwijk c998ef
        subscriber.close()
Pierre-Yves Chibon 1dd40f
        client_writer.close()
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Neal Gompa 4b2e7f
@trololio.coroutine
Pierre-Yves Chibon 39bf6b
def stats(client_reader, client_writer):
Pierre-Yves Chibon 39bf6b
Pierre-Yves Chibon 39bf6b
    try:
Pierre-Yves Chibon 73d120
        log.info("Clients: %s", SERVER.active_count)
Pierre-Yves Chibon 73d120
        client_writer.write(
Pierre-Yves Chibon 73d120
            ("HTTP/1.0 200 OK\n" "Cache: nocache\n\n").encode()
Pierre-Yves Chibon 73d120
        )
Pierre-Yves Chibon 73d120
        client_writer.write(("data: %s\n\n" % SERVER.active_count).encode())
Neal Gompa 4b2e7f
        yield trololio.From(client_writer.drain())
Pierre-Yves Chibon 39bf6b
Neal Gompa 4b2e7f
    except trololio.ConnectionResetError as err:
Pierre-Yves Chibon af9aab
        log.info(err)
Pierre-Yves Chibon 39bf6b
    finally:
Pierre-Yves Chibon 39bf6b
        client_writer.close()
Pierre-Yves Chibon 39bf6b
    return
Pierre-Yves Chibon 39bf6b
Pierre-Yves Chibon 39bf6b
Pierre-Yves Chibon 1dd40f
def main():
Pierre-Yves Chibon 39bf6b
    global SERVER
Pierre-Yves Chibon b130e5
    _get_session()
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
    try:
Julen Landa Alustiza 47a9ab
        loop = trololio.asyncio.get_event_loop()
Julen Landa Alustiza 47a9ab
        coro = trololio.asyncio.start_server(
Pierre-Yves Chibon 8d3302
            handle_client,
Pierre-Yves Chibon 8d3302
            host=None,
Pierre-Yves Chibon 73d120
            port=pagure.config.config["EVENTSOURCE_PORT"],
Pierre-Yves Chibon 73d120
            loop=loop,
Pierre-Yves Chibon 73d120
        )
Pierre-Yves Chibon 39bf6b
        SERVER = loop.run_until_complete(coro)
Abhijeet Kasurde a8d2ec
        log.info(
Pierre-Yves Chibon 73d120
            "Serving server at {}".format(SERVER.sockets[0].getsockname())
Pierre-Yves Chibon 73d120
        )
Pierre-Yves Chibon 73d120
        if pagure.config.config.get("EV_STATS_PORT"):
Julen Landa Alustiza 47a9ab
            stats_coro = trololio.asyncio.start_server(
Pierre-Yves Chibon 39bf6b
                stats,
Pierre-Yves Chibon 39bf6b
                host=None,
Pierre-Yves Chibon 73d120
                port=pagure.config.config.get("EV_STATS_PORT"),
Pierre-Yves Chibon 73d120
                loop=loop,
Pierre-Yves Chibon 73d120
            )
Pierre-Yves Chibon 39bf6b
            stats_server = loop.run_until_complete(stats_coro)
Pierre-Yves Chibon 73d120
            log.info(
Pierre-Yves Chibon 73d120
                "Serving stats  at {}".format(
Pierre-Yves Chibon 73d120
                    stats_server.sockets[0].getsockname()
Pierre-Yves Chibon 73d120
                )
Pierre-Yves Chibon 73d120
            )
Pierre-Yves Chibon 1dd40f
        loop.run_forever()
Pierre-Yves Chibon 1dd40f
    except KeyboardInterrupt:
Pierre-Yves Chibon 1dd40f
        pass
Neal Gompa 4b2e7f
    except trololio.ConnectionResetError as err:
Pierre-Yves Chibon e89a19
        log.exception("ERROR: ConnectionResetError in main")
Abhijeet Kasurde a8d2ec
    except Exception:
Pierre-Yves Chibon e89a19
        log.exception("ERROR: Exception in main")
Pierre-Yves Chibon 069455
    finally:
Pierre-Yves Chibon 069455
        # Close the server
Pierre-Yves Chibon 069455
        SERVER.close()
Pierre-Yves Chibon 73d120
        if pagure.config.config.get("EV_STATS_PORT"):
Pierre-Yves Chibon 069455
            stats_server.close()
Pierre-Yves Chibon 069455
        log.info("End Connection")
Pierre-Yves Chibon 069455
        loop.run_until_complete(SERVER.wait_closed())
Pierre-Yves Chibon 069455
        loop.close()
Pierre-Yves Chibon 069455
        log.info("End")
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 73d120
if __name__ == "__main__":
Pierre-Yves Chibon 1dd40f
    log = logging.getLogger("")
Pierre-Yves Chibon 1dd40f
    formatter = logging.Formatter(
Pierre-Yves Chibon 73d120
        "%(asctime)s %(levelname)s [%(module)s:%(lineno)d] %(message)s"
Pierre-Yves Chibon 73d120
    )
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
    # setup console logging
Pierre-Yves Chibon 1dd40f
    log.setLevel(logging.DEBUG)
Pierre-Yves Chibon 1dd40f
    ch = logging.StreamHandler()
Pierre-Yves Chibon 1dd40f
    ch.setLevel(logging.DEBUG)
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
    aslog = logging.getLogger("asyncio")
Pierre-Yves Chibon 1dd40f
    aslog.setLevel(logging.DEBUG)
Pierre-Yves Chibon 1dd40f
Pierre-Yves Chibon 1dd40f
    ch.setFormatter(formatter)
Pierre-Yves Chibon 1dd40f
    log.addHandler(ch)
Pierre-Yves Chibon 1dd40f
    main()