Some rewrite and debugging.
author Fabien Ninoles <fabien@tzone.org>
Sun, 08 May 2011 22:14:10 -0400
changeset 1 48d514cc3309
parent 0 57d81f2bf26f
child 2 2744eb2a589b
Some rewrite and debugging. Issue: the worker is accumulating past jobs.
collector.py
--- a/collector.py	Sun May 08 19:40:30 2011 -0400
+++ b/collector.py	Sun May 08 22:14:10 2011 -0400
@@ -2,122 +2,175 @@
 import time
 import threading
 import zmq
+import logging
+from logging import debug, info
+from itertools import count
 
-stop = False
 READY = "READY"
 context = zmq.Context()
+counter = count()
 
-def collector(frontend, backend):
-  poller = zmq.Poller()
-  poller.register(backend, zmq.POLLIN)
-  backends = set()
-  while not stop:
-    frontend.send_multipart(["",READY])
-    poller.register(frontend, zmq.POLLIN)
-    for socket, event in poller.poll(100):
-      request = socket.recv_multipart()
-      if socket is backend:
-        if request[2] == READY:
-          backends.add(request[0])
-      else:
-        timeout = int(request[request.index("")+1])
-        recipients = backends
-        backends = set()
-        for dest in recipients:
-          backend.send_multipart([dest] + request)
-        poller.unregister(frontend)
-        try:
-          while not stop and recipients:
-            for socket, event in poller.poll(timeout):
-              reply = socket.recv_multipart()
-              if reply[2] == READY:
-                backends.add(reply[0])
-                recipients.discard(reply[0])
-              else:
-                frontend.send_multipart(reply[2:])
-        except ZMQError:
-          pass
+def collector(name, frontend, backend):
+    poller = zmq.Poller()
+    poller.register(backend, zmq.POLLIN)
+    backends = set()
+    info("collector %s is ready with %r backends", name, len(backends))
+    while True:
+        poller.register(frontend, zmq.POLLIN)
+        for socket, event in poller.poll(100):
+            request = socket.recv_multipart()
+            debug("collector %s received request %r", name, request)
+            if socket is backend:
+                if request[2] == READY:
+                    debug("collector %s has new backend: %r", name, request[0])
+                    backends.add(request[0])
+                else:
+                    debug("collector %s discard reply %r", name, request) 
+            else:
+                delim = request.index("")
+                address_stack = request[:delim+1]
+                timeout = request[delim+1]
+                if timeout != "STOP":
+                    debug("collector %s has new work to do in %s ms", name, timeout)
+                recipients = backends
+                backends = set()
+                debug("collector %s send requests to %r", name, recipients)
+                for dest in recipients:
+                    backend.send_multipart([dest] + request[delim:])
+                if timeout == "STOP":
+                    info("collector %s is terminating", name)
+                    return
+                timeout = int(timeout)
+                poller.unregister(frontend)
+                while recipients:
+                    events = poller.poll(timeout)
+                    if not events:
+                        info("collector %s has a timeout with %r", name, recipients)
+                        break
+                    for socket, event in events:
+                        reply = socket.recv_multipart()
+                        if reply[2] == READY:
+                            debug("%r is ready on %s", reply[0], name)
+                            backends.add(reply[0])
+                            recipients.discard(reply[0])
+                        elif reply[0] in recipients:
+                            debug("collector %s forward reply", name)
+                            frontend.send_multipart(address_stack + reply[2:])
+                        else:
+                            debug("collector %s discard reply %r", name, reply)
+                frontend.send_multipart(address_stack + [READY])
+                info("collector %s is ready with %r backends", name, len(backends))
+
 
 def broker_collector(frontend_url, backend_url):
-  frontend = context.socket(zmq.XREP)
-  backend = context.socket(zmq.XREP)
-  frontend.bind(frontend_url)
-  backend.bind(backend_url)
-  collector(frontend, backend)
-  
+    frontend = context.socket(zmq.XREP)
+    frontend.setsockopt(zmq.IDENTITY, "broker")
+    backend = context.socket(zmq.XREP)
+    info("Binding broker frontend to %s", frontend_url)
+    frontend.bind(frontend_url)
+    info("Binding broker backend to %s", backend_url)
+    backend.bind(backend_url)
+    collector("broker", frontend, backend)
+
 def proxy_collector(frontend_url, backend_url):
-  frontend = context.socket(zmq.XREP)
-  backend = context.socket(zmq.XREP)
-  frontend.connect(frontend_url)
-  backend.bind(backend_url)
-  collector(frontend, backend)
-  
+    frontend = context.socket(zmq.XREQ)
+    frontend.setsockopt(zmq.IDENTITY, "proxy")
+    backend = context.socket(zmq.XREP)
+    info("Connecting proxy frontend to %s", frontend_url)
+    frontend.connect(frontend_url)
+    info("Binding proxy backend to %s", backend_url)
+    # Sending presence to frontend.
+    backend.bind(backend_url)
+    frontend.send_multipart(["", READY])
+    collector("proxy", frontend, backend)
+
 def worker(socket, workload, failure_rate = 0):
-  while not stop:
-    socket.send_multipart(["",READY])
-    request = socket.recv_multipart()
-    delim = request.index("")
-    timeout = request[delim+1]
-    request = request[delim+2:]
-    assert request[0] == "REQUEST"
-    if failure_rate and random.randrange(failure_rate) == 0:
-      return
-    time.sleep(workload)
-    socket.send_multipart(request[:delim+1] + ["DONE"])
+    while True:
+        info("Worker is ready")
+        socket.send_multipart(["",READY])
+        request = socket.recv_multipart()
+        info("Worker receive request %r", request)
+        delim = request.index("")
+        address = request[:delim+1]
+        timeout = request[delim+1]
+        if timeout == "STOP":
+            info("worker is terminating")
+            return True
+        request = request[delim+2:]
+        assert request[0] == "REQUEST"
+        if failure_rate and random.randrange(failure_rate) == 0:
+            info("worker failed")
+            return False
+        time.sleep(workload)
+        info("worker send reply")
+        socket.send_multipart(address + [request[1], "DONE"])
 
 def connect_worker(url, workload, failure_rate = 0):
-  while not stop:
-    socket = context.socket(zmq.XREQ)
-    socket.connect(url)
-    worker(socket, workload, failure_rate)
+    while True:
+        socket = context.socket(zmq.XREQ)
+        info("Connecting worker to %s", url)
+        socket.connect(url)
+        if worker(socket, workload, failure_rate):
+            return
+
+stop = False
 
 def requester(socket, timeout = -1):
-  while not stop:
-    socket.send_multipart(["", str(timeout), "REQUEST"])
-    results = 0
-    while True:
-      reply = socket.recv_multipart()
-      if reply == ["",READY]:
-        break
-      results += 1
-    print results, "results received"
-  socket.send_multipart(["", "STOP"])
+    while not stop:
+        i = str(counter.next())
+        info("Requester send request %s", i)
+        socket.send_multipart(["", str(timeout), "REQUEST", i])
+        results = 0
+        while True:
+            reply = socket.recv_multipart()
+            debug("requester received reply %r", reply)
+            if reply == ["",READY]:
+                break
+            assert reply[1] == i
+            results += 1
+        info("requester received %d results", results)
+        # time.sleep(1)
+    info("requester is terminating")
+    socket.send_multipart(["", "STOP"])
 
 def connect_requester(url, timeout):
-  socket = context.socket(zmq.XREQ)
-  socket.connect(url)
-  requester(socket, timeout)
+    socket = context.socket(zmq.XREQ)
+    info("Connecting requester to %s", url)
+    socket.connect(url)
+    requester(socket, timeout)
 
 if __name__ == "__main__":
-  feurl = "inproc://frontend"
-  beurl = "inproc://backend"
-  brokers = []
-  broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
-  broker.start()
-  brokers.append(broker)
-  time.sleep(1)
-  senders = []
-  for sender in xrange(2):
-    sender = threading.Thread(target = connect_requester, args = (feurl,1000))
-    sender.start()
-    senders.append(sender)
-  proxies = []
-  proxy_urls = []
-  for proxy in xrange(2):
-    url = "inproc://proxy_be#%d" % (proxy,)
-    proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
-    proxy.start()
-    proxies.append(proxy)
-    proxy_urls.append(url)
-  time.sleep(1)
-  workers = []
-  for url in proxy_urls:
-    for work in xrange(5):
-      work = threading.Thread(target = connect_worker, args = (url, 0.1, 100))
-      work.start()
-      workers.append(work)
-  time.sleep(10)
-  stop = True
-  for thread in senders + brokers + proxies + workers:
-    thread.join()
-    
+    logging.getLogger().setLevel(logging.INFO)
+    feurl = "inproc://frontend"
+    beurl = "inproc://backend"
+    brokers = []
+    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+    broker.start()
+    brokers.append(broker)
+    time.sleep(2)
+    senders = []
+    for sender in xrange(5):
+        sender = threading.Thread(target = connect_requester, args = (feurl,5000))
+        sender.start()
+        senders.append(sender)
+    proxies = []
+    proxy_urls = []
+    for proxy in xrange(1):
+        url = "inproc://proxy_be#%d" % (proxy,)
+        proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+        proxy.start()
+        proxies.append(proxy)
+        proxy_urls.append(url)
+    time.sleep(2)
+    workers = []
+    for url in proxy_urls:
+        for work in xrange(1):
+            work = threading.Thread(target = connect_worker, args = (url, 1, 0))
+            work.start()
+            workers.append(work)
+    time.sleep(10)
+    stop = True
+    info("Joining thread")
+    for thread in senders + brokers + proxies + workers:
+        thread.join()
+