collector.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 08 May 2011 22:58:56 -0400
changeset 4 4b5a51cb5fc7
parent 3 197572da88ea
child 5 55f26e7ee45e
permissions -rw-r--r--
Add failure and near timeout workload.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     1
import random
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     2
import time
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     3
import threading
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     4
import zmq
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     5
import logging
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     6
from logging import debug, info
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     7
from itertools import count
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     8
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     9
# Frame payload a peer sends to announce that it can accept a new request.
READY = "READY"
# Single ZeroMQ context shared by every thread started from this module.
context = zmq.Context()
# Process-wide source of request identifiers, consumed by requester().
counter = count()
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    12
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    13
def checkzmqerror(func):
    """Decorate ``func`` so that a ``zmq.ZMQError`` ends it quietly.

    The wrapped callable forwards its arguments to ``func`` and returns
    whatever ``func`` returns.  If ``func`` raises ``zmq.ZMQError`` the error
    is logged at INFO level and the wrapper returns ``None`` instead of
    propagating it.  Any other exception propagates unchanged.
    """
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except zmq.ZMQError as err:
            info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
    return wrapper
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    20
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    21
def collector(name, frontend, backend):
    """Fan each frontend request out to all ready backends and relay replies.

    Loops forever.  Backends register by sending a message whose third frame
    is ``READY``.  When a request arrives on ``frontend`` it is sent to every
    backend currently registered, and the registration set is emptied.  The
    collector then waits on ``backend`` until every recipient has reported
    ``READY`` again or the request's timeout expires.  Replies from current
    recipients are forwarded to ``frontend`` behind the request's address
    stack; anything else is discarded.  Finally ``READY`` is sent to
    ``frontend`` behind the same address stack.

    name     -- label used in log messages only.
    frontend -- socket the requests come from and the replies go to.
    backend  -- socket the backends are reached through; messages read from
                it are indexed as [identity, delimiter, payload...].

    The frame right after the first empty frame of a request is the timeout
    in milliseconds.  It is one deadline for the whole gather phase; a
    negative value is passed to poll() unchanged (wait without deadline).
    """
    backends = set()
    debug("collector %s is ready with %r backends", name, len(backends))
    dropped = 0
    while True:
        # A fresh poller is built on every pass; the file history notes that
        # this "works better" than reusing a single poller.
        poller = zmq.Poller()
        poller.register(backend, zmq.POLLIN)
        poller.register(frontend, zmq.POLLIN)
        for socket, event in poller.poll(100):
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)
            if socket is backend:
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    debug("collector %s discard reply %r", name, request)
                continue

            # Frontend request: everything up to and including the first
            # empty frame is the return address.
            delim = request.index("")
            address_stack = request[:delim+1]
            timeout = request[delim+1]
            debug("collector %s has new work to do in %s ms", name, timeout)
            # Every known backend becomes a recipient; each one must announce
            # READY again before it is offered further work.
            recipients = backends
            backends = set()
            debug("collector %s send requests to %r", name, recipients)
            for dest in recipients:
                backend.send_multipart([dest] + request[delim:])
            timeout = int(timeout)
            # Absolute deadline for the whole gather phase, so that a steady
            # trickle of backend messages cannot restart the wait each time.
            if timeout < 0:
                deadline = None
            else:
                deadline = time.time() + timeout / 1000.0
            poller = zmq.Poller()
            poller.register(backend, zmq.POLLIN)
            while recipients:
                start = time.time()
                if deadline is None:
                    wait_ms = timeout
                else:
                    # +1 rounds up so the final poll does not return just
                    # before the deadline; clamp so a passed deadline never
                    # becomes a negative (infinite) wait.
                    wait_ms = max(0, int((deadline - start) * 1000) + 1)
                debug("%r: collector %s wait %r on %r", start, name, wait_ms, recipients)
                events = poller.poll(wait_ms)
                if not events:
                    end = time.time()
                    if deadline is not None and end < deadline:
                        # poll() returned early without any event: wait again
                        # for the time that is left.
                        info("no event but timeout: %r", events)
                    else:
                        dropped += 1
                        debug("%r: collector %s has a %d timeout with %r (%r)", end, name, dropped, recipients, timeout)
                        break
                for socket, event in events:
                    reply = socket.recv_multipart()
                    if reply[2] == READY:
                        debug("%r is ready on %s", reply[0], name)
                        backends.add(reply[0])
                        recipients.discard(reply[0])
                    elif reply[0] in recipients:
                        debug("collector %s forward reply", name)
                        frontend.send_multipart(address_stack + reply[2:])
                    else:
                        debug("collector %s discard reply %r", name, reply)
            # Tell the requester that no more replies will follow.
            frontend.send_multipart(address_stack + [READY])
            debug("collector %s is ready with %r backends", name, len(backends))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    76
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    77
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    78
@checkzmqerror
def broker_collector(frontend_url, backend_url):
    """Run a collector named "broker" with both sockets bound.

    Creates two XREP sockets, binds one to ``frontend_url`` and the other to
    ``backend_url``, and hands them to collector().  The frontend socket's
    identity is set to ``backend_url``.  Both sockets are closed when
    collector() exits, whatever the reason.
    """
    frontend = context.socket(zmq.XREP)
    backend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        info("Binding broker frontend to %s", frontend_url)
        frontend.bind(frontend_url)
        info("Binding broker backend to %s", backend_url)
        backend.bind(backend_url)
        collector("broker", frontend, backend)
    finally:
        # Close explicitly rather than relying on garbage collection.
        backend.close()
        frontend.close()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    88
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    89
@checkzmqerror
def proxy_collector(frontend_url, backend_url):
    """Run a collector named "proxy" between an upstream peer and backends.

    Connects an XREQ socket to ``frontend_url`` and binds an XREP socket to
    ``backend_url``.  The frontend socket's identity is set to
    ``backend_url``.  A READY message is sent upstream before collector()
    starts.  Both sockets are closed when collector() exits, whatever the
    reason.
    """
    frontend = context.socket(zmq.XREQ)
    backend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        info("Connecting proxy frontend to %s", frontend_url)
        frontend.connect(frontend_url)
        info("Binding proxy backend to %s", backend_url)
        backend.bind(backend_url)
        # Sending presence to frontend.
        frontend.send_multipart(["", READY])
        collector("proxy", frontend, backend)
    finally:
        # Close explicitly rather than relying on garbage collection.
        backend.close()
        frontend.close()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   101
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   102
def worker(socket, workload, failure_rate = 0):
    """Serve requests on ``socket`` until a simulated failure occurs.

    Each round announces READY, reads one request, sleeps ``workload``
    seconds and replies with the request identifier followed by "DONE".

    When ``failure_rate`` is non-zero, each request has a 1-in-``failure_rate``
    chance of making the worker log a failure and return False without
    replying.  With ``failure_rate`` of 0 the loop never returns.
    """
    while True:
        debug("Worker is ready")
        socket.send_multipart(["", READY])
        frames = socket.recv_multipart()
        debug("Worker receive request %r", frames)
        # Split at the first empty frame: the envelope (delimiter included)
        # is echoed back in front of the reply.
        split = frames.index("")
        envelope = frames[:split + 1]
        # Timeout frame; read but not otherwise used by the worker.
        timeout = frames[split + 1]
        payload = frames[split + 2:]
        assert payload[0] == "REQUEST"
        must_fail = failure_rate and random.randrange(failure_rate) == 0
        if must_fail:
            info("worker failed")
            return False
        time.sleep(workload)
        debug("worker send reply")
        socket.send_multipart(envelope + [payload[1], "DONE"])
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   119
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   120
@checkzmqerror
def connect_worker(url, workload, failure_rate = 0):
    """Keep a worker connected to ``url``, reconnecting after each failure.

    Every pass creates a new XREQ socket, connects it to ``url`` and runs
    worker() on it.  When worker() returns (a simulated failure), the socket
    is closed and a new one is opened.  ``workload`` and ``failure_rate``
    are passed through to worker().
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            worker(socket, workload, failure_rate)
        finally:
            # Close the failed worker's socket before opening the next one.
            socket.close()
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   127
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   128
def requester(socket, timeout = -1):
    """Send numbered requests on ``socket`` forever and count the replies.

    Each request is ["", timeout, "REQUEST", id], where ``id`` comes from the
    module-level ``counter``.  Replies are read until ["", READY] arrives;
    every other reply must carry the same id in its second frame.  The
    number of replies received is logged before the next request is sent.

    timeout -- value sent in the request's timeout frame (as a string).
    """
    while True:
        # next() rather than the Python-2-only .next() method.
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                break
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
        # time.sleep(1)
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   143
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   144
@checkzmqerror
def connect_requester(url, timeout):
    """Connect an XREQ socket to ``url`` and run requester() on it.

    ``timeout`` is passed through to requester().  The socket is closed when
    requester() exits, whatever the reason.
    """
    socket = context.socket(zmq.XREQ)
    try:
        info("Connecting requester to %s", url)
        socket.connect(url)
        requester(socket, timeout)
    finally:
        # Close explicitly rather than relying on garbage collection.
        socket.close()
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   150
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   151
if __name__ == "__main__":
    # Demo topology, all over inproc transports:
    # 10 requesters -> 1 broker -> 5 proxies -> 5 workers per proxy.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    # Start the broker first and give it time to bind both endpoints.
    broker = threading.Thread(target = broker_collector, args = (feurl, beurl))
    broker.start()
    brokers = [broker]
    time.sleep(2)

    # Requesters, each using a 5000 ms request timeout.
    senders = []
    for _ in xrange(10):
        sender_thread = threading.Thread(target = connect_requester, args = (feurl, 5000))
        sender_thread.start()
        senders.append(sender_thread)

    # Proxies, each binding its own backend endpoint.
    proxies = []
    proxy_urls = []
    for index in xrange(5):
        url = "inproc://proxy_be#%d" % (index,)
        proxy_thread = threading.Thread(target = proxy_collector, args = (beurl, url))
        proxy_thread.start()
        proxies.append(proxy_thread)
        proxy_urls.append(url)
    time.sleep(2)

    # Five workers per proxy: 1 second of work, failure rate 1 in 4500.
    workers = []
    for url in proxy_urls:
        for _ in xrange(5):
            worker_thread = threading.Thread(target = connect_worker, args = (url, 1, 4500))
            worker_thread.start()
            workers.append(worker_thread)

    # Let the system run, then terminate the context and wait for all threads.
    time.sleep(20)
    info("Joining thread")
    context.term()
    for thread in senders + brokers + proxies + workers:
        thread.join()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   185