collector.py
author fabien@tzone.org
Sun, 08 May 2011 19:40:30 -0400
changeset 0 57d81f2bf26f
child 1 48d514cc3309
permissions -rw-r--r--
Add collector.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     1
import random
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     2
import time
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     3
import threading
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     4
import zmq
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     5
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     6
# Shutdown flag: every loop in this module runs "while not stop", and the
# script entry point flips it to True to ask the threads to wind down.
stop = False

# Frame value that workers and collectors send to announce they are ready
# for another request.
READY = "READY"

# ZeroMQ context from which every socket in this module is created.
context = zmq.Context()
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     9
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    10
def collector(frontend, backend):
  """Fan frontend requests out to the ready backends and relay their replies.

  Loops until the module-level ``stop`` flag is true.  Each pass sends
  ``["", READY]`` on ``frontend`` and then polls both sockets for up to
  100 ms:

  * A backend message whose third frame equals ``READY`` adds its first
    frame to the set of available backends (presumably the peer identity
    prepended by the XREP socket -- confirm against the socket types in use).
  * A frontend message is sent, prefixed with each available backend's
    frame, to every available backend.  The frame that follows the first
    empty frame is parsed as an integer and used as the ``Poller.poll``
    timeout (milliseconds) while waiting for the replies.

  While waiting, a backend message that is ``READY`` makes that backend
  available again and removes it from the pending set; any other backend
  message is forwarded to ``frontend`` minus its first two frames.  Waiting
  ends when nothing is pending, when ``stop`` is set, or when a poll times
  out without any event, so that a backend that never answers cannot block
  the collector forever.

  Args:
    frontend: socket on which requests arrive and replies are sent.
    backend: socket on which backends announce readiness and reply.

  Raises:
    ValueError: if a frontend message has no empty frame or its timeout
      frame is not an integer.
    IndexError: if a backend message has fewer than three frames.
  """
  poller = zmq.Poller()
  poller.register(backend, zmq.POLLIN)
  backends = set()
  while not stop:
    frontend.send_multipart(["", READY])
    # The frontend is unregistered while replies are being collected, so it
    # has to be (re-)registered at the top of every pass.
    poller.register(frontend, zmq.POLLIN)
    for socket, event in poller.poll(100):
      request = socket.recv_multipart()
      if socket is backend:
        if request[2] == READY:
          backends.add(request[0])
      else:
        timeout = int(request[request.index("") + 1])
        # Everything that was ready receives the request; the ready set
        # starts over and is refilled as READY messages come back.
        recipients = backends
        backends = set()
        for dest in recipients:
          backend.send_multipart([dest] + request)
        # Only backend traffic matters until this request is finished.
        poller.unregister(frontend)
        try:
          while not stop and recipients:
            events = poller.poll(timeout)
            if not events:
              # Timed out with backends still pending: give up on them
              # instead of polling again forever.
              break
            for replier, event in events:
              reply = replier.recv_multipart()
              if reply[2] == READY:
                backends.add(reply[0])
                recipients.discard(reply[0])
              else:
                frontend.send_multipart(reply[2:])
        except zmq.ZMQError:
          # Best effort: a ZeroMQ failure abandons the current request and
          # the collector goes back to its main loop.
          pass
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    40
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    41
def broker_collector(frontend_url, backend_url):
  """Run a collector whose frontend and backend sockets are both bound.

  Creates two XREP sockets from the module-level context, binds them to
  the given URLs and runs ``collector`` on them until it returns.  Both
  sockets are closed afterwards, including when ``bind`` or ``collector``
  raises.

  Args:
    frontend_url: endpoint the frontend socket binds to.
    backend_url: endpoint the backend socket binds to.
  """
  frontend = context.socket(zmq.XREP)
  try:
    backend = context.socket(zmq.XREP)
    try:
      frontend.bind(frontend_url)
      backend.bind(backend_url)
      collector(frontend, backend)
    finally:
      backend.close()
  finally:
    frontend.close()
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    47
  
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    48
def proxy_collector(frontend_url, backend_url):
  """Run a collector whose frontend connects out and whose backend is bound.

  Creates two XREP sockets from the module-level context, connects the
  frontend to ``frontend_url``, binds the backend to ``backend_url`` and
  runs ``collector`` on them until it returns.  Both sockets are closed
  afterwards, including when setup or ``collector`` raises.

  Args:
    frontend_url: endpoint the frontend socket connects to.
    backend_url: endpoint the backend socket binds to.
  """
  frontend = context.socket(zmq.XREP)
  try:
    backend = context.socket(zmq.XREP)
    try:
      frontend.connect(frontend_url)
      backend.bind(backend_url)
      collector(frontend, backend)
    finally:
      backend.close()
  finally:
    frontend.close()
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    54
  
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    55
def worker(socket, workload, failure_rate=0):
  """Serve requests on ``socket`` until ``stop`` is set or a failure is simulated.

  Each round sends ``["", READY]``, waits for one multipart request, sleeps
  for ``workload`` seconds and replies with the request's envelope (every
  frame up to and including the first empty frame) followed by ``"DONE"``.

  The frame right after the empty delimiter (the requester's timeout) is
  skipped; the frame after that must be ``"REQUEST"``.

  Args:
    socket: object providing ``send_multipart`` and ``recv_multipart``.
    workload: seconds to sleep per request, passed to ``time.sleep``.
    failure_rate: when non-zero, each request has a 1-in-``failure_rate``
      chance of making the worker return without replying.

  Raises:
    ValueError: if the request contains no empty delimiter frame.
    IndexError: if nothing follows the timeout frame.
    AssertionError: if the first payload frame is not ``"REQUEST"``.
  """
  while not stop:
    socket.send_multipart(["", READY])
    frames = socket.recv_multipart()
    delim = frames.index("")
    # Capture the envelope before slicing: the reply must carry the original
    # routing frames, not a slice of the payload.
    envelope = frames[:delim + 1]
    # delim + 1 is the timeout frame, which the worker does not use.
    payload = frames[delim + 2:]
    assert payload[0] == "REQUEST"
    if failure_rate and random.randrange(failure_rate) == 0:
      # Simulated crash: vanish without answering.
      return
    time.sleep(workload)
    socket.send_multipart(envelope + ["DONE"])
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    67
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    68
def connect_worker(url, workload, failure_rate=0):
  """Keep a worker connected to ``url`` until ``stop`` is set.

  Every time ``worker`` returns (for example after a simulated failure) a
  fresh XREQ socket is created and connected, and ``worker`` is started
  again on it.  The previous socket is closed first, so restarts do not
  pile up open sockets.

  Args:
    url: endpoint the worker socket connects to.
    workload: seconds of simulated work per request, passed to ``worker``.
    failure_rate: passed to ``worker``; 0 disables simulated failures.
  """
  while not stop:
    socket = context.socket(zmq.XREQ)
    try:
      socket.connect(url)
      worker(socket, workload, failure_rate)
    finally:
      socket.close()
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    73
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    74
def requester(socket, timeout=-1):
  """Send requests on ``socket`` in a loop and count the replies to each one.

  Until the module-level ``stop`` flag is set, sends
  ``["", str(timeout), "REQUEST"]``, then receives messages until one equals
  ``["", READY]``, and prints how many other messages arrived before it.
  After the loop ends a final ``["", "STOP"]`` message is sent.

  Args:
    socket: object providing ``send_multipart`` and ``recv_multipart``.
    timeout: value sent as the request's timeout frame; -1 by default.
  """
  while not stop:
    socket.send_multipart(["", str(timeout), "REQUEST"])
    results = 0
    # Every message ahead of the READY marker counts as one result.
    while socket.recv_multipart() != ["", READY]:
      results += 1
    # Single-argument form prints the same text whether ``print`` is a
    # statement or a function.
    print("%d results received" % results)
  socket.send_multipart(["", "STOP"])
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    85
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    86
def connect_requester(url, timeout):
  """Connect an XREQ socket to ``url`` and run ``requester`` on it.

  The socket is closed when ``requester`` returns or raises.

  Args:
    url: endpoint the requester socket connects to.
    timeout: timeout value passed through to ``requester``.
  """
  socket = context.socket(zmq.XREQ)
  try:
    socket.connect(url)
    requester(socket, timeout)
  finally:
    socket.close()
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    90
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    91
if __name__ == "__main__":
  # Demo topology: one broker, two requesters on its frontend, two proxies
  # on its backend, and five workers behind each proxy.
  feurl = "inproc://frontend"
  beurl = "inproc://backend"

  # All threads are daemons: some of them may be blocked in a receive call
  # when shutdown is requested and must not keep the process alive.
  brokers = []
  broker = threading.Thread(target=broker_collector, args=(feurl, beurl))
  broker.daemon = True
  broker.start()
  brokers.append(broker)
  # Presumably gives the broker time to bind before peers connect.
  time.sleep(1)

  senders = []
  for _ in range(2):
    sender = threading.Thread(target=connect_requester, args=(feurl, 1000))
    sender.daemon = True
    sender.start()
    senders.append(sender)

  proxies = []
  proxy_urls = []
  for index in range(2):
    url = "inproc://proxy_be#%d" % (index,)
    proxy = threading.Thread(target=proxy_collector, args=(beurl, url))
    proxy.daemon = True
    proxy.start()
    proxies.append(proxy)
    proxy_urls.append(url)
  # Presumably gives the proxies time to bind their backends.
  time.sleep(1)

  workers = []
  for url in proxy_urls:
    for _ in range(5):
      work = threading.Thread(target=connect_worker, args=(url, 0.1, 100))
      work.daemon = True
      work.start()
      workers.append(work)

  # Let the system run for a while, then ask every loop to finish.
  time.sleep(10)
  stop = True

  # Wait for the threads under one shared deadline: a thread still blocked
  # in a receive never sees the flag, and an unbounded join would hang.
  deadline = time.time() + 5
  for thread in senders + brokers + proxies + workers:
    thread.join(max(0.0, deadline - time.time()))
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   123