# HG changeset patch
# User Fabien Ninoles
# Date 1305001271 14400
# Node ID fdf7da5a5d21e7b63adddb50443b38c2be9857a2
# Parent  55f26e7ee45eb6914c69d1d21c13ef3ef9d18212
Set timeout by collector instead of by message.
More aggressive in error simulation.

diff -r 55f26e7ee45e -r fdf7da5a5d21 collector.py
--- a/collector.py	Sun May 08 23:03:00 2011 -0400
+++ b/collector.py	Tue May 10 00:21:11 2011 -0400
@@ -18,7 +18,7 @@
         info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
     return wrapper
 
-def collector(name, frontend, backend):
+def collector(name, frontend, backend, timeout):
     backends = set()
     debug("collector %s is ready with %r backends", name, len(backends))
     dropped = 0
@@ -26,7 +26,7 @@
         poller = zmq.Poller()
         poller.register(backend, zmq.POLLIN)
         poller.register(frontend, zmq.POLLIN)
-        for socket, event in poller.poll(100):
+        for socket, event in poller.poll():
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)
            if socket is backend:
@@ -38,28 +38,19 @@
            else:
                delim = request.index("")
                address_stack = request[:delim+1]
-                timeout = request[delim+1]
-                debug("collector %s has new work to do in %s ms", name, timeout)
+                debug("collector %s has new work to do", name)
                recipients = backends
                backends = set()
                debug("collector %s send requests to %r", name, recipients)
                for dest in recipients:
                    backend.send_multipart([dest] + request[delim:])
-                timeout = int(timeout)
                poller = zmq.Poller()
                poller.register(backend, zmq.POLLIN)
+                start = time.time()
+                deadline = start + timeout / 1000.0
                while recipients:
-                    start = time.time()
-                    debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
-                    events = poller.poll(timeout)
-                    if not events:
-                        end = time.time()
-                        if (end-start)*1000 < timeout:
-                            info("no event but timeout: %r", events)
-                        else:
-                            dropped += 1
-                            debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
-                            break
+                    debug("%r: collector %s wait on on %r", start, name, recipients)
+                    events = poller.poll(max(0,deadline-time.time()))
                    for socket, event in events:
                        reply = socket.recv_multipart()
                        if reply[2] == READY:
@@ -71,12 +62,16 @@
                            frontend.send_multipart(address_stack + reply[2:])
                        else:
                            debug("collector %s discard reply %r", name, reply)
+                    end = time.time()
+                    if recipients and end > deadline:
+                        info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
+                        break
 
                frontend.send_multipart(address_stack + [READY])
        debug("collector %s is ready with %r backends", name, len(backends))
 
 @checkzmqerror
-def broker_collector(frontend_url, backend_url):
+def broker_collector(frontend_url, backend_url, timeout):
     frontend = context.socket(zmq.XREP)
     frontend.setsockopt(zmq.IDENTITY, backend_url)
     backend = context.socket(zmq.XREP)
@@ -84,10 +79,10 @@
     frontend.bind(frontend_url)
     info("Binding broker backend to %s", backend_url)
     backend.bind(backend_url)
-    collector("broker", frontend, backend)
+    collector("broker", frontend, backend, timeout)
 
 @checkzmqerror
-def proxy_collector(frontend_url, backend_url):
+def proxy_collector(frontend_url, backend_url, timeout):
     frontend = context.socket(zmq.XREQ)
     frontend.setsockopt(zmq.IDENTITY, backend_url)
     backend = context.socket(zmq.XREP)
@@ -97,7 +92,7 @@
     # Sending presence to frontend.
     backend.bind(backend_url)
     frontend.send_multipart(["", READY])
-    collector("proxy", frontend, backend)
+    collector("proxy", frontend, backend, timeout)
 
 def worker(socket, workload, failure_rate = 0):
     while True:
@@ -105,10 +100,9 @@
         socket.send_multipart(["",READY])
         request = socket.recv_multipart()
         debug("Worker receive request %r", request)
-        delim = request.index("")
-        address = request[:delim+1]
-        timeout = request[delim+1]
-        request = request[delim+2:]
+        content = request.index("") + 1
+        address = request[:content]
+        request = request[content:]
         assert request[0] == "REQUEST"
         if failure_rate and random.randrange(failure_rate) == 0:
             info("worker failed")
@@ -125,11 +119,11 @@
     socket.connect(url)
     worker(socket, workload, failure_rate)
 
-def requester(socket, timeout = -1):
+def requester(socket):
     while True:
         i = str(counter.next())
         info("Requester send request %s", i)
-        socket.send_multipart(["", str(timeout), "REQUEST", i])
+        socket.send_multipart(["", "REQUEST", i])
         results = 0
         while True:
             reply = socket.recv_multipart()
@@ -139,34 +133,36 @@
             assert reply[1] == i
             results += 1
         info("requester received %d results", results)
-        # time.sleep(1)
 
 @checkzmqerror
-def connect_requester(url, timeout):
+def connect_requester(url):
     socket = context.socket(zmq.XREQ)
     info("Connecting requester to %s", url)
     socket.connect(url)
-    requester(socket, timeout)
+    requester(socket)
 
 if __name__ == "__main__":
     logging.getLogger().setLevel(logging.INFO)
     feurl = "inproc://frontend"
     beurl = "inproc://backend"
+    workload = 2.5
+    broker_timeout = 5000
+    proxy_timeout = 5000
     brokers = []
-    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+    broker = threading.Thread(target = broker_collector, args = (feurl, beurl, broker_timeout))
     broker.start()
     brokers.append(broker)
     time.sleep(2)
     senders = []
     for sender in xrange(10):
-        sender = threading.Thread(target = connect_requester, args = (feurl,5000))
+        sender = threading.Thread(target = connect_requester, args = (feurl,))
         sender.start()
         senders.append(sender)
     proxies = []
     proxy_urls = []
     for proxy in xrange(5):
         url = "inproc://proxy_be#%d" % (proxy,)
-        proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+        proxy = threading.Thread(target = proxy_collector, args = (beurl, url, proxy_timeout))
         proxy.start()
         proxies.append(proxy)
         proxy_urls.append(url)
@@ -174,7 +170,7 @@
     workers = []
     for url in proxy_urls:
         for work in xrange(5):
-            work = threading.Thread(target = connect_worker, args = (url, 1, 4800))
+            work = threading.Thread(target = connect_worker, args = (url, 3, 10))
             work.start()
             workers.append(work)
     time.sleep(20)
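
For reference, here is a minimal, self-contained sketch (not part of the changeset) of the collector-side deadline loop this patch introduces in place of the per-message timeout. The function name collect_replies and the expected argument are illustrative only; the real pyzmq pieces are zmq.Poller, its poll() method (whose timeout is in milliseconds), and recv_multipart(), where the first frame read from an XREP/ROUTER socket is the sender's identity.

import time
import zmq

def collect_replies(backend, expected, timeout_ms):
    # Deadline-based collection: wait for one reply from each identity in
    # `expected`, and give up once the collector-wide deadline passes.
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    deadline = time.time() + timeout_ms / 1000.0
    pending = set(expected)
    replies = []
    while pending:
        # Poll only for the time remaining before the shared deadline.
        remaining_ms = max(0, (deadline - time.time()) * 1000.0)
        events = dict(poller.poll(remaining_ms))
        if backend in events:
            reply = backend.recv_multipart()
            pending.discard(reply[0])  # first frame: replying peer's identity
            replies.append(reply)
        if pending and time.time() > deadline:
            break  # deadline reached with some peers still silent
    return replies

The point of the technique is that the timeout is owned by the collector and converted once into an absolute deadline, so slow or missing backends can never stretch the wait beyond a single, predictable bound, regardless of how many partial replies trickle in.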