# HG changeset patch
# User Fabien Ninoles
# Date 1304908871 14400
# Node ID 2744eb2a589b58e3d199d4b93ee6e4594068aec7
# Parent 48d514cc3309c8ffd58b6576df53fc8f257bc8f3
Work better if we don't reuse the same poller each time.

diff -r 48d514cc3309 -r 2744eb2a589b collector.py
--- a/collector.py	Sun May 08 22:14:10 2011 -0400
+++ b/collector.py	Sun May 08 22:41:11 2011 -0400
@@ -10,12 +10,21 @@
 context = zmq.Context()
 counter = count()
 
+def checkzmqerror(func):
+    def wrapper(*args, **kwargs):
+        try:
+            func(*args, **kwargs)
+        except zmq.ZMQError, err:
+            info("%r(*%r, **%r) is terminating", func, args, kwargs)
+    return wrapper
+
 def collector(name, frontend, backend):
-    poller = zmq.Poller()
-    poller.register(backend, zmq.POLLIN)
     backends = set()
     info("collector %s is ready with %r backends", name, len(backends))
+    dropped = 0
     while True:
+        poller = zmq.Poller()
+        poller.register(backend, zmq.POLLIN)
         poller.register(frontend, zmq.POLLIN)
         for socket, event in poller.poll(100):
             request = socket.recv_multipart()
@@ -30,23 +39,27 @@
             delim = request.index("")
             address_stack = request[:delim+1]
             timeout = request[delim+1]
-            if timeout != "STOP":
-                debug("collector %s has new work to do in %s ms", name, timeout)
+            debug("collector %s has new work to do in %s ms", name, timeout)
             recipients = backends
             backends = set()
             debug("collector %s send requests to %r", name, recipients)
             for dest in recipients:
                 backend.send_multipart([dest] + request[delim:])
-            if timeout == "STOP":
-                info("collector %s is terminating", name)
-                return
             timeout = int(timeout)
-            poller.unregister(frontend)
+            poller = zmq.Poller()
+            poller.register(backend, zmq.POLLIN)
             while recipients:
+                start = time.time()
+                debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
                 events = poller.poll(timeout)
                 if not events:
-                    info("collector %s has a timeout with %r", name, recipients)
-                    break
+                    end = time.time()
+                    if (end-start)*1000 < timeout:
+                        info("no event but timeout: %r", events)
+                    else:
+                        dropped += 1
+                        debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
+                        break
                 for socket, event in events:
                     reply = socket.recv_multipart()
                     if reply[2] == READY:
@@ -62,6 +75,7 @@
             info("collector %s is ready with %r backends", name, len(backends))
 
 
+@checkzmqerror
 def broker_collector(frontend_url, backend_url):
     frontend = context.socket(zmq.XREP)
     frontend.setsockopt(zmq.IDENTITY, "broker")
@@ -72,6 +86,7 @@
     backend.bind(backend_url)
     collector("broker", frontend, backend)
 
+@checkzmqerror
 def proxy_collector(frontend_url, backend_url):
     frontend = context.socket(zmq.XREQ)
     frontend.setsockopt(zmq.IDENTITY, "proxy")
@@ -89,34 +104,29 @@
         info("Worker is ready")
         socket.send_multipart(["",READY])
         request = socket.recv_multipart()
-        info("Worker receive request %r", request)
+        debug("Worker receive request %r", request)
         delim = request.index("")
         address = request[:delim+1]
         timeout = request[delim+1]
-        if timeout == "STOP":
-            info("worker is terminating")
-            return True
         request = request[delim+2:]
         assert request[0] == "REQUEST"
         if failure_rate and random.randrange(failure_rate) == 0:
             info("worker failed")
             return False
         time.sleep(workload)
-        info("worker send reply")
+        debug("worker send reply")
         socket.send_multipart(address + [request[1], "DONE"])
 
+@checkzmqerror
 def connect_worker(url, workload, failure_rate = 0):
     while True:
         socket = context.socket(zmq.XREQ)
         info("Connecting worker to %s", url)
         socket.connect(url)
-        if worker(socket, workload, failure_rate):
-            return
-
-stop = False
+        worker(socket, workload, failure_rate)
 
 def requester(socket, timeout = -1):
-    while not stop:
+    while True:
         i = str(counter.next())
         info("Requester send request %s", i)
         socket.send_multipart(["", str(timeout), "REQUEST", i])
@@ -130,9 +140,8 @@
             results += 1
             info("requester received %d results", results)
         # time.sleep(1)
-    info("requester is terminating")
-    socket.send_multipart(["", "STOP"])
 
+@checkzmqerror
 def connect_requester(url, timeout):
     socket = context.socket(zmq.XREQ)
     info("Connecting requester to %s", url)
@@ -169,8 +178,8 @@
         work.start()
         workers.append(work)
     time.sleep(10)
-    stop = True
     info("Joining thread")
+    context.term()
     for thread in senders + brokers + proxies + workers:
         thread.join()
 