collector.py
changeset 4 4b5a51cb5fc7
parent 3 197572da88ea
child 5 55f26e7ee45e
equal deleted inserted replaced
3:197572da88ea 4:4b5a51cb5fc7
    13 def checkzmqerror(func):
    13 def checkzmqerror(func):
    14     def wrapper(*args, **kwargs):
    14     def wrapper(*args, **kwargs):
    15         try:
    15         try:
    16             func(*args, **kwargs)
    16             func(*args, **kwargs)
    17         except zmq.ZMQError, err:
    17         except zmq.ZMQError, err:
    18             info("%r(*%r, **%r) is terminating", func, args, kwargs)
    18             info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
    19     return wrapper
    19     return wrapper
    20 
    20 
    21 def collector(name, frontend, backend):
    21 def collector(name, frontend, backend):
    22     backends = set()
    22     backends = set()
    23     info("collector %s is ready with %r backends", name, len(backends))
    23     debug("collector %s is ready with %r backends", name, len(backends))
    24     dropped = 0
    24     dropped = 0
    25     while True:
    25     while True:
    26         poller = zmq.Poller()
    26         poller = zmq.Poller()
    27         poller.register(backend, zmq.POLLIN)
    27         poller.register(backend, zmq.POLLIN)
    28         poller.register(frontend, zmq.POLLIN)
    28         poller.register(frontend, zmq.POLLIN)
    70                             debug("collector %s forward reply", name)
    70                             debug("collector %s forward reply", name)
    71                             frontend.send_multipart(address_stack + reply[2:])
    71                             frontend.send_multipart(address_stack + reply[2:])
    72                         else:
    72                         else:
    73                             debug("collector %s discard reply %r", name, reply)
    73                             debug("collector %s discard reply %r", name, reply)
    74                 frontend.send_multipart(address_stack + [READY])
    74                 frontend.send_multipart(address_stack + [READY])
    75                 info("collector %s is ready with %r backends", name, len(backends))
    75                 debug("collector %s is ready with %r backends", name, len(backends))
    76 
    76 
    77 
    77 
    78 @checkzmqerror
    78 @checkzmqerror
    79 def broker_collector(frontend_url, backend_url):
    79 def broker_collector(frontend_url, backend_url):
    80     frontend = context.socket(zmq.XREP)
    80     frontend = context.socket(zmq.XREP)
    99     frontend.send_multipart(["", READY])
    99     frontend.send_multipart(["", READY])
   100     collector("proxy", frontend, backend)
   100     collector("proxy", frontend, backend)
   101 
   101 
   102 def worker(socket, workload, failure_rate = 0):
   102 def worker(socket, workload, failure_rate = 0):
   103     while True:
   103     while True:
   104         info("Worker is ready")
   104         debug("Worker is ready")
   105         socket.send_multipart(["",READY])
   105         socket.send_multipart(["",READY])
   106         request = socket.recv_multipart()
   106         request = socket.recv_multipart()
   107         debug("Worker receive request %r", request)
   107         debug("Worker receive request %r", request)
   108         delim = request.index("")
   108         delim = request.index("")
   109         address = request[:delim+1]
   109         address = request[:delim+1]
   172         proxy_urls.append(url)
   172         proxy_urls.append(url)
   173     time.sleep(2)
   173     time.sleep(2)
   174     workers = []
   174     workers = []
   175     for url in proxy_urls:
   175     for url in proxy_urls:
   176         for work in xrange(5):
   176         for work in xrange(5):
   177             work = threading.Thread(target = connect_worker, args = (url, 1, 0))
   177             work = threading.Thread(target = connect_worker, args = (url, 1, 4500))
   178             work.start()
   178             work.start()
   179             workers.append(work)
   179             workers.append(work)
   180     time.sleep(20)
   180     time.sleep(20)
   181     info("Joining thread")
   181     info("Joining thread")
   182     context.term()
   182     context.term()