1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17 import socket
18 from threading import Thread
19 from gofer.messaging import getuuid, Queue
20 from gofer.messaging.consumer import Consumer, Reader
21 from gofer.messaging.producer import Producer, BinaryProducer
22 from logging import getLogger
23
24 log = getLogger(__name__)
25
26
27
28
29
32
33
34
35
36
37
38
39
41
42 HOST = socket.gethostname()
43
49
50
63
64
66
68 Thread.__init__(self)
69 self.url = url
70 self.peer = peer
71 self.port = int(port)
72 self.setDaemon(True)
73
75 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
76 sock.bind((socket.gethostname(), self.port))
77 sock.listen(5)
78 while True:
79 (client, address) = sock.accept()
80 try:
81 self.accepted(client)
82 except Exception:
83 log.exception(address)
84
95
96
98
100 Thread.__init__(self)
101 self.reader = reader
102 self.sock = sock
103 self.setDaemon(True)
104
106 try:
107 self.__read()
108 finally:
109 self.reader.close()
110
112 self.sock.setsockopt(
113 socket.IPPROTO_TCP,
114 socket.TCP_NODELAY, 1)
115 while True:
116 m = self.reader.read(5)
117 if not m:
118 continue
119 if m.content:
120 n = self.__write(m.content)
121 if n == 0:
122
123 break
124 else:
125
126 self.close()
127 break
128
130 try:
131 return self.sock.send(content)
132 except:
133 return 0
134
136 try:
137 self.sock.close()
138 except:
139 pass
140
141
143
144 BUFSIZE = 0x100000
145
147 Thread.__init__(self)
148 self.url = url
149 self.sock = sock
150 self.queue = queue
151 self.setDaemon(True)
152
163
165 try:
166 return self.sock.recv(self.BUFSIZE)
167 except:
168 pass
169
171 try:
172 self.sock.close()
173 except:
174 pass
175
176
177
178
179
180
# Second positional argument handed to both Bridge and Gateway below.
UUID = 'GATEWAY'
# First positional argument handed to both Bridge and Gateway below
# (presumably the message broker URL -- confirm against the class definitions).
URL = 'tcp://localhost:5672'

if __name__ == '__main__':
    # Script entry point: configure default logging, start a Bridge
    # (third argument 443) and a Gateway (third argument 9443), then block
    # on the gateway.
    import logging

    logging.basicConfig()

    bridge = Bridge(URL, UUID, 443)
    bridge.start()

    gateway = Gateway(URL, UUID, 9443)
    gateway.start()

    # Wait here until the gateway finishes.
    gateway.join()
192