Add collector.
author fabien@tzone.org
Sun, 08 May 2011 19:40:30 -0400
changeset 0 57d81f2bf26f
child 1 48d514cc3309
Add collector.
collector.py
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/collector.py	Sun May 08 19:40:30 2011 -0400
@@ -0,0 +1,123 @@
+import random
+import time
+import threading
+import zmq
+
+stop = False
+READY = "READY"
+context = zmq.Context()
+
+def collector(frontend, backend):
+  poller = zmq.Poller()
+  poller.register(backend, zmq.POLLIN)
+  backends = set()
+  while not stop:
+    frontend.send_multipart(["",READY])
+    poller.register(frontend, zmq.POLLIN)
+    for socket, event in poller.poll(100):
+      request = socket.recv_multipart()
+      if socket is backend:
+        if request[2] == READY:
+          backends.add(request[0])
+      else:
+        timeout = int(request[request.index("")+1])
+        recipients = backends
+        backends = set()
+        for dest in recipients:
+          backend.send_multipart([dest] + request)
+        poller.unregister(frontend)
+        try:
+          while not stop and recipients:
+            for socket, event in poller.poll(timeout):
+              reply = socket.recv_multipart()
+              if reply[2] == READY:
+                backends.add(reply[0])
+                recipients.discard(reply[0])
+              else:
+                frontend.send_multipart(reply[2:])
+        except ZMQError:
+          pass
+
+def broker_collector(frontend_url, backend_url):  # central broker: binds both sides, peers connect to it
+  frontend = context.socket(zmq.XREP)
+  backend = context.socket(zmq.XREP)
+  frontend.bind(frontend_url)
+  backend.bind(backend_url)
+  collector(frontend, backend)  # runs until the module-level `stop` flag is set
+  
+def proxy_collector(frontend_url, backend_url):  # intermediate hop: connects upstream, binds for its own workers
+  frontend = context.socket(zmq.XREP)
+  backend = context.socket(zmq.XREP)
+  frontend.connect(frontend_url)  # acts as a backend peer of an upstream collector
+  backend.bind(backend_url)
+  collector(frontend, backend)  # runs until the module-level `stop` flag is set
+  
+def worker(socket, workload, failure_rate = 0):
+  while not stop:
+    socket.send_multipart(["",READY])
+    request = socket.recv_multipart()
+    delim = request.index("")
+    timeout = request[delim+1]
+    request = request[delim+2:]
+    assert request[0] == "REQUEST"
+    if failure_rate and random.randrange(failure_rate) == 0:
+      return
+    time.sleep(workload)
+    socket.send_multipart(request[:delim+1] + ["DONE"])
+
+def connect_worker(url, workload, failure_rate = 0):  # run workers forever, reconnecting after each simulated crash
+  while not stop:
+    socket = context.socket(zmq.XREQ)  # fresh socket (and identity) per reconnect
+    socket.connect(url)
+    worker(socket, workload, failure_rate)  # returns only on simulated failure or stop
+
+def requester(socket, timeout = -1):  # issue REQUEST rounds and count replies until `stop` is set
+  while not stop:
+    socket.send_multipart(["", str(timeout), "REQUEST"])  # empty delimiter, per-round timeout, payload
+    results = 0
+    while True:
+      reply = socket.recv_multipart()
+      if reply == ["",READY]:  # collector signals it is ready for the next round
+        break
+      results += 1  # any other message counts as one worker result
+    print results, "results received"  # Python 2 print statement
+  socket.send_multipart(["", "STOP"])  # NOTE(review): no STOP handling is visible in collector — confirm intent
+
+def connect_requester(url, timeout):  # connect a requester socket to a collector frontend and run it
+  socket = context.socket(zmq.XREQ)
+  socket.connect(url)
+  requester(socket, timeout)
+
+if __name__ == "__main__":
+  feurl = "inproc://frontend"  # requesters <-> broker (inproc: all threads share the module-level `context`)
+  beurl = "inproc://backend"  # broker <-> proxies
+  brokers = []
+  broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
+  broker.start()
+  brokers.append(broker)
+  time.sleep(1)  # presumably to let the broker bind before peers connect — inproc needs bind first; confirm
+  senders = []
+  for sender in xrange(2):  # two concurrent requesters
+    sender = threading.Thread(target = connect_requester, args = (feurl,1000))
+    sender.start()
+    senders.append(sender)
+  proxies = []
+  proxy_urls = []
+  for proxy in xrange(2):  # two proxy hops, each with its own backend endpoint
+    url = "inproc://proxy_be#%d" % (proxy,)
+    proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
+    proxy.start()
+    proxies.append(proxy)
+    proxy_urls.append(url)
+  time.sleep(1)  # let proxies bind their backend endpoints before workers connect
+  workers = []
+  for url in proxy_urls:
+    for work in xrange(5):  # five workers per proxy: 0.1 s workload, 1-in-100 simulated crash rate
+      work = threading.Thread(target = connect_worker, args = (url, 0.1, 100))
+      work.start()
+      workers.append(work)
+  time.sleep(10)  # run the demo for ten seconds
+  stop = True  # cooperative shutdown flag, polled by every thread (plain global, no lock)
+  for thread in senders + brokers + proxies + workers:
+    thread.join()
+