collector.py
changeset 2 2744eb2a589b
parent 1 48d514cc3309
child 3 197572da88ea
equal deleted inserted replaced
1:48d514cc3309 2:2744eb2a589b
     8 
     8 
     9 READY = "READY"
     9 READY = "READY"
    10 context = zmq.Context()
    10 context = zmq.Context()
    11 counter = count()
    11 counter = count()
    12 
    12 
       
    13 def checkzmqerror(func):
       
    14     def wrapper(*args, **kwargs):
       
    15         try:
       
    16             func(*args, **kwargs)
       
    17         except zmq.ZMQError, err:
       
    18             info("%r(*%r, **%r) is terminating", func, args, kwargs)
       
    19     return wrapper
       
    20 
    13 def collector(name, frontend, backend):
    21 def collector(name, frontend, backend):
    14     poller = zmq.Poller()
       
    15     poller.register(backend, zmq.POLLIN)
       
    16     backends = set()
    22     backends = set()
    17     info("collector %s is ready with %r backends", name, len(backends))
    23     info("collector %s is ready with %r backends", name, len(backends))
       
    24     dropped = 0
    18     while True:
    25     while True:
       
    26         poller = zmq.Poller()
       
    27         poller.register(backend, zmq.POLLIN)
    19         poller.register(frontend, zmq.POLLIN)
    28         poller.register(frontend, zmq.POLLIN)
    20         for socket, event in poller.poll(100):
    29         for socket, event in poller.poll(100):
    21             request = socket.recv_multipart()
    30             request = socket.recv_multipart()
    22             debug("collector %s received request %r", name, request)
    31             debug("collector %s received request %r", name, request)
    23             if socket is backend:
    32             if socket is backend:
    28                     debug("collector %s discard reply %r", name, request) 
    37                     debug("collector %s discard reply %r", name, request) 
    29             else:
    38             else:
    30                 delim = request.index("")
    39                 delim = request.index("")
    31                 address_stack = request[:delim+1]
    40                 address_stack = request[:delim+1]
    32                 timeout = request[delim+1]
    41                 timeout = request[delim+1]
    33                 if timeout != "STOP":
    42                 debug("collector %s has new work to do in %s ms", name, timeout)
    34                     debug("collector %s has new work to do in %s ms", name, timeout)
       
    35                 recipients = backends
    43                 recipients = backends
    36                 backends = set()
    44                 backends = set()
    37                 debug("collector %s send requests to %r", name, recipients)
    45                 debug("collector %s send requests to %r", name, recipients)
    38                 for dest in recipients:
    46                 for dest in recipients:
    39                     backend.send_multipart([dest] + request[delim:])
    47                     backend.send_multipart([dest] + request[delim:])
    40                 if timeout == "STOP":
       
    41                     info("collector %s is terminating", name)
       
    42                     return
       
    43                 timeout = int(timeout)
    48                 timeout = int(timeout)
    44                 poller.unregister(frontend)
    49                 poller = zmq.Poller()
       
    50                 poller.register(backend, zmq.POLLIN)
    45                 while recipients:
    51                 while recipients:
       
    52                     start = time.time()
       
    53                     debug("%r: collector %s wait %r on %r", start, name, timeout, recipients)
    46                     events = poller.poll(timeout)
    54                     events = poller.poll(timeout)
    47                     if not events:
    55                     if not events:
    48                         info("collector %s has a timeout with %r", name, recipients)
    56                         end = time.time()
    49                         break
    57                         if (end-start)*1000 < timeout:
       
    58                             info("no event but timeout: %r", events)
       
    59                         else:
       
    60                           dropped += 1
       
    61                           debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
       
    62                           break
    50                     for socket, event in events:
    63                     for socket, event in events:
    51                         reply = socket.recv_multipart()
    64                         reply = socket.recv_multipart()
    52                         if reply[2] == READY:
    65                         if reply[2] == READY:
    53                             debug("%r is ready on %s", reply[0], name)
    66                             debug("%r is ready on %s", reply[0], name)
    54                             backends.add(reply[0])
    67                             backends.add(reply[0])
    60                             debug("collector %s discard reply %r", name, reply)
    73                             debug("collector %s discard reply %r", name, reply)
    61                 frontend.send_multipart(address_stack + [READY])
    74                 frontend.send_multipart(address_stack + [READY])
    62                 info("collector %s is ready with %r backends", name, len(backends))
    75                 info("collector %s is ready with %r backends", name, len(backends))
    63 
    76 
    64 
    77 
       
    78 @checkzmqerror
    65 def broker_collector(frontend_url, backend_url):
    79 def broker_collector(frontend_url, backend_url):
    66     frontend = context.socket(zmq.XREP)
    80     frontend = context.socket(zmq.XREP)
    67     frontend.setsockopt(zmq.IDENTITY, "broker")
    81     frontend.setsockopt(zmq.IDENTITY, "broker")
    68     backend = context.socket(zmq.XREP)
    82     backend = context.socket(zmq.XREP)
    69     info("Binding broker frontend to %s", frontend_url)
    83     info("Binding broker frontend to %s", frontend_url)
    70     frontend.bind(frontend_url)
    84     frontend.bind(frontend_url)
    71     info("Binding broker backend to %s", backend_url)
    85     info("Binding broker backend to %s", backend_url)
    72     backend.bind(backend_url)
    86     backend.bind(backend_url)
    73     collector("broker", frontend, backend)
    87     collector("broker", frontend, backend)
    74 
    88 
       
    89 @checkzmqerror
    75 def proxy_collector(frontend_url, backend_url):
    90 def proxy_collector(frontend_url, backend_url):
    76     frontend = context.socket(zmq.XREQ)
    91     frontend = context.socket(zmq.XREQ)
    77     frontend.setsockopt(zmq.IDENTITY, "proxy")
    92     frontend.setsockopt(zmq.IDENTITY, "proxy")
    78     backend = context.socket(zmq.XREP)
    93     backend = context.socket(zmq.XREP)
    79     info("Connecting proxy frontend to %s", frontend_url)
    94     info("Connecting proxy frontend to %s", frontend_url)
    87 def worker(socket, workload, failure_rate = 0):
   102 def worker(socket, workload, failure_rate = 0):
    88     while True:
   103     while True:
    89         info("Worker is ready")
   104         info("Worker is ready")
    90         socket.send_multipart(["",READY])
   105         socket.send_multipart(["",READY])
    91         request = socket.recv_multipart()
   106         request = socket.recv_multipart()
    92         info("Worker receive request %r", request)
   107         debug("Worker receive request %r", request)
    93         delim = request.index("")
   108         delim = request.index("")
    94         address = request[:delim+1]
   109         address = request[:delim+1]
    95         timeout = request[delim+1]
   110         timeout = request[delim+1]
    96         if timeout == "STOP":
       
    97             info("worker is terminating")
       
    98             return True
       
    99         request = request[delim+2:]
   111         request = request[delim+2:]
   100         assert request[0] == "REQUEST"
   112         assert request[0] == "REQUEST"
   101         if failure_rate and random.randrange(failure_rate) == 0:
   113         if failure_rate and random.randrange(failure_rate) == 0:
   102             info("worker failed")
   114             info("worker failed")
   103             return False
   115             return False
   104         time.sleep(workload)
   116         time.sleep(workload)
   105         info("worker send reply")
   117         debug("worker send reply")
   106         socket.send_multipart(address + [request[1], "DONE"])
   118         socket.send_multipart(address + [request[1], "DONE"])
   107 
   119 
       
   120 @checkzmqerror
   108 def connect_worker(url, workload, failure_rate = 0):
   121 def connect_worker(url, workload, failure_rate = 0):
   109     while True:
   122     while True:
   110         socket = context.socket(zmq.XREQ)
   123         socket = context.socket(zmq.XREQ)
   111         info("Connecting worker to %s", url)
   124         info("Connecting worker to %s", url)
   112         socket.connect(url)
   125         socket.connect(url)
   113         if worker(socket, workload, failure_rate):
   126         worker(socket, workload, failure_rate)
   114             return
       
   115 
       
   116 stop = False
       
   117 
   127 
   118 def requester(socket, timeout = -1):
   128 def requester(socket, timeout = -1):
   119     while not stop:
   129     while True:
   120         i = str(counter.next())
   130         i = str(counter.next())
   121         info("Requester send request %s", i)
   131         info("Requester send request %s", i)
   122         socket.send_multipart(["", str(timeout), "REQUEST", i])
   132         socket.send_multipart(["", str(timeout), "REQUEST", i])
   123         results = 0
   133         results = 0
   124         while True:
   134         while True:
   128                 break
   138                 break
   129             assert reply[1] == i
   139             assert reply[1] == i
   130             results += 1
   140             results += 1
   131         info("requester received %d results", results)
   141         info("requester received %d results", results)
   132         # time.sleep(1)
   142         # time.sleep(1)
   133     info("requester is terminating")
       
   134     socket.send_multipart(["", "STOP"])
       
   135 
   143 
       
   144 @checkzmqerror
   136 def connect_requester(url, timeout):
   145 def connect_requester(url, timeout):
   137     socket = context.socket(zmq.XREQ)
   146     socket = context.socket(zmq.XREQ)
   138     info("Connecting requester to %s", url)
   147     info("Connecting requester to %s", url)
   139     socket.connect(url)
   148     socket.connect(url)
   140     requester(socket, timeout)
   149     requester(socket, timeout)
   167         for work in xrange(1):
   176         for work in xrange(1):
   168             work = threading.Thread(target = connect_worker, args = (url, 1, 0))
   177             work = threading.Thread(target = connect_worker, args = (url, 1, 0))
   169             work.start()
   178             work.start()
   170             workers.append(work)
   179             workers.append(work)
   171     time.sleep(10)
   180     time.sleep(10)
   172     stop = True
       
   173     info("Joining thread")
   181     info("Joining thread")
       
   182     context.term()
   174     for thread in senders + brokers + proxies + workers:
   183     for thread in senders + brokers + proxies + workers:
   175         thread.join()
   184         thread.join()
   176 
   185