| author | Fabien Ninoles <fabien@tzone.org> |
| Tue, 10 May 2011 00:21:11 -0400 | |
| changeset 6 | fdf7da5a5d21 |
| parent 5 | 55f26e7ee45e |
| permissions | -rw-r--r-- |
| 0 | 1 |
import random |
2 |
import time |
|
3 |
import threading |
|
4 |
import zmq |
|
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
5 |
import logging |
|
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
6 |
from logging import debug, info |
|
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
7 |
from itertools import count |
| 0 | 8 |
|
9 |
# Payload frame a peer sends to announce that it is idle and can take work.
READY = "READY"
# Single ZeroMQ context from which every socket in this file is created.
context = zmq.Context()
# Source of request numbers consumed by requester().
counter = count()
| 0 | 12 |
|
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
13 |
def checkzmqerror(func):
    """Decorate *func* so that a zmq.ZMQError ends the call quietly.

    The wrapper forwards all arguments to *func* and returns its result.
    If *func* raises zmq.ZMQError the error is logged at INFO level and the
    wrapper returns None; any other exception propagates unchanged.
    """
    # Local import so this block does not depend on the file's import header.
    import functools

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except zmq.ZMQError as err:
            info("%r(*%r, **%r) is terminating with error %s", func, args, kwargs, err)
    return wrapper
|
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
20 |
|
|
6
fdf7da5a5d21
Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
21 |
def collector(name, frontend, backend, timeout):
    """Fan each frontend request out to all idle backends and relay replies.

    A backend announces itself with a message whose third frame is READY;
    its first frame (the peer identity) is then remembered.  When a request
    arrives on ``frontend`` it is sent to every remembered backend.  Replies
    from those backends are forwarded to the requester until each of them
    has sent READY again or ``timeout`` has elapsed; a final READY frame is
    then sent to the requester.

    name     -- label used in the log messages.
    frontend -- socket on which requests arrive and replies are returned.
    backend  -- socket towards the backends; frame 0 of every message read
                from it is used as the peer identity.
    timeout  -- longest wait for the backends of one request, in
                milliseconds.

    The loop has no exit: the function only ends when a call raises.
    """
    backends = set()
    debug("collector %s is ready with %r backends", name, len(backends))
    while True:
        # A new poller per iteration: the change history of this file notes
        # that reusing a single poller worked less well.
        poller = zmq.Poller()
        poller.register(backend, zmq.POLLIN)
        poller.register(frontend, zmq.POLLIN)
        for socket, _event in poller.poll():
            request = socket.recv_multipart()
            debug("collector %s received request %r", name, request)
            if socket is backend:
                if request[2] == READY:
                    debug("collector %s has new backend: %r", name, request[0])
                    backends.add(request[0])
                else:
                    # A reply arriving while no request is in flight.
                    debug("collector %s discard reply %r", name, request)
                continue

            # Everything up to and including the empty delimiter frame is
            # the return path to the requester.
            delim = request.index("")
            address_stack = request[:delim + 1]
            debug("collector %s has new work to do", name)
            # Every idle backend gets the request; the idle set starts over.
            recipients, backends = backends, set()
            debug("collector %s send requests to %r", name, recipients)
            for dest in recipients:
                backend.send_multipart([dest] + request[delim:])

            reply_poller = zmq.Poller()
            reply_poller.register(backend, zmq.POLLIN)
            start = time.time()
            deadline = start + timeout / 1000.0
            while recipients:
                debug("%r: collector %s wait on on %r", start, name, recipients)
                # Poller.poll() expects milliseconds; the deadline arithmetic
                # is in seconds, so convert before polling.
                remaining_ms = max(0, deadline - time.time()) * 1000
                for peer, _flags in reply_poller.poll(remaining_ms):
                    reply = peer.recv_multipart()
                    if reply[2] == READY:
                        debug("%r is ready on %s", reply[0], name)
                        backends.add(reply[0])
                        recipients.discard(reply[0])
                    elif reply[0] in recipients:
                        debug("collector %s forward reply", name)
                        frontend.send_multipart(address_stack + reply[2:])
                    else:
                        debug("collector %s discard reply %r", name, reply)
                end = time.time()
                if recipients and end > deadline:
                    info("%r: collector %s has timeout with %d recipients", end, name, len(recipients))
                    break
            # Tell the requester that no further replies will follow.
            frontend.send_multipart(address_stack + [READY])
            debug("collector %s is ready with %r backends", name, len(backends))
|
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
71 |
|
| 0 | 72 |
|
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
73 |
@checkzmqerror
def broker_collector(frontend_url, backend_url, timeout):
    """Bind a broker on both URLs and run collector() on it.

    frontend_url -- endpoint the frontend XREP socket binds to.
    backend_url  -- endpoint the backend XREP socket binds to; also used as
                    the frontend socket's identity.
    timeout      -- passed through to collector().

    Both sockets are closed when collector() stops, whatever the reason.
    """
    frontend = context.socket(zmq.XREP)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        backend = context.socket(zmq.XREP)
        try:
            info("Binding broker frontend to %s", frontend_url)
            frontend.bind(frontend_url)
            info("Binding broker backend to %s", backend_url)
            backend.bind(backend_url)
            collector("broker", frontend, backend, timeout)
        finally:
            backend.close()
    finally:
        frontend.close()
|
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
83 |
|
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
84 |
@checkzmqerror
def proxy_collector(frontend_url, backend_url, timeout):
    """Connect a proxy upstream, bind it downstream and run collector() on it.

    frontend_url -- endpoint the frontend XREQ socket connects to.
    backend_url  -- endpoint the backend XREP socket binds to; also used as
                    the frontend socket's identity.
    timeout      -- passed through to collector().

    Both sockets are closed when collector() stops, whatever the reason.
    """
    frontend = context.socket(zmq.XREQ)
    try:
        frontend.setsockopt(zmq.IDENTITY, backend_url)
        backend = context.socket(zmq.XREP)
        try:
            info("Connecting proxy frontend to %s", frontend_url)
            frontend.connect(frontend_url)
            info("Binding proxy backend to %s", backend_url)
            backend.bind(backend_url)
            # Sending presence to frontend.
            frontend.send_multipart(["", READY])
            collector("proxy", frontend, backend, timeout)
        finally:
            backend.close()
    finally:
        frontend.close()
|
|
1
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
96 |
|
| 0 | 97 |
def worker(socket, workload, failure_rate=0):
    """Serve requests on *socket* forever, or until a failure is simulated.

    Each round sends a READY message, waits for one request, sleeps between
    ``workload`` and ``2 * workload`` seconds and answers with the request's
    second payload frame followed by "DONE".

    socket       -- object providing send_multipart() / recv_multipart().
    workload     -- base sleep time in seconds.
    failure_rate -- when non-zero, each request has a 1-in-failure_rate
                    chance of making the worker stop without replying.

    Returns False when a failure is simulated; there is no other return.
    """
    while True:
        debug("Worker is ready")
        socket.send_multipart(["", READY])
        frames = socket.recv_multipart()
        debug("Worker receive request %r", frames)
        # Split just after the empty delimiter: return path, then payload.
        split = frames.index("") + 1
        envelope, payload = frames[:split], frames[split:]
        assert payload[0] == "REQUEST"
        if failure_rate:
            if random.randrange(failure_rate) == 0:
                info("worker failed")
                return False
        pause = workload * (1 + random.random())
        time.sleep(pause)
        debug("worker send reply")
        socket.send_multipart(envelope + [payload[1], "DONE"])
| 0 | 113 |
|
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
114 |
@checkzmqerror
def connect_worker(url, workload, failure_rate=0):
    """Keep a worker connected to *url*, reconnecting after each failure.

    url          -- endpoint the XREQ socket connects to.
    workload     -- passed through to worker().
    failure_rate -- passed through to worker().

    worker() only returns when it simulates a failure, so every pass of the
    loop stands for a restarted worker on a brand-new socket.
    """
    while True:
        socket = context.socket(zmq.XREQ)
        try:
            info("Connecting worker to %s", url)
            socket.connect(url)
            worker(socket, workload, failure_rate)
        finally:
            # Release the socket before a replacement is created, and when
            # an error ends the loop.
            socket.close()
| 0 | 121 |
|
|
6
fdf7da5a5d21
Set timeout by collector instead of by message.
Fabien Ninoles <fabien@tzone.org>
parents:
5
diff
changeset
|
122 |
def requester(socket):
    """Send numbered requests on *socket* forever and count their replies.

    For each request the next value of the module-level ``counter`` is sent
    as ``["", "REQUEST", <number>]``.  Replies are read until the message
    ``["", READY]`` arrives; every other reply must carry the request number
    in its second frame.  The number of replies is then logged.

    socket -- object providing send_multipart() / recv_multipart().

    The loop has no exit: the function only ends when a call raises.
    """
    while True:
        # next() works on both Python 2.6+ and 3, unlike the .next() method.
        i = str(next(counter))
        info("Requester send request %s", i)
        socket.send_multipart(["", "REQUEST", i])
        results = 0
        while True:
            reply = socket.recv_multipart()
            debug("requester received reply %r", reply)
            if reply == ["", READY]:
                break
            assert reply[1] == i
            results += 1
        info("requester received %d results", results)
|
| 0 | 136 |
|
|
2
2744eb2a589b
Work better if we don't reused the same poller each time.
Fabien Ninoles <fabien@tzone.org>
parents:
1
diff
changeset
|
137 |
@checkzmqerror
def connect_requester(url):
    """Connect an XREQ socket to *url* and run requester() on it.

    url -- endpoint to connect to.

    The socket is closed when requester() stops, whatever the reason.
    """
    socket = context.socket(zmq.XREQ)
    try:
        info("Connecting requester to %s", url)
        socket.connect(url)
        requester(socket)
    finally:
        socket.close()
| 0 | 143 |
|
144 |
if __name__ == "__main__":
    # Demo: one broker, 10 requesters, 5 proxies and 5 workers per proxy,
    # each in its own thread, left running for 20 seconds.
    logging.getLogger().setLevel(logging.INFO)
    feurl = "inproc://frontend"
    beurl = "inproc://backend"
    # NOTE(review): workload is never read; the workers below are started
    # with a literal 3 instead -- confirm which value is intended.
    workload = 2.5
    # Collector timeouts are in milliseconds (collector() divides by 1000.0).
    broker_timeout = 5000
    proxy_timeout = 5000

    brokers = []
    broker = threading.Thread(target=broker_collector, args=(feurl, beurl, broker_timeout))
    broker.start()
    brokers.append(broker)
    # Presumably gives the broker time to bind before peers connect.
    time.sleep(2)

    senders = []
    for _ in range(10):
        sender = threading.Thread(target=connect_requester, args=(feurl,))
        sender.start()
        senders.append(sender)

    proxies = []
    proxy_urls = []
    for index in range(5):
        url = "inproc://proxy_be#%d" % (index,)
        proxy = threading.Thread(target=proxy_collector, args=(beurl, url, proxy_timeout))
        proxy.start()
        proxies.append(proxy)
        proxy_urls.append(url)
    # Presumably gives the proxies time to bind before workers connect.
    time.sleep(2)

    workers = []
    for url in proxy_urls:
        for _ in range(5):
            work = threading.Thread(target=connect_worker, args=(url, 3, 10))
            work.start()
            workers.append(work)

    time.sleep(20)
    info("Joining thread")
    # Terminating the context is what stops the threads: every thread target
    # is wrapped by checkzmqerror, which logs the resulting zmq.ZMQError and
    # returns.
    context.term()
    for thread in senders + brokers + proxies + workers:
        thread.join()
|
48d514cc3309
Some rewrite and debugging.
Fabien Ninoles <fabien@tzone.org>
parents:
0
diff
changeset
|
181 |