collector.py
author fabien@tzone.org
Sun, 08 May 2011 19:40:30 -0400
changeset 0 57d81f2bf26f
child 1 48d514cc3309
permissions -rw-r--r--
Add collector.

import random
import time
import threading
import zmq

# Cooperative shutdown flag: every loop in this module runs `while not stop`,
# and the __main__ block sets it to True to ask all threads to finish.
stop = False
# Frame content used as the "ready" marker in multipart messages.
READY = "READY"
# Single zmq context from which every socket in this module is created.
context = zmq.Context()

def _forward_replies(poller, frontend, timeout, pending, ready):
  """Relay worker replies to `frontend` until all `pending` workers are done.

  poller   -- poller on which only the backend socket is registered.
  frontend -- socket the non-READY replies are forwarded to.
  timeout  -- maximum time in milliseconds to wait for the next message; per
              pyzmq's Poller.poll a negative value waits indefinitely.
  pending  -- set of worker identities that still owe a READY; modified in
              place.
  ready    -- set receiving the identity of every worker that reports READY;
              modified in place.

  Returns when `pending` is empty, when `stop` is set, or when no message
  arrives within `timeout` ms (the silent workers are then abandoned).
  """
  while not stop and pending:
    events = poller.poll(timeout)
    if not events:
      # The request's timeout elapsed without any traffic: stop waiting so a
      # worker that never answers cannot stall the collector forever.
      return
    for sock, _ in events:
      reply = sock.recv_multipart()
      if reply[2] == READY:
        ready.add(reply[0])
        pending.discard(reply[0])
      else:
        # Drop the first two frames before passing the reply on.
        frontend.send_multipart(reply[2:])


def collector(frontend, backend):
  """Fan each frontend request out to all ready workers and relay the replies.

  frontend -- socket on which requests arrive and replies are sent back.
  backend  -- socket on which workers announce READY and send their replies.

  Both arguments only need send_multipart()/recv_multipart() and must be
  registrable with zmq.Poller.  The first frame of a backend message is used
  as the worker identity, the third frame is compared against READY.

  A request is expected to carry an integer timeout (milliseconds) in the
  frame right after its first empty frame.  Messages without such a frame
  (for instance a requester's final "STOP" notice) are ignored.

  Runs until the module-level `stop` flag becomes true.
  """
  poller = zmq.Poller()
  poller.register(backend, zmq.POLLIN)
  backends = set()
  while not stop:
    # Announce readiness on the frontend at the start of every round.
    frontend.send_multipart(["", READY])
    # The frontend is unregistered while replies are collected, so it has to
    # be (re-)registered on every round.
    poller.register(frontend, zmq.POLLIN)
    for socket, event in poller.poll(100):
      request = socket.recv_multipart()
      if socket is backend:
        if request[2] == READY:
          backends.add(request[0])
        continue
      try:
        timeout = int(request[request.index("") + 1])
      except (ValueError, IndexError):
        # No delimiter or no numeric timeout after it: not a request this
        # loop can serve, so skip it instead of killing the collector.
        continue
      # Hand the request to every worker currently known to be ready and
      # start a fresh ready-set for the ones that report back.
      recipients = backends
      backends = set()
      for dest in recipients:
        backend.send_multipart([dest] + request)
      # Only backend traffic is of interest while this request is in flight.
      poller.unregister(frontend)
      try:
        _forward_replies(poller, frontend, timeout, recipients, backends)
      except zmq.ZMQError:
        # Best effort: a transport error ends this collection round only.
        pass

def broker_collector(frontend_url, backend_url):
  """Run a collector whose frontend and backend sockets are both bound.

  frontend_url -- endpoint the frontend XREP socket binds to.
  backend_url  -- endpoint the backend XREP socket binds to.

  Blocks in collector() until the module-level `stop` flag is set; both
  sockets are closed on the way out, whether collector() returns or raises.
  """
  frontend = context.socket(zmq.XREP)
  backend = context.socket(zmq.XREP)
  try:
    frontend.bind(frontend_url)
    backend.bind(backend_url)
    collector(frontend, backend)
  finally:
    # Release the sockets even if binding or the collector loop fails.
    backend.close()
    frontend.close()
  
def proxy_collector(frontend_url, backend_url):
  """Run a collector that connects upstream and binds its own backend.

  frontend_url -- endpoint the frontend XREP socket connects to.
  backend_url  -- endpoint the backend XREP socket binds to.

  Blocks in collector() until the module-level `stop` flag is set; both
  sockets are closed on the way out, whether collector() returns or raises.
  """
  frontend = context.socket(zmq.XREP)
  backend = context.socket(zmq.XREP)
  try:
    frontend.connect(frontend_url)
    backend.bind(backend_url)
    collector(frontend, backend)
  finally:
    # Release the sockets even if connect/bind or the collector loop fails.
    backend.close()
    frontend.close()
  
def worker(socket, workload, failure_rate=0):
  """Serve requests on `socket` until stopped or until a simulated failure.

  socket       -- socket providing send_multipart()/recv_multipart().
  workload     -- seconds to sleep per request to simulate work.
  failure_rate -- if non-zero, each request has a 1-in-`failure_rate` chance
                  of making the worker return without replying.

  Each round announces ["", READY], waits for a request and answers with the
  request's envelope followed by "DONE".  A request consists of an envelope
  ending in an empty frame, one frame that is skipped here (requesters in
  this file put their timeout there), and a payload whose first frame must be
  "REQUEST".
  """
  while not stop:
    socket.send_multipart(["", READY])
    request = socket.recv_multipart()
    delim = request.index("")
    # Split once, while `delim` still indexes the complete message: the
    # envelope (up to and including the empty frame) is needed for the reply.
    envelope = request[:delim+1]
    payload = request[delim+2:]
    assert payload[0] == "REQUEST"
    if failure_rate and random.randrange(failure_rate) == 0:
      # Simulated crash: drop the request and leave without answering.
      return
    time.sleep(workload)
    socket.send_multipart(envelope + ["DONE"])

def connect_worker(url, workload, failure_rate=0):
  """Keep a worker running against `url`, replacing it after each failure.

  url          -- endpoint the worker's XREQ socket connects to.
  workload     -- passed through to worker().
  failure_rate -- passed through to worker().

  Every time worker() returns while `stop` is still false, a new socket is
  created and a new worker started.  The previous socket is closed first so
  that repeated failures do not accumulate open sockets in the context.
  """
  while not stop:
    socket = context.socket(zmq.XREQ)
    try:
      socket.connect(url)
      worker(socket, workload, failure_rate)
    finally:
      socket.close()

def requester(socket, timeout=-1):
  """Send requests in a loop and report how many results each one collected.

  socket  -- socket providing send_multipart()/recv_multipart() that can be
             registered with zmq.Poller.
  timeout -- value sent, as a string, in the frame preceding "REQUEST".

  For every request, replies are counted until the message ["", READY]
  arrives, then "<count> results received" is printed and the next request
  is sent.  Once the module-level `stop` flag is set, the loop ends and a
  final ["", "STOP"] message is sent.
  """
  poller = zmq.Poller()
  poller.register(socket, zmq.POLLIN)
  while not stop:
    socket.send_multipart(["", str(timeout), "REQUEST"])
    results = 0
    while not stop:
      # Wait in short slices rather than blocking in recv_multipart(), so a
      # shutdown request is noticed even if no further reply ever arrives.
      if not poller.poll(100):
        continue
      reply = socket.recv_multipart()
      if reply == ["", READY]:
        # Single-argument form prints the same text on Python 2 and 3.
        print("%d results received" % results)
        break
      results += 1
  socket.send_multipart(["", "STOP"])

def connect_requester(url, timeout):
  """Connect an XREQ socket to `url` and run requester() on it.

  url     -- endpoint to connect to.
  timeout -- passed through to requester().

  The socket is closed when requester() returns or raises.
  """
  socket = context.socket(zmq.XREQ)
  try:
    socket.connect(url)
    requester(socket, timeout)
  finally:
    socket.close()

if __name__ == "__main__":
  # Demo topology, all over inproc endpoints: one broker, two requesters on
  # the broker's frontend, two proxies on the broker's backend, and five
  # workers behind each proxy.  Runs for ten seconds, then shuts down.

  def _spawn(target, *args):
    """Start `target(*args)` in a new thread and return the Thread."""
    thread = threading.Thread(target=target, args=args)
    thread.start()
    return thread

  frontend_endpoint = "inproc://frontend"
  backend_endpoint = "inproc://backend"

  brokers = [_spawn(broker_collector, frontend_endpoint, backend_endpoint)]
  # Presumably gives the broker time to bind before anything connects.
  time.sleep(1)

  senders = [_spawn(connect_requester, frontend_endpoint, 1000)
             for _ in xrange(2)]

  proxies = []
  proxy_urls = []
  for index in xrange(2):
    proxy_endpoint = "inproc://proxy_be#%d" % (index,)
    proxies.append(_spawn(proxy_collector, backend_endpoint, proxy_endpoint))
    proxy_urls.append(proxy_endpoint)
  # Presumably gives the proxies time to bind before the workers connect.
  time.sleep(1)

  workers = []
  for proxy_endpoint in proxy_urls:
    for _ in xrange(5):
      # 0.1 s of simulated work per request, 1-in-100 simulated failures.
      workers.append(_spawn(connect_worker, proxy_endpoint, 0.1, 100))

  time.sleep(10)
  # Module-level flag read by every loop in this file.
  stop = True
  for thread in senders + brokers + proxies + workers:
    thread.join()