collector.py
author Fabien Ninoles <fabien@tzone.org>
Sun, 08 May 2011 22:14:10 -0400
changeset 1 48d514cc3309
parent 0 57d81f2bf26f
child 2 2744eb2a589b
permissions -rw-r--r--
Some rewrite and debugging. Issue: worker is accumulating past job.
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     1
import random
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     2
import time
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     3
import threading
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     4
import zmq
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     5
import logging
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     6
from logging import debug, info
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
     7
from itertools import count
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     8
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
     9
# Payload frame used as the "available" signal: workers and proxies send it to
# announce themselves, and a collector sends it to its frontend once a request
# has been fully handled.
READY = "READY"
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    10
# Single zmq context from which every function in this module creates sockets.
context = zmq.Context()
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    11
# Module-wide source of request ids; requester() draws one id per request.
counter = count()
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    12
1
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    13
def collector(name, frontend, backend):
    """Fan requests from *frontend* out to ready backends and relay replies.

    Messages read from *backend* are treated as ``[identity, "", payload...]``.
    A payload of ``READY`` marks that identity as available.  Each frontend
    request has the shape ``[address..., "", timeout, ...]``: it is sent to
    every backend currently marked ready, and their replies are forwarded to
    the frontend until all recipients have reported ``READY`` again or
    *timeout* milliseconds have elapsed.  ``address + [READY]`` is then sent
    to the frontend to signal that the request is finished.

    A request whose first payload frame is ``"STOP"`` is forwarded to the
    ready backends and terminates the function.

    Args:
        name: Label used in log messages.
        frontend: Socket on which requests arrive and replies are returned.
        backend: Socket on which backends announce READY and send replies.
    """
    poller = zmq.Poller()
    poller.register(backend, zmq.POLLIN)
    backends = set()
    info("collector %s is ready with %r backends", name, len(backends))
    while True:
        # The frontend is unregistered while a request is in flight, so it
        # must be registered again before waiting for the next request.
        poller.register(frontend, zmq.POLLIN)
        ready = dict(poller.poll(100))

        # Handle the backend before the frontend.  Serving a frontend request
        # drains the backend socket, so a backend event from this same poll
        # would be stale afterwards and the receive could block.
        if backend in ready:
            request = backend.recv_multipart()
            debug("collector %s received request %r", name, request)
            if request[2] == READY:
                debug("collector %s has new backend: %r", name, request[0])
                backends.add(request[0])
            else:
                debug("collector %s discard reply %r", name, request)

        if frontend not in ready:
            continue

        request = frontend.recv_multipart()
        debug("collector %s received request %r", name, request)
        delim = request.index("")
        address_stack = request[:delim + 1]
        timeout = request[delim + 1]
        stopping = timeout == "STOP"
        if not stopping:
            debug("collector %s has new work to do in %s ms", name, timeout)

        # Every ready backend receives the request; the ready set starts over
        # and is refilled as backends report READY again.
        recipients = backends
        backends = set()
        debug("collector %s send requests to %r", name, recipients)
        for dest in recipients:
            backend.send_multipart([dest] + request[delim:])
        if stopping:
            info("collector %s is terminating", name)
            return

        timeout = int(timeout)
        poller.unregister(frontend)
        # One deadline for the whole request: polling with the full timeout
        # on every iteration would restart the clock after each message.
        # A negative timeout is passed through unchanged (wait indefinitely).
        if timeout < 0:
            deadline = None
        else:
            deadline = time.time() + timeout / 1000.0
        while recipients:
            if deadline is None:
                remaining = timeout
            else:
                remaining = max(0, int((deadline - time.time()) * 1000))
            if not poller.poll(remaining):
                info("collector %s has a timeout with %r", name, recipients)
                break
            # Only the backend is registered at this point.
            reply = backend.recv_multipart()
            if reply[2] == READY:
                debug("%r is ready on %s", reply[0], name)
                backends.add(reply[0])
                recipients.discard(reply[0])
            elif reply[0] in recipients:
                debug("collector %s forward reply", name)
                frontend.send_multipart(address_stack + reply[2:])
            else:
                debug("collector %s discard reply %r", name, reply)
        frontend.send_multipart(address_stack + [READY])
        info("collector %s is ready with %r backends", name, len(backends))
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    63
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    64
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    65
def broker_collector(frontend_url, backend_url):
    """Run a collector named "broker" with both sockets bound.

    Creates two XREP sockets, binds one to *frontend_url* (with identity
    "broker") and the other to *backend_url*, then runs collector() on them
    until it returns.
    """
    client_side = context.socket(zmq.XREP)
    client_side.setsockopt(zmq.IDENTITY, "broker")
    worker_side = context.socket(zmq.XREP)

    info("Binding broker frontend to %s", frontend_url)
    client_side.bind(frontend_url)

    info("Binding broker backend to %s", backend_url)
    worker_side.bind(backend_url)

    collector(name="broker", frontend=client_side, backend=worker_side)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    74
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    75
def proxy_collector(frontend_url, backend_url):
    """Run a collector named "proxy" between an upstream and local backends.

    The frontend is an XREQ socket (identity "proxy") connected to
    *frontend_url*; the backend is an XREP socket bound to *backend_url*.
    A READY message is sent upstream before collector() takes over.
    """
    upstream = context.socket(zmq.XREQ)
    upstream.setsockopt(zmq.IDENTITY, "proxy")
    downstream = context.socket(zmq.XREP)

    info("Connecting proxy frontend to %s", frontend_url)
    upstream.connect(frontend_url)

    info("Binding proxy backend to %s", backend_url)
    downstream.bind(backend_url)

    # Sending presence to frontend.
    upstream.send_multipart(["", READY])

    collector(name="proxy", frontend=upstream, backend=downstream)
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
    86
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
    87
def worker(socket, workload, failure_rate=0):
    """Serve requests on *socket* until told to stop or a failure is simulated.

    Each round announces READY, waits for one message, sleeps *workload*
    seconds and answers with the request id followed by "DONE".

    Args:
        socket: Socket used to announce READY, receive requests and reply.
        workload: Seconds to sleep per request.
        failure_rate: When non-zero, each request fails with probability
            ``1 / failure_rate``; a failed request gets no reply.

    Returns:
        True when a "STOP" message was received, False on a simulated failure.
    """
    while True:
        info("Worker is ready")
        socket.send_multipart(["", READY])

        frames = socket.recv_multipart()
        info("Worker receive request %r", frames)

        # Everything up to and including the empty frame is the return
        # envelope; the frame right after it carries the timeout or "STOP".
        split = frames.index("")
        envelope = frames[:split + 1]
        command = frames[split + 1]
        if command == "STOP":
            info("worker is terminating")
            return True

        payload = frames[split + 2:]
        assert payload[0] == "REQUEST"

        if failure_rate and random.randrange(failure_rate) == 0:
            info("worker failed")
            return False

        time.sleep(workload)
        info("worker send reply")
        request_id = payload[1]
        socket.send_multipart(envelope + [request_id, "DONE"])
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   107
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   108
def connect_worker(url, workload, failure_rate = 0):
    """Run worker() on a fresh XREQ socket, reconnecting after each failure.

    A new socket is created and connected to *url* for every attempt.  The
    function returns once worker() reports a clean stop (returns True); a
    simulated failure (False) triggers another attempt.

    Args:
        url: Endpoint to connect the worker socket to.
        workload: Seconds of simulated work per request, passed to worker().
        failure_rate: Failure rate passed to worker().
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            if worker(socket, workload, failure_rate):
                return
        finally:
            # Close the socket of a finished or failed attempt.  Otherwise
            # every simulated failure leaks a socket that stays connected.
            socket.close()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   115
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   116
# Shutdown flag polled by requester(); the __main__ block sets it to True.
stop = False
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   117
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   118
def requester(socket, timeout = -1):
    """Send numbered requests on *socket* until the module flag ``stop`` is set.

    Each request is ``["", str(timeout), "REQUEST", id]``.  Replies are
    counted until a ``["", READY]`` message marks the end of the request.
    Once ``stop`` is true, ``["", "STOP"]`` is sent and the function returns.

    Args:
        socket: Socket used to send requests and receive replies.
        timeout: Value sent as the request's timeout frame.
    """
    while not stop:
        # next() works on Python 2.6+ and 3; the .next() method is 2-only.
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", str(timeout), "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                break
            # Every reply must carry the id of the request in flight.
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
    info("requester is terminating")
    socket.send_multipart(["", "STOP"])
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   135
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   136
def connect_requester(url, timeout):
    """Connect a new XREQ socket to *url* and run requester() on it.

    *timeout* is handed to requester() unchanged.
    """
    sock = context.socket(zmq.XREQ)
    info("Connecting requester to %s", url)
    sock.connect(url)
    requester(socket=sock, timeout=timeout)
0
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   141
57d81f2bf26f Add collector.
fabien@tzone.org
parents:
diff changeset
   142
if __name__ == "__main__":
    # Demo topology: one broker, five requesters, one proxy, one worker,
    # all running as threads and talking over inproc endpoints.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"

    brokers = []
    broker = threading.Thread(target=broker_collector, args=(feurl, beurl))
    broker.start()
    brokers.append(broker)
    # Presumably gives the broker time to bind before anything connects.
    time.sleep(2)

    senders = []
    for _ in range(5):
        sender = threading.Thread(target=connect_requester,
                                  args=(feurl, 5000))
        sender.start()
        senders.append(sender)

    proxies = []
    proxy_urls = []
    for proxy_index in range(1):
        url = "inproc://proxy_be#%d" % (proxy_index,)
        proxy = threading.Thread(target=proxy_collector, args=(beurl, url))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
    # Presumably gives the proxies time to bind before workers connect.
    time.sleep(2)

    workers = []
    for url in proxy_urls:
        for _ in range(1):
            work = threading.Thread(target=connect_worker, args=(url, 1, 0))
            work.start()
            workers.append(work)

    # Let the system run for a while, then ask the requesters to stop.
    time.sleep(10)
    stop = True
    info("Joining thread")
    for thread in senders + brokers + proxies + workers:
        thread.join()
48d514cc3309 Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents: 0
diff changeset
   176