collector.py
changeset 6 fdf7da5a5d21
parent 5 55f26e7ee45e
equal deleted inserted replaced
5:55f26e7ee45e 6:fdf7da5a5d21
    16             func(*args, **kwargs)
    16             func(*args, **kwargs)
    17         except zmq.ZMQError, err:
    17         except zmq.ZMQError, err:
    18             info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
    18             info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
    19     return wrapper
    19     return wrapper
    20 
    20 
    21 def collector(name, frontend, backend):
    21 def collector(name, frontend, backend, timeout):
    22     backends = set()
    22     backends = set()
    23     debug("collector %s is ready with %r backends", name, len(backends))
    23     debug("collector %s is ready with %r backends", name, len(backends))
    24     dropped = 0
    24     dropped = 0
    25     while True:
    25     while True:
    26         poller = zmq.Poller()
    26         poller = zmq.Poller()
    27         poller.register(backend, zmq.POLLIN)
    27         poller.register(backend, zmq.POLLIN)
    28         poller.register(frontend, zmq.POLLIN)
    28         poller.register(frontend, zmq.POLLIN)
    29         for socket, event in poller.poll(100):
    29         for socket, event in poller.poll():
    30             request = socket.recv_multipart()
    30             request = socket.recv_multipart()
    31             debug("collector %s received request %r", name, request)
    31             debug("collector %s received request %r", name, request)
    32             if socket is backend:
    32             if socket is backend:
    33                 if request[2] == READY:
    33                 if request[2] == READY:
    34                     debug("collector %s has new backend: %r", name, request[0])
    34                     debug("collector %s has new backend: %r", name, request[0])
    36                 else:
    36                 else:
    37                     debug("collector %s discard reply %r", name, request) 
    37                     debug("collector %s discard reply %r", name, request) 
    38             else:
    38             else:
    39                 delim = request.index("")
    39                 delim = request.index("")
    40                 address_stack = request[:delim+1]
    40                 address_stack = request[:delim+1]
    41                 timeout = request[delim+1]
    41                 debug("collector %s has new work to do", name)
    42                 debug("collector %s has new work to do in %s ms", name, timeout)
       
    43                 recipients = backends
    42                 recipients = backends
    44                 backends = set()
    43                 backends = set()
    45                 debug("collector %s send requests to %r", name, recipients)
    44                 debug("collector %s send requests to %r", name, recipients)
    46                 for dest in recipients:
    45                 for dest in recipients:
    47                     backend.send_multipart([dest] + request[delim:])
    46                     backend.send_multipart([dest] + request[delim:])
    48                 timeout = int(timeout)
       
    49                 poller = zmq.Poller()
    47                 poller = zmq.Poller()
    50                 poller.register(backend, zmq.POLLIN)
    48                 poller.register(backend, zmq.POLLIN)
       
    49                 start = time.time()
       
    50                 deadline = start + timeout / 1000.0
    51                 while recipients:
    51                 while recipients:
    52                     start = time.time()
    52                     debug("%r: collector %s wait on on %r", start, name, recipients)
    53                     debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
    53                     events = poller.poll(max(0,deadline-time.time()))
    54                     events = poller.poll(timeout)
       
    55                     if not events:
       
    56                         end = time.time()
       
    57                         if (end-start)*1000 < timeout:
       
    58                             info("no event but timeout: %r", events)
       
    59                         else:
       
    60                           dropped += 1
       
    61                           debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
       
    62                           break
       
    63                     for socket, event in events:
    54                     for socket, event in events:
    64                         reply = socket.recv_multipart()
    55                         reply = socket.recv_multipart()
    65                         if reply[2] == READY:
    56                         if reply[2] == READY:
    66                             debug("%r is ready on %s", reply[0], name)
    57                             debug("%r is ready on %s", reply[0], name)
    67                             backends.add(reply[0])
    58                             backends.add(reply[0])
    69                         elif reply[0] in recipients:
    60                         elif reply[0] in recipients:
    70                             debug("collector %s forward reply", name)
    61                             debug("collector %s forward reply", name)
    71                             frontend.send_multipart(address_stack + reply[2:])
    62                             frontend.send_multipart(address_stack + reply[2:])
    72                         else:
    63                         else:
    73                             debug("collector %s discard reply %r", name, reply)
    64                             debug("collector %s discard reply %r", name, reply)
       
    65                     end = time.time()
       
    66                     if recipients and end > deadline:
       
    67                         info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
       
    68                         break
    74                 frontend.send_multipart(address_stack + [READY])
    69                 frontend.send_multipart(address_stack + [READY])
    75                 debug("collector %s is ready with %r backends", name, len(backends))
    70                 debug("collector %s is ready with %r backends", name, len(backends))
    76 
    71 
    77 
    72 
    78 @checkzmqerror
    73 @checkzmqerror
    79 def broker_collector(frontend_url, backend_url):
    74 def broker_collector(frontend_url, backend_url, timeout):
    80     frontend = context.socket(zmq.XREP)
    75     frontend = context.socket(zmq.XREP)
    81     frontend.setsockopt(zmq.IDENTITY, backend_url)
    76     frontend.setsockopt(zmq.IDENTITY, backend_url)
    82     backend = context.socket(zmq.XREP)
    77     backend = context.socket(zmq.XREP)
    83     info("Binding broker frontend to %s", frontend_url)
    78     info("Binding broker frontend to %s", frontend_url)
    84     frontend.bind(frontend_url)
    79     frontend.bind(frontend_url)
    85     info("Binding broker backend to %s", backend_url)
    80     info("Binding broker backend to %s", backend_url)
    86     backend.bind(backend_url)
    81     backend.bind(backend_url)
    87     collector("broker", frontend, backend)
    82     collector("broker", frontend, backend, timeout)
    88 
    83 
    89 @checkzmqerror
    84 @checkzmqerror
    90 def proxy_collector(frontend_url, backend_url):
    85 def proxy_collector(frontend_url, backend_url, timeout):
    91     frontend = context.socket(zmq.XREQ)
    86     frontend = context.socket(zmq.XREQ)
    92     frontend.setsockopt(zmq.IDENTITY, backend_url)
    87     frontend.setsockopt(zmq.IDENTITY, backend_url)
    93     backend = context.socket(zmq.XREP)
    88     backend = context.socket(zmq.XREP)
    94     info("Connecting proxy frontend to %s", frontend_url)
    89     info("Connecting proxy frontend to %s", frontend_url)
    95     frontend.connect(frontend_url)
    90     frontend.connect(frontend_url)
    96     info("Binding proxy backend to %s", backend_url)
    91     info("Binding proxy backend to %s", backend_url)
    97     # Sending presence to frontend.
    92     # Sending presence to frontend.
    98     backend.bind(backend_url)
    93     backend.bind(backend_url)
    99     frontend.send_multipart(["", READY])
    94     frontend.send_multipart(["", READY])
   100     collector("proxy", frontend, backend)
    95     collector("proxy", frontend, backend, timeout)
   101 
    96 
   102 def worker(socket, workload, failure_rate = 0):
    97 def worker(socket, workload, failure_rate = 0):
   103     while True:
    98     while True:
   104         debug("Worker is ready")
    99         debug("Worker is ready")
   105         socket.send_multipart(["",READY])
   100         socket.send_multipart(["",READY])
   106         request = socket.recv_multipart()
   101         request = socket.recv_multipart()
   107         debug("Worker receive request %r", request)
   102         debug("Worker receive request %r", request)
   108         delim = request.index("")
   103         content = request.index("") + 1
   109         address = request[:delim+1]
   104         address = request[:content]
   110         timeout = request[delim+1]
   105         request = request[content:]
   111         request = request[delim+2:]
       
   112         assert request[0] == "REQUEST"
   106         assert request[0] == "REQUEST"
   113         if failure_rate and random.randrange(failure_rate) == 0:
   107         if failure_rate and random.randrange(failure_rate) == 0:
   114             info("worker failed")
   108             info("worker failed")
   115             return False
   109             return False
   116         time.sleep(workload * (1 + random.random()))
   110         time.sleep(workload * (1 + random.random()))
   123         socket = context.socket(zmq.XREQ)
   117         socket = context.socket(zmq.XREQ)
   124         info("Connecting worker to %s", url)
   118         info("Connecting worker to %s", url)
   125         socket.connect(url)
   119         socket.connect(url)
   126         worker(socket, workload, failure_rate)
   120         worker(socket, workload, failure_rate)
   127 
   121 
   128 def requester(socket, timeout = -1):
   122 def requester(socket):
   129     while True:
   123     while True:
   130         i = str(counter.next())
   124         i = str(counter.next())
   131         info("Requester send request %s", i)
   125         info("Requester send request %s", i)
   132         socket.send_multipart(["", str(timeout), "REQUEST", i])
   126         socket.send_multipart(["", "REQUEST", i])
   133         results = 0
   127         results = 0
   134         while True:
   128         while True:
   135             reply = socket.recv_multipart()
   129             reply = socket.recv_multipart()
   136             debug("requester received reply %r", reply)
   130             debug("requester received reply %r", reply)
   137             if reply == ["",READY]:
   131             if reply == ["",READY]:
   138                 break
   132                 break
   139             assert reply[1] == i
   133             assert reply[1] == i
   140             results += 1
   134             results += 1
   141         info("requester received %d results", results)
   135         info("requester received %d results", results)
   142         # time.sleep(1)
       
   143 
   136 
   144 @checkzmqerror
   137 @checkzmqerror
   145 def connect_requester(url, timeout):
   138 def connect_requester(url):
   146     socket = context.socket(zmq.XREQ)
   139     socket = context.socket(zmq.XREQ)
   147     info("Connecting requester to %s", url)
   140     info("Connecting requester to %s", url)
   148     socket.connect(url)
   141     socket.connect(url)
   149     requester(socket, timeout)
   142     requester(socket)
   150 
   143 
   151 if __name__ == "__main__":
   144 if __name__ == "__main__":
   152     logging.getLogger().setLevel(logging.INFO)
   145     logging.getLogger().setLevel(logging.INFO)
   153     feurl = "inproc://frontend"
   146     feurl = "inproc://frontend"
   154     beurl = "inproc://backend"
   147     beurl = "inproc://backend"
       
   148     workload = 2.5
       
   149     broker_timeout = 5000
       
   150     proxy_timeout = 5000
   155     brokers = []
   151     brokers = []
   156     broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
   152     broker = threading.Thread(target = broker_collector, args = (feurl, beurl, broker_timeout))
   157     broker.start()
   153     broker.start()
   158     brokers.append(broker)
   154     brokers.append(broker)
   159     time.sleep(2)
   155     time.sleep(2)
   160     senders = []
   156     senders = []
   161     for sender in xrange(10):
   157     for sender in xrange(10):
   162         sender = threading.Thread(target = connect_requester, args = (feurl,5000))
   158         sender = threading.Thread(target = connect_requester, args = (feurl,))
   163         sender.start()
   159         sender.start()
   164         senders.append(sender)
   160         senders.append(sender)
   165     proxies = []
   161     proxies = []
   166     proxy_urls = []
   162     proxy_urls = []
   167     for proxy in xrange(5):
   163     for proxy in xrange(5):
   168         url = "inproc://proxy_be#%d" % (proxy,)
   164         url = "inproc://proxy_be#%d" % (proxy,)
   169         proxy = threading.Thread(target = proxy_collector, args = (beurl, url))
   165         proxy = threading.Thread(target = proxy_collector, args = (beurl, url, proxy_timeout))
   170         proxy.start()
   166         proxy.start()
   171         proxies.append(proxy)
   167         proxies.append(proxy)
   172         proxy_urls.append(url)
   168         proxy_urls.append(url)
   173     time.sleep(2)
   169     time.sleep(2)
   174     workers = []
   170     workers = []
   175     for url in proxy_urls:
   171     for url in proxy_urls:
   176         for work in xrange(5):
   172         for work in xrange(5):
   177             work = threading.Thread(target = connect_worker, args = (url, 1, 4800))
   173             work = threading.Thread(target = connect_worker, args = (url, 3, 10))
   178             work.start()
   174             work.start()
   179             workers.append(work)
   175             workers.append(work)
   180     time.sleep(20)
   176     time.sleep(20)
   181     info("Joining thread")
   177     info("Joining thread")
   182     context.term()
   178     context.term()