collector.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 08 May 2011 22:14:10 -0400
changeset 1 48d514cc3309
parent 0 57d81f2bf26f
child 2 2744eb2a589b
permissions -rw-r--r--
Some rewrite and debugging. Issue: worker is accumulating past jobs.
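
"""Fan-out/fan-in collector pattern over ZeroMQ (Python 2, pyzmq).

Requesters send jobs to a broker collector, which fans each job out to
every backend that has announced READY (proxies in the middle, workers
at the leaves) and relays the replies back until all recipients report
READY again or the request's timeout expires.
"""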

import random
import time
import threading
import zmq
import logging
from logging import debug, info
from itertools import count

READY = "READY"
context = zmq.Context()
counter = count()

def collector(name, frontend, backend):
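    """Fan each request out to all READY backends and relay the replies.

    `frontend` carries requests in and replies out; `backend` is the
    socket on which backends announce themselves with a READY message.
    Replies are collected until every recipient has sent READY again or
    the request's timeout (in milliseconds) expires.
    """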
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    backends = set()
    info("collector %s is ready with %r backends", name, len(backends))
    while True:
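        # Re-register the frontend on every pass: it is unregistered
        # below while a request's replies are being collected.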
        poller.register(frontend, zmq.POLLIN)
        for socket, event in poller.poll(100):
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)
            if socket is backend:
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    debug("collector %s discard reply %r", name, request) 
            else:
                delim = request.index("")
                address_stack = request[:delim+1]
                timeout = request[delim+1]
                if timeout != "STOP":
                    debug("collector %s has new work to do in %s ms", name, timeout)
                recipients = backends
                backends = set()
                debug("collector %s send requests to %r", name, recipients)
                for dest in recipients:
                    backend.send_multipart([dest] + request[delim:])
                if timeout == "STOP":
                    info("collector %s is terminating", name)
                    return
                timeout = int(timeout)
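                # Ignore new frontend traffic until this request's
                # replies have been collected.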
                poller.unregister(frontend)
                while recipients:
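                    # Note: the timeout applies to each poll() call, so
                    # the total wait can exceed `timeout` ms.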
                    events = poller.poll(timeout)
                    if not events:
                        info("collector %s has a timeout with %r", name, recipients)
                        break
                    for socket, event in events:
                        reply = socket.recv_multipart()
                        if reply[2] == READY:
                            debug("%r is ready on %s", reply[0], name)
                            backends.add(reply[0])
                            recipients.discard(reply[0])
                        elif reply[0] in recipients:
                            debug("collector %s forward reply", name)
                            frontend.send_multipart(address_stack + reply[2:])
                        else:
                            debug("collector %s discard reply %r", name, reply)
                frontend.send_multipart(address_stack + [READY])
                info("collector %s is ready with %r backends", name, len(backends))


def broker_collector(frontend_url, backend_url):
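    """Bind both ends: requesters connect to the frontend, proxies or
    workers to the backend.  XREP/XREQ are the old pyzmq names for
    ROUTER/DEALER.
    """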
    frontend = context.socket(zmq.XREP)
    frontend.setsockopt(zmq.IDENTITY, "broker")
    backend = context.socket(zmq.XREP)
    info("Binding broker frontend to %s", frontend_url)
    frontend.bind(frontend_url)
    info("Binding broker backend to %s", backend_url)
    backend.bind(backend_url)
    collector("broker", frontend, backend)

def proxy_collector(frontend_url, backend_url):
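    """Connect upstream to a broker backend; bind a local backend for workers."""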
    frontend = context.socket(zmq.XREQ)
    frontend.setsockopt(zmq.IDENTITY, "proxy")
    backend = context.socket(zmq.XREP)
    info("Connecting proxy frontend to %s", frontend_url)
    frontend.connect(frontend_url)
    info("Binding proxy backend to %s", backend_url)
    backend.bind(backend_url)
    # Announce this proxy's presence upstream so the broker adds it to
    # its READY set.
    frontend.send_multipart(["", READY])
    collector("proxy", frontend, backend)

def worker(socket, workload, failure_rate=0):
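    """Serve requests, sleeping `workload` seconds per job before replying DONE.

    Returns True on a clean STOP and False on a simulated failure, so
    the caller knows whether to reconnect.
    """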
    while True:
        info("Worker is ready")
        socket.send_multipart(["",READY])
        request = socket.recv_multipart()
        info("Worker receive request %r", request)
        delim = request.index("")
        address = request[:delim+1]
        timeout = request[delim+1]
        if timeout == "STOP":
            info("worker is terminating")
            return True
        request = request[delim+2:]
        assert request[0] == "REQUEST"
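        # Simulate a crash roughly once every `failure_rate` requests.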
        if failure_rate and random.randrange(failure_rate) == 0:
            info("worker failed")
            return False
        time.sleep(workload)
        info("worker send reply")
        socket.send_multipart(address + [request[1], "DONE"])

def connect_worker(url, workload, failure_rate=0):
    """Run worker() on a fresh socket, reconnecting after a simulated failure."""
    while True:
        socket = context.socket(zmq.XREQ)
        info("Connecting worker to %s", url)
        socket.connect(url)
        if worker(socket, workload, failure_rate):
            return
        # Simulated failure: close the dead socket before reconnecting.
        socket.close()

stop = False

def requester(socket, timeout=-1):
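    """Send numbered requests until `stop` is set, then send STOP.

    After each request, replies are drained until the collector's
    trailing READY marker comes back.
    """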
    while not stop:
        i = str(counter.next())
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["",READY]:
                break
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
        # time.sleep(1)
    info("requester is terminating")
    socket.send_multipart(["", "STOP"])

def connect_requester(url, timeout):
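    """Open a socket to the broker frontend and run requester() on it."""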
    socket = context.socket(zmq.XREQ)
    info("Connecting requester to %s", url)
    socket.connect(url)
    requester(socket, timeout)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"
    brokers = []
    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
    broker.start()
    brokers.append(broker)
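    # inproc endpoints must be bound before peers connect, so give the
    # broker thread time to bind.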
    time.sleep(2)
    senders = []
    for _ in xrange(5):
        sender = threading.Thread(target = connect_requester, args = (feurl, 5000))
        sender.start()
        senders.append(sender)
    proxies = []
    proxy_urls = []
    for n in xrange(1):
        url = "inproc://proxy_be#%d" % (n,)
        proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
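    # Likewise, let the proxies bind their backends before the workers
    # connect.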
    time.sleep(2)
    workers = []
    for url in proxy_urls:
        for _ in xrange(1):
            work = threading.Thread(target = connect_worker, args = (url, 1, 0))
            work.start()
            workers.append(work)
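    # Let the system run for a while, then ask the requesters to stop
    # (this block runs at module scope, so the assignment rebinds the
    # module-level `stop` that the requester threads read).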
    time.sleep(10)
    stop = True
    info("Joining thread")
    for thread in senders + brokers + proxies + workers:
        thread.join()