collector.py
changeset 0 57d81f2bf26f
child 1 48d514cc3309
equal deleted inserted replaced
-1:000000000000 0:57d81f2bf26f
       
     1 import random
       
     2 import time
       
     3 import threading
       
     4 import zmq
       
     5 
       
     6 stop = False
       
     7 READY = "READY"
       
     8 context = zmq.Context()
       
     9 
       
    10 def collector(frontend, backend):
       
    11   poller = zmq.Poller()
       
    12   poller.register(backend, zmq.POLLIN)
       
    13   backends = set()
       
    14   while not stop:
       
    15     frontend.send_multipart(["",READY])
       
    16     poller.register(frontend, zmq.POLLIN)
       
    17     for socket, event in poller.poll(100):
       
    18       request = socket.recv_multipart()
       
    19       if socket is backend:
       
    20         if request[2] == READY:
       
    21           backends.add(request[0])
       
    22       else:
       
    23         timeout = int(request[request.index("")+1])
       
    24         recipients = backends
       
    25         backends = set()
       
    26         for dest in recipients:
       
    27           backend.send_multipart([dest] + request)
       
    28         poller.unregister(frontend)
       
    29         try:
       
    30           while not stop and recipients:
       
    31             for socket, event in poller.poll(timeout):
       
    32               reply = socket.recv_multipart()
       
    33               if reply[2] == READY:
       
    34                 backends.add(reply[0])
       
    35                 recipients.discard(reply[0])
       
    36               else:
       
    37                 frontend.send_multipart(reply[2:])
       
    38         except ZMQError:
       
    39           pass
       
    40 
       
    41 def broker_collector(frontend_url, backend_url):
       
    42   frontend = context.socket(zmq.XREP)
       
    43   backend = context.socket(zmq.XREP)
       
    44   frontend.bind(frontend_url)
       
    45   backend.bind(backend_url)
       
    46   collector(frontend, backend)
       
    47   
       
    48 def proxy_collector(frontend_url, backend_url):
       
    49   frontend = context.socket(zmq.XREP)
       
    50   backend = context.socket(zmq.XREP)
       
    51   frontend.connect(frontend_url)
       
    52   backend.bind(backend_url)
       
    53   collector(frontend, backend)
       
    54   
       
    55 def worker(socket, workload, failure_rate = 0):
       
    56   while not stop:
       
    57     socket.send_multipart(["",READY])
       
    58     request = socket.recv_multipart()
       
    59     delim = request.index("")
       
    60     timeout = request[delim+1]
       
    61     request = request[delim+2:]
       
    62     assert request[0] == "REQUEST"
       
    63     if failure_rate and random.randrange(failure_rate) == 0:
       
    64       return
       
    65     time.sleep(workload)
       
    66     socket.send_multipart(request[:delim+1] + ["DONE"])
       
    67 
       
    68 def connect_worker(url, workload, failure_rate = 0):
       
    69   while not stop:
       
    70     socket = context.socket(zmq.XREQ)
       
    71     socket.connect(url)
       
    72     worker(socket, workload, failure_rate)
       
    73 
       
    74 def requester(socket, timeout = -1):
       
    75   while not stop:
       
    76     socket.send_multipart(["", str(timeout), "REQUEST"])
       
    77     results = 0
       
    78     while True:
       
    79       reply = socket.recv_multipart()
       
    80       if reply == ["",READY]:
       
    81         break
       
    82       results += 1
       
    83     print results, "results received"
       
    84   socket.send_multipart(["", "STOP"])
       
    85 
       
    86 def connect_requester(url, timeout):
       
    87   socket = context.socket(zmq.XREQ)
       
    88   socket.connect(url)
       
    89   requester(socket, timeout)
       
    90 
       
    91 if __name__ == "__main__":
       
    92   feurl = "inproc://frontend"
       
    93   beurl = "inproc://backend"
       
    94   brokers = []
       
    95   broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
       
    96   broker.start()
       
    97   brokers.append(broker)
       
    98   time.sleep(1)
       
    99   senders = []
       
   100   for sender in xrange(2):
       
   101     sender = threading.Thread(target = connect_requester, args = (feurl,1000))
       
   102     sender.start()
       
   103     senders.append(sender)
       
   104   proxies = []
       
   105   proxy_urls = []
       
   106   for proxy in xrange(2):
       
   107     url = "inproc://proxy_be#%d" % (proxy,)
       
   108     proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
       
   109     proxy.start()
       
   110     proxies.append(proxy)
       
   111     proxy_urls.append(url)
       
   112   time.sleep(1)
       
   113   workers = []
       
   114   for url in proxy_urls:
       
   115     for work in xrange(5):
       
   116       work = threading.Thread(target = connect_worker, args = (url, 0.1, 100))
       
   117       work.start()
       
   118       workers.append(work)
       
   119   time.sleep(10)
       
   120   stop = True
       
   121   for thread in senders + brokers + proxies + workers:
       
   122     thread.join()
       
   123