Set timeout by collector instead of by message. default tip
authorFabien Ninoles <fabien@tzone.org>
Tue, 10 May 2011 00:21:11 -0400
changeset 6 fdf7da5a5d21
parent 5 55f26e7ee45e
Set timeout by collector instead of by message. More aggressive in error simulation.
collector.py
--- a/collector.py	Sun May 08 23:03:00 2011 -0400
+++ b/collector.py	Tue May 10 00:21:11 2011 -0400
@@ -18,7 +18,7 @@
             info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
     return wrapper
 
-def collector(name, frontend, backend):
+def collector(name, frontend, backend, timeout):
     backends = set()
     debug("collector %s is ready with %r backends", name, len(backends))
     dropped = 0
@@ -26,7 +26,7 @@
         poller = zmq.Poller()
         poller.register(backend, zmq.POLLIN)
         poller.register(frontend, zmq.POLLIN)
-        for socket, event in poller.poll(100):
+        for socket, event in poller.poll():
             request = socket.recv_multipart()
             debug("collector %s received request %r", name, request)
             if socket is backend:
@@ -38,28 +38,19 @@
             else:
                 delim = request.index("")
                 address_stack = request[:delim+1]
-                timeout = request[delim+1]
-                debug("collector %s has new work to do in %s ms", name, timeout)
+                debug("collector %s has new work to do", name)
                 recipients = backends
                 backends = set()
                 debug("collector %s send requests to %r", name, recipients)
                 for dest in recipients:
                     backend.send_multipart([dest] + request[delim:])
-                timeout = int(timeout)
                 poller = zmq.Poller()
                 poller.register(backend, zmq.POLLIN)
+                start = time.time()
+                deadline = start + timeout / 1000.0
                 while recipients:
-                    start = time.time()
-                    debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
-                    events = poller.poll(timeout)
-                    if not events:
-                        end = time.time()
-                        if (end-start)*1000 < timeout:
-                            info("no event but timeout: %r", events)
-                        else:
-                          dropped += 1
-                          debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
-                          break
+                    debug("%r: collector %s wait on %r", start, name, recipients)
+                    events = poller.poll(max(0,deadline-time.time()))
                     for socket, event in events:
                         reply = socket.recv_multipart()
                         if reply[2] == READY:
@@ -71,12 +62,16 @@
                             frontend.send_multipart(address_stack + reply[2:])
                         else:
                             debug("collector %s discard reply %r", name, reply)
+                    end = time.time()
+                    if recipients and end > deadline:
+                        info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
+                        break
                 frontend.send_multipart(address_stack + [READY])
                 debug("collector %s is ready with %r backends", name, len(backends))
 
 
 @checkzmqerror
-def broker_collector(frontend_url, backend_url):
+def broker_collector(frontend_url, backend_url, timeout):
     frontend = context.socket(zmq.XREP)
     frontend.setsockopt(zmq.IDENTITY, backend_url)
     backend = context.socket(zmq.XREP)
@@ -84,10 +79,10 @@
     frontend.bind(frontend_url)
     info("Binding broker backend to %s", backend_url)
     backend.bind(backend_url)
-    collector("broker", frontend, backend)
+    collector("broker", frontend, backend, timeout)
 
 @checkzmqerror
-def proxy_collector(frontend_url, backend_url):
+def proxy_collector(frontend_url, backend_url, timeout):
     frontend = context.socket(zmq.XREQ)
     frontend.setsockopt(zmq.IDENTITY, backend_url)
     backend = context.socket(zmq.XREP)
@@ -97,7 +92,7 @@
     # Sending presence to frontend.
     backend.bind(backend_url)
     frontend.send_multipart(["", READY])
-    collector("proxy", frontend, backend)
+    collector("proxy", frontend, backend, timeout)
 
 def worker(socket, workload, failure_rate = 0):
     while True:
@@ -105,10 +100,9 @@
         socket.send_multipart(["",READY])
         request = socket.recv_multipart()
         debug("Worker receive request %r", request)
-        delim = request.index("")
-        address = request[:delim+1]
-        timeout = request[delim+1]
-        request = request[delim+2:]
+        content = request.index("") + 1
+        address = request[:content]
+        request = request[content:]
         assert request[0] == "REQUEST"
         if failure_rate and random.randrange(failure_rate) == 0:
             info("worker failed")
@@ -125,11 +119,11 @@
         socket.connect(url)
         worker(socket, workload, failure_rate)
 
-def requester(socket, timeout = -1):
+def requester(socket):
     while True:
         i = str(counter.next())
         info("Requester send request %s", i)
-        socket.send_multipart(["", str(timeout), "REQUEST", i])
+        socket.send_multipart(["", "REQUEST", i])
         results = 0
         while True:
             reply = socket.recv_multipart()
@@ -139,34 +133,36 @@
             assert reply[1] == i
             results += 1
         info("requester received %d results", results)
-        # time.sleep(1)
 
 @checkzmqerror
-def connect_requester(url, timeout):
+def connect_requester(url):
     socket = context.socket(zmq.XREQ)
     info("Connecting requester to %s", url)
     socket.connect(url)
-    requester(socket, timeout)
+    requester(socket)
 
 if __name__ == "__main__":
     logging.getLogger().setLevel(logging.INFO)
     feurl = "inproc://frontend"
     beurl = "inproc://backend"
+    workload = 2.5
+    broker_timeout = 5000
+    proxy_timeout = 5000
     brokers = []
-    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+    broker = threading.Thread(target = broker_collector, args = (feurl, beurl, broker_timeout))
     broker.start()
     brokers.append(broker)
     time.sleep(2)
     senders = []
     for sender in xrange(10):
-        sender = threading.Thread(target = connect_requester, args = (feurl,5000))
+        sender = threading.Thread(target = connect_requester, args = (feurl,))
         sender.start()
         senders.append(sender)
     proxies = []
     proxy_urls = []
     for proxy in xrange(5):
         url = "inproc://proxy_be#%d" % (proxy,)
-        proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+        proxy = threading.Thread(target = proxy_collector, args = (beurl, url, proxy_timeout))
         proxy.start()
         proxies.append(proxy)
         proxy_urls.append(url)
@@ -174,7 +170,7 @@
     workers = []
     for url in proxy_urls:
         for work in xrange(5):
-            work = threading.Thread(target = connect_worker, args = (url, 1, 4800))
+            work = threading.Thread(target = connect_worker, args = (url, 3, 10))
             work.start()
             workers.append(work)
     time.sleep(20)