collector.py
author Fabien Ninoles <fabien@tzone.org>
Tue, 10 May 2011 00:21:11 -0400
changeset 6 fdf7da5a5d21
parent 5 55f26e7ee45e
permissions -rw-r--r--
Set timeout by collector instead of by message. More aggressive in error simulation.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     1
import random
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     2
import time
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     3
import threading
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     4
import zmq
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     5
import logging
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     6
from logging import debug, info
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     7
from itertools import count
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     8
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     9
READY = "READY"
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    10
context = zmq.Context()
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    11
counter = count()
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    12
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    13
def checkzmqerror(func):
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    14
    def wrapper(*args, **kwargs):
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    15
        try:
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    16
            func(*args, **kwargs)
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    17
        except zmq.ZMQError, err:
4
4b5a51cb5fc7 Add failure and near timeout workload.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    18
            info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    19
    return wrapper
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    20
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    21
def collector(name, frontend, backend, timeout):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    22
    backends = set()
4
4b5a51cb5fc7 Add failure and near timeout workload.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    23
    debug("collector %s is ready with %r backends", name, len(backends))
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    24
    dropped = 0
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    25
    while True:
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    26
        poller = zmq.Poller()
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    27
        poller.register(backend, zmq.POLLIN)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    28
        poller.register(frontend, zmq.POLLIN)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    29
        for socket, event in poller.poll():
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    30
            request = socket.recv_multipart()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    31
            debug("collector %s received request %r", name, request)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    32
            if socket is backend:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    33
                if request[2] == READY:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    34
                    debug("collector %s has new backend: %r", name, request[0])
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    35
                    backends.add(request[0])
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    36
                else:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    37
                    debug("collector %s discard reply %r", name, request) 
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    38
            else:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    39
                delim = request.index("")
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    40
                address_stack = request[:delim+1]
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    41
                debug("collector %s has new work to do", name)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    42
                recipients = backends
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    43
                backends = set()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    44
                debug("collector %s send requests to %r", name, recipients)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    45
                for dest in recipients:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    46
                    backend.send_multipart([dest] + request[delim:])
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    47
                poller = zmq.Poller()
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    48
                poller.register(backend, zmq.POLLIN)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    49
                start = time.time()
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    50
                deadline = start + timeout / 1000.0
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    51
                while recipients:
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    52
                    debug("%r: collector %s wait on on %r", start, name, recipients)
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    53
                    events = poller.poll(max(0,deadline-time.time()))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    54
                    for socket, event in events:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    55
                        reply = socket.recv_multipart()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    56
                        if reply[2] == READY:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    57
                            debug("%r is ready on %s", reply[0], name)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    58
                            backends.add(reply[0])
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    59
                            recipients.discard(reply[0])
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    60
                        elif reply[0] in recipients:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    61
                            debug("collector %s forward reply", name)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    62
                            frontend.send_multipart(address_stack + reply[2:])
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    63
                        else:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    64
                            debug("collector %s discard reply %r", name, reply)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    65
                    end = time.time()
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    66
                    if recipients and end > deadline:
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    67
                        info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    68
                        break
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    69
                frontend.send_multipart(address_stack + [READY])
4
4b5a51cb5fc7 Add failure and near timeout workload.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    70
                debug("collector %s is ready with %r backends", name, len(backends))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    71
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    72
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    73
@checkzmqerror
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    74
def broker_collector(frontend_url, backend_url, timeout):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    75
    frontend = context.socket(zmq.XREP)
3
197572da88ea Fix duplicate identity.
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    76
    frontend.setsockopt(zmq.IDENTITY, backend_url)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    77
    backend = context.socket(zmq.XREP)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    78
    info("Binding broker frontend to %s", frontend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    79
    frontend.bind(frontend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    80
    info("Binding broker backend to %s", backend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    81
    backend.bind(backend_url)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    82
    collector("broker", frontend, backend, timeout)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    83
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
    84
@checkzmqerror
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    85
def proxy_collector(frontend_url, backend_url, timeout):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    86
    frontend = context.socket(zmq.XREQ)
3
197572da88ea Fix duplicate identity.
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
    87
    frontend.setsockopt(zmq.IDENTITY, backend_url)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    88
    backend = context.socket(zmq.XREP)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    89
    info("Connecting proxy frontend to %s", frontend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    90
    frontend.connect(frontend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    91
    info("Binding proxy backend to %s", backend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    92
    # Sending presence to frontend.
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    93
    backend.bind(backend_url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    94
    frontend.send_multipart(["", READY])
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
    95
    collector("proxy", frontend, backend, timeout)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    96
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    97
def worker(socket, workload, failure_rate = 0):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    98
    while True:
4
4b5a51cb5fc7 Add failure and near timeout workload.
Fabien Ninoles <fabien@tzone.org>
parents: 3
diff changeset
    99
        debug("Worker is ready")
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   100
        socket.send_multipart(["",READY])
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   101
        request = socket.recv_multipart()
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   102
        debug("Worker receive request %r", request)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   103
        content = request.index("") + 1
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   104
        address = request[:content]
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   105
        request = request[content:]
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   106
        assert request[0] == "REQUEST"
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   107
        if failure_rate and random.randrange(failure_rate) == 0:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   108
            info("worker failed")
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   109
            return False
5
55f26e7ee45e Add randomness to workload.
Fabien Ninoles <fabien@tzone.org>
parents: 4
diff changeset
   110
        time.sleep(workload * (1 + random.random()))
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   111
        debug("worker send reply")
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   112
        socket.send_multipart(address + [request[1], "DONE"])
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   113
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   114
@checkzmqerror
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   115
def connect_worker(url, workload, failure_rate = 0):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   116
    while True:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   117
        socket = context.socket(zmq.XREQ)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   118
        info("Connecting worker to %s", url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   119
        socket.connect(url)
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   120
        worker(socket, workload, failure_rate)
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   121
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   122
def requester(socket):
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   123
    while True:
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   124
        i = str(counter.next())
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   125
        info("Requester send request %s", i)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   126
        socket.send_multipart(["", "REQUEST", i])
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   127
        results = 0
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   128
        while True:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   129
            reply = socket.recv_multipart()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   130
            debug("requester received reply %r", reply)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   131
            if reply == ["",READY]:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   132
                break
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   133
            assert reply[1] == i
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   134
            results += 1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   135
        info("requester received %d results", results)
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   136
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   137
@checkzmqerror
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   138
def connect_requester(url):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   139
    socket = context.socket(zmq.XREQ)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   140
    info("Connecting requester to %s", url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   141
    socket.connect(url)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   142
    requester(socket)
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   143
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   144
if __name__ == "__main__":
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   145
    logging.getLogger().setLevel(logging.INFO)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   146
    feurl = "inproc://frontend"
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   147
    beurl = "inproc://backend"
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   148
    workload = 2.5
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   149
    broker_timeout = 5000
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   150
    proxy_timeout = 5000
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   151
    brokers = []
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   152
    broker = threading.Thread(target = broker_collector, args = (feurl, beurl, broker_timeout))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   153
    broker.start()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   154
    brokers.append(broker)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   155
    time.sleep(2)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   156
    senders = []
3
197572da88ea Fix duplicate identity.
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   157
    for sender in xrange(10):
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   158
        sender = threading.Thread(target = connect_requester, args = (feurl,))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   159
        sender.start()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   160
        senders.append(sender)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   161
    proxies = []
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   162
    proxy_urls = []
3
197572da88ea Fix duplicate identity.
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   163
    for proxy in xrange(5):
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   164
        url = "inproc://proxy_be#%d" % (proxy,)
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   165
        proxy = threading.Thread(target = proxy_collector, args = (beurl, url, proxy_timeout))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   166
        proxy.start()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   167
        proxies.append(proxy)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   168
        proxy_urls.append(url)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   169
    time.sleep(2)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   170
    workers = []
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   171
    for url in proxy_urls:
3
197572da88ea Fix duplicate identity.
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   172
        for work in xrange(5):
6
fdf7da5a5d21 Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents: 5
diff changeset
   173
            work = threading.Thread(target = connect_worker, args = (url, 3, 10))
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   174
            work.start()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   175
            workers.append(work)
3
197572da88ea Fix duplicate identity.
Fabien Ninoles <fabien@tzone.org>
parents: 2
diff changeset
   176
    time.sleep(20)
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   177
    info("Joining thread")
2
2744eb2a589b Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents: 1
diff changeset
   178
    context.term()
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   179
    for thread in senders + brokers + proxies + workers:
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   180
        thread.join()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   181