Works better if we don't reuse the same poller each time.
author Fabien Ninoles <fabien@tzone.org>
Sun, 08 May 2011 22:41:11 -0400
changeset 2 2744eb2a589b
parent 1 48d514cc3309
child 3 197572da88ea
Works better if we don't reuse the same poller each time.
collector.py
--- a/collector.py	Sun May 08 22:14:10 2011 -0400
+++ b/collector.py	Sun May 08 22:41:11 2011 -0400
@@ -10,12 +10,21 @@
 context = zmq.Context()
 counter = count()
 
+def checkzmqerror(func):
+    def wrapper(*args, **kwargs):
+        try:
+            func(*args, **kwargs)
+        except zmq.ZMQError, err:
+            info("%r(*%r, **%r) is terminating", func, args, kwargs)
+    return wrapper
+
 def collector(name, frontend, backend):
-    poller = zmq.Poller()
-    poller.register(backend, zmq.POLLIN)
     backends = set()
     info("collector %s is ready with %r backends", name, len(backends))
+    dropped = 0
     while True:
+        poller = zmq.Poller()
+        poller.register(backend, zmq.POLLIN)
         poller.register(frontend, zmq.POLLIN)
         for socket, event in poller.poll(100):
             request = socket.recv_multipart()
@@ -30,23 +39,27 @@
                 delim = request.index("")
                 address_stack = request[:delim+1]
                 timeout = request[delim+1]
-                if timeout != "STOP":
-                    debug("collector %s has new work to do in %s ms", name, timeout)
+                debug("collector %s has new work to do in %s ms", name, timeout)
                 recipients = backends
                 backends = set()
                 debug("collector %s send requests to %r", name, recipients)
                 for dest in recipients:
                     backend.send_multipart([dest] + request[delim:])
-                if timeout == "STOP":
-                    info("collector %s is terminating", name)
-                    return
                 timeout = int(timeout)
-                poller.unregister(frontend)
+                poller = zmq.Poller()
+                poller.register(backend, zmq.POLLIN)
                 while recipients:
+                    start = time.time()
+                    debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
                     events = poller.poll(timeout)
                     if not events:
-                        info("collector %s has a timeout with %r", name, recipients)
-                        break
+                        end = time.time()
+                        if (end-start)*1000 < timeout:
+                            info("no event but timeout: %r", events)
+                        else:
+                          dropped += 1
+                          debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
+                          break
                     for socket, event in events:
                         reply = socket.recv_multipart()
                         if reply[2] == READY:
@@ -62,6 +75,7 @@
                 info("collector %s is ready with %r backends", name, len(backends))
 
 
+@checkzmqerror
 def broker_collector(frontend_url, backend_url):
     frontend = context.socket(zmq.XREP)
     frontend.setsockopt(zmq.IDENTITY, "broker")
@@ -72,6 +86,7 @@
     backend.bind(backend_url)
     collector("broker", frontend, backend)
 
+@checkzmqerror
 def proxy_collector(frontend_url, backend_url):
     frontend = context.socket(zmq.XREQ)
     frontend.setsockopt(zmq.IDENTITY, "proxy")
@@ -89,34 +104,29 @@
         info("Worker is ready")
         socket.send_multipart(["",READY])
         request = socket.recv_multipart()
-        info("Worker receive request %r", request)
+        debug("Worker receive request %r", request)
         delim = request.index("")
         address = request[:delim+1]
         timeout = request[delim+1]
-        if timeout == "STOP":
-            info("worker is terminating")
-            return True
         request = request[delim+2:]
         assert request[0] == "REQUEST"
         if failure_rate and random.randrange(failure_rate) == 0:
             info("worker failed")
             return False
         time.sleep(workload)
-        info("worker send reply")
+        debug("worker send reply")
         socket.send_multipart(address + [request[1], "DONE"])
 
+@checkzmqerror
 def connect_worker(url, workload, failure_rate = 0):
     while True:
         socket = context.socket(zmq.XREQ)
         info("Connecting worker to %s", url)
         socket.connect(url)
-        if worker(socket, workload, failure_rate):
-            return
-
-stop = False
+        worker(socket, workload, failure_rate)
 
 def requester(socket, timeout = -1):
-    while not stop:
+    while True:
         i = str(counter.next())
         info("Requester send request %s", i)
         socket.send_multipart(["", str(timeout), "REQUEST", i])
@@ -130,9 +140,8 @@
             results += 1
         info("requester received %d results", results)
         # time.sleep(1)
-    info("requester is terminating")
-    socket.send_multipart(["", "STOP"])
 
+@checkzmqerror
 def connect_requester(url, timeout):
     socket = context.socket(zmq.XREQ)
     info("Connecting requester to %s", url)
@@ -169,8 +178,8 @@
             work.start()
             workers.append(work)
     time.sleep(10)
-    stop = True
     info("Joining thread")
+    context.term()
     for thread in senders + brokers + proxies + workers:
         thread.join()