# HG changeset patch # User fabien@tzone.org # Date 1304898030 14400 # Node ID 57d81f2bf26f8f635a9ad1e47c1e17554203e88f Add collector. diff -r 000000000000 -r 57d81f2bf26f collector.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/collector.py Sun May 08 19:40:30 2011 -0400 @@ -0,0 +1,123 @@ +import random +import time +import threading +import zmq + +stop = False +READY = "READY" +context = zmq.Context() + +def collector(frontend, backend): + poller = zmq.Poller() + poller.register(backend, zmq.POLLIN) + backends = set() + while not stop: + frontend.send_multipart(["",READY]) + poller.register(frontend, zmq.POLLIN) + for socket, event in poller.poll(100): + request = socket.recv_multipart() + if socket is backend: + if request[2] == READY: + backends.add(request[0]) + else: + timeout = int(request[request.index("")+1]) + recipients = backends + backends = set() + for dest in recipients: + backend.send_multipart([dest] + request) + poller.unregister(frontend) + try: + while not stop and recipients: + for socket, event in poller.poll(timeout): + reply = socket.recv_multipart() + if reply[2] == READY: + backends.add(reply[0]) + recipients.discard(reply[0]) + else: + frontend.send_multipart(reply[2:]) + except ZMQError: + pass + +def broker_collector(frontend_url, backend_url): + frontend = context.socket(zmq.XREP) + backend = context.socket(zmq.XREP) + frontend.bind(frontend_url) + backend.bind(backend_url) + collector(frontend, backend) + +def proxy_collector(frontend_url, backend_url): + frontend = context.socket(zmq.XREP) + backend = context.socket(zmq.XREP) + frontend.connect(frontend_url) + backend.bind(backend_url) + collector(frontend, backend) + +def worker(socket, workload, failure_rate = 0): + while not stop: + socket.send_multipart(["",READY]) + request = socket.recv_multipart() + delim = request.index("") + timeout = request[delim+1] + request = request[delim+2:] + assert request[0] == "REQUEST" + if failure_rate and random.randrange(failure_rate) == 0: + return + time.sleep(workload) + socket.send_multipart(request[:delim+1] + ["DONE"]) + +def connect_worker(url, workload, failure_rate = 0): + while not stop: + socket = context.socket(zmq.XREQ) + socket.connect(url) + worker(socket, workload, failure_rate) + +def requester(socket, timeout = -1): + while not stop: + socket.send_multipart(["", str(timeout), "REQUEST"]) + results = 0 + while True: + reply = socket.recv_multipart() + if reply == ["",READY]: + break + results += 1 + print results, "results received" + socket.send_multipart(["", "STOP"]) + +def connect_requester(url, timeout): + socket = context.socket(zmq.XREQ) + socket.connect(url) + requester(socket, timeout) + +if __name__ == "__main__": + feurl = "inproc://frontend" + beurl = "inproc://backend" + brokers = [] + broker = threading.Thread(target = broker_collector, args = (feurl, beurl)) + broker.start() + brokers.append(broker) + time.sleep(1) + senders = [] + for sender in xrange(2): + sender = threading.Thread(target = connect_requester, args = (feurl,1000)) + sender.start() + senders.append(sender) + proxies = [] + proxy_urls = [] + for proxy in xrange(2): + url = "inproc://proxy_be#%d" % (proxy,) + proxy = threading.Thread(target = proxy_collector, args = (beurl, url)) + proxy.start() + proxies.append(proxy) + proxy_urls.append(url) + time.sleep(1) + workers = [] + for url in proxy_urls: + for work in xrange(5): + work = threading.Thread(target = connect_worker, args = (url, 0.1, 100)) + work.start() + workers.append(work) + time.sleep(10) + stop = True + for thread in senders + brokers + proxies + workers: + thread.join() +