# collector.py
# author: Fabien Ninoles <fabien@tzone.org>
# date:   Sun, 08 May 2011 22:58:56 -0400
# changeset 4 (4b5a51cb5fc7), parent 3 (197572da88ea), child 5 (55f26e7ee45e)
# permissions: -rw-r--r--
#
# Add failure and near timeout workload.
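#
# Overview (inferred from the code below, not part of the changeset message):
# requester threads send work to a broker; the broker scatters each request to
# every proxy that has announced itself READY; each proxy in turn scatters to
# its READY workers; replies are gathered back up the tree until all recipients
# have answered or the per-request timeout expires.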

import random
import time
import threading
import zmq
import logging
from logging import debug, info
from itertools import count

READY = "READY"
context = zmq.Context()
counter = count()

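# Note: the decorator below appears intended to let the broker/proxy/worker/
# requester threads shut down quietly once context.term() is called in
# __main__ -- blocking zmq calls then raise zmq.ZMQError (ETERM), which is
# caught and logged instead of killing the thread with a traceback.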
def checkzmqerror(func):
    """Log and swallow ZMQError so the wrapped thread target exits cleanly."""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except zmq.ZMQError as err:
            info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
    return wrapper

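# collector() implements the scatter-gather step shared by the broker and the
# proxies. The frame layouts below are inferred from the send/recv calls in
# this file:
#
#   backend  -> collector: [backend_id, "", "READY"]               peer announces itself
#                          [backend_id, "", request_id, "DONE"]    reply to a request
#   frontend -> collector: [addr..., "", timeout_ms, "REQUEST", request_id]
#
# Each request is fanned out to every currently READY backend; replies are
# forwarded upstream until all recipients have answered or timeout_ms elapses,
# after which [addr..., "", "READY"] is sent as an end-of-results marker. For a
# proxy, that same marker also re-announces it to the broker as available.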
def collector(name, frontend, backend):
    backends = set()
    debug("collector %s is ready with %r backends", name, len(backends))
    dropped = 0
    while True:
        poller = zmq.Poller()
        poller.register(backend, zmq.POLLIN)
        poller.register(frontend, zmq.POLLIN)
        for socket, event in poller.poll(100):
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)
            if socket is backend:
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    debug("collector %s discard reply %r", name, request) 
            else:
                delim = request.index("")
                address_stack = request[:delim+1]
                timeout = request[delim+1]
                debug("collector %s has new work to do in %s ms", name, timeout)
                recipients = backends
                backends = set()
                debug("collector %s send requests to %r", name, recipients)
                for dest in recipients:
                    backend.send_multipart([dest] + request[delim:])
                timeout = int(timeout)
                poller = zmq.Poller()
                poller.register(backend, zmq.POLLIN)
                while recipients:
                    start = time.time()
                    debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
                    events = poller.poll(timeout)
                    if not events:
                        end = time.time()
                        if (end-start)*1000 < timeout:
                            info("no event but timeout: %r", events)
                        else:
                            dropped += 1
                            debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
                            break
                    for socket, event in events:
                        reply = socket.recv_multipart()
                        if reply[2] == READY:
                            debug("%r is ready on %s", reply[0], name)
                            backends.add(reply[0])
                            recipients.discard(reply[0])
                        elif reply[0] in recipients:
                            debug("collector %s forward reply", name)
                            frontend.send_multipart(address_stack + reply[2:])
                        else:
                            debug("collector %s discard reply %r", name, reply)
                frontend.send_multipart(address_stack + [READY])
                debug("collector %s is ready with %r backends", name, len(backends))


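# broker_collector: the broker binds both sides -- requesters connect to
# frontend_url, proxies connect to backend_url. Setting IDENTITY on the
# frontend to backend_url looks informational only; nothing in this file
# appears to route on that identity.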
@checkzmqerror
def broker_collector(frontend_url, backend_url):
    frontend = context.socket(zmq.XREP)
    frontend.setsockopt(zmq.IDENTITY, backend_url)
    backend = context.socket(zmq.XREP)
    info("Binding broker frontend to %s", frontend_url)
    frontend.bind(frontend_url)
    info("Binding broker backend to %s", backend_url)
    backend.bind(backend_url)
    collector("broker", frontend, backend)

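# proxy_collector: the proxy connects its frontend (XREQ) to the broker's
# backend and binds its own backend for workers. The initial ["", READY] sent
# below is how the proxy first announces itself to the broker; afterwards the
# READY end-of-results marker emitted by collector() keeps re-announcing it.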
@checkzmqerror
def proxy_collector(frontend_url, backend_url):
    frontend = context.socket(zmq.XREQ)
    frontend.setsockopt(zmq.IDENTITY, backend_url)
    backend = context.socket(zmq.XREP)
    info("Connecting proxy frontend to %s", frontend_url)
    frontend.connect(frontend_url)
    info("Binding proxy backend to %s", backend_url)
    backend.bind(backend_url)
    # Sending presence to frontend.
    frontend.send_multipart(["", READY])
    collector("proxy", frontend, backend)

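# worker: announces itself with ["", READY], then serves requests of the form
# [addr..., "", timeout_ms, "REQUEST", request_id]. The timeout frame is read
# but not used here. Returning False simulates a failure; connect_worker then
# reconnects with a fresh socket.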
def worker(socket, workload, failure_rate = 0):
    while True:
        debug("Worker is ready")
        socket.send_multipart(["",READY])
        request = socket.recv_multipart()
        debug("Worker receive request %r", request)
        delim = request.index("")
        address = request[:delim+1]
        timeout = request[delim+1]
        request = request[delim+2:]
        assert request[0] == "REQUEST"
        if failure_rate and random.randrange(failure_rate) == 0:
            info("worker failed")
            return False
        time.sleep(workload)
        debug("worker send reply")
        socket.send_multipart(address + [request[1], "DONE"])

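# connect_worker: recreates the socket and re-registers (via worker's READY)
# whenever worker() returns after a simulated failure. Note that the previous
# socket is not closed here.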
@checkzmqerror
def connect_worker(url, workload, failure_rate = 0):
    while True:
        socket = context.socket(zmq.XREQ)
        info("Connecting worker to %s", url)
        socket.connect(url)
        worker(socket, workload, failure_rate)

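# requester: sends ["", timeout_ms, "REQUEST", request_id] and then keeps
# reading replies until the ["", READY] end-of-results marker comes back;
# every reply in between is expected to carry the same request_id.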
def requester(socket, timeout = -1):
    while True:
        i = str(counter.next())
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["",READY]:
                break
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
        # time.sleep(1)

@checkzmqerror
def connect_requester(url, timeout):
    socket = context.socket(zmq.XREQ)
    info("Connecting requester to %s", url)
    socket.connect(url)
    requester(socket, timeout)

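# Ad-hoc in-process smoke test: 1 broker, 10 requesters (5000 ms timeout),
# 5 proxies, and 5 workers per proxy (1 s workload, roughly 1-in-4500 simulated
# failure rate per request). After about 20 s the context is terminated, which
# unblocks the threads so they can exit through checkzmqerror.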
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"
    brokers = []
    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
    broker.start()
    brokers.append(broker)
    time.sleep(2)
    senders = []
    for i in xrange(10):
        sender = threading.Thread(target = connect_requester, args = (feurl, 5000))
        sender.start()
        senders.append(sender)
    proxies = []
    proxy_urls = []
    for i in xrange(5):
        url = "inproc://proxy_be#%d" % (i,)
        proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
    time.sleep(2)
    workers = []
    for url in proxy_urls:
        for i in xrange(5):
            work = threading.Thread(target = connect_worker, args = (url, 1, 4500))
            work.start()
            workers.append(work)
    time.sleep(20)
    info("Joining thread")
    context.term()
    for thread in senders + brokers + proxies + workers:
        thread.join()