1 """Network base classes.
2
3 Copyright 2007-2015 Research Foundation State University of New York
4 This file is part of QUB Express.
5
6 QUB Express is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation, either version 3 of the License, or
9 (at your option) any later version.
10
11 QUB Express is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
15
16 You should have received a copy of the GNU General Public License,
17 named LICENSE.txt, in the QUB Express program directory. If not, see
18 <http://www.gnu.org/licenses/>.
19
20 """
21
22 from __future__ import absolute_import
23
24 from threading import *
25 import cStringIO
26 import Queue
27 import socket
28 import select
29 import traceback
30
31 import json
32 import urllib2
33 from qubx.util_types import JsonAnon
34
class Stop(Exception):
    """
    Internal control-flow exception: raised inside a blocking read and caught
    internally so the read can be abandoned once Socket.stop() has been called.
    """

    def __init__(self):
        # The message is fixed; callers raise Stop() with no arguments.
        super(Stop, self).__init__('stop requested')
41
43 """
44 Socket(host, port, async=True)
45
46 Common base class for sockets that communicate by structured blocks.
47 e.g. L{RemoteConnect}.
48 A Socket can manage its own reading-thread and enqueue messages for user-thread processing.
49
50 To specialize, override readMsg, sendMsg, msgNonNull, and msgIsNull.
51
52 @ivar sock: a socket.socket
53 """
55 self.sock = sock
56 self.sock.settimeout(1.0)
57 self.stopFlag = False
58 self.stopped = False
59 self.async = async
60 if async:
61 self.rq = Queue.Queue()
62 self.thread = Thread(target=self.readerThread)
63 self.thread.setDaemon(True)
64 self.thread.start()
66 try:
67 if async:
68 self.stopFlag = True
69 self.thread.join()
70 else:
71 self.sock.close()
72 except:
73 pass
def recvMsg(self, block=True, timeout=None):
    """
    Return the next incoming message.

    ::
        if async:
            return the next message from the queue (self.rq), possibly
            blocking until timeout; if nothing arrives in time,
            Queue.Empty is raised by the queue
        if not async:
            return readMsg(); block and timeout are ignored

    @param block: forwarded to Queue.get in async mode
    @param timeout: forwarded to Queue.get in async mode
    @return: a message as produced by readMsg
    """
    # 'async' is a reserved word from Python 3.7 on, so `self.async` does not
    # even parse there; getattr reads the same attribute on every version.
    if getattr(self, 'async'):
        return self.rq.get(block, timeout)
    return self.readMsg()
96 """If async, stops the reader thread."""
97 self.stopFlag = True
99 """Returns True if the socket is closed and rq is empty."""
100 return self.stopped and not self.rq.qsize()
102 """
103 Returns a block of count bytes as a string,
104 or raises qubsock.Stop,
105 or any relevant socket.error from select or recv.
106 """
107 stopRequested = lambda: self.async and self.stopFlag
108 block = ''
109 while len(block) < count:
110 if stopRequested():
111 raise Stop()
112 try:
113 readers, writers, errs = select.select((self.sock,), (), (self.sock,), 1.0)
114 if errs:
115 raise socket.error()
116 if readers:
117 sub = self.sock.recv(count-len(block))
118 if len(sub) == 0:
119 raise socket.error()
120 block += sub
121 except socket.timeout:
122 pass
123 return block
125 """
126 Returns the next line as a string, including the trailing terminator,
127 or raises qubsock.Stop,
128 or any relevant socket.error from select or recv.
129 """
130 stopRequested = lambda: self.async and self.stopFlag
131 block = cStringIO.StringIO()
132 while True:
133 if stopRequested():
134 raise Stop()
135 try:
136 readers, writers, errs = select.select((self.sock,), (), (self.sock,), 1.0)
137 if errs:
138 raise socket.error()
139 if readers:
140 sub = self.sock.recv(1)
141 if len(sub) == 0:
142 raise socket.error()
143 block.write(sub)
144 if sub == terminator:
145 break
146 except socket.timeout:
147 pass
148 return block.getvalue()
150 """Sends block as a string of bytes; returns True on success, False if the socket closed."""
151 blocklen = len(block)
152 sent = 0
153 while sent < blocklen:
154 try:
155 sent1 = self.sock.send(block[sent:])
156 if sent1 == 0:
157 self.stop()
158 return False
159 sent += sent1
160 except socket.error:
161 self.stop()
162 return False
163 except socket.timeout:
164 pass
165 return True
167 """Override this method to return a (simple) non-null message (for use as place-holder)."""
168 raise Exception('unimplemented')
170 """Override this method to return whether a message is null."""
171 raise Exception('unimplemented')
173 """Override this method to send a message out self.sock."""
174 raise Exception('unimplemented')
176 """
177 Override this method to read a structured message using readBlock and return it.
178 Returns a null message to indicate end-of-stream (handle all exceptions).
179 """
180 raise Exception('unimplemented')
181
183 """Returns a new client L{socket.socket} connected to host, port."""
184 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
185 sock.connect((host, port))
186 return sock
187
189 """Listens for connections on its own thread; calls session_f(each new L{socket.socket}).
190
191 @ivar host:
192 @ivar port:
193 """
def __init__(self, host, port, session_f, seek_ports=0):
    """
    Binds a listening TCP socket and starts the accept loop on a daemon thread.

    @param host: interface to bind
    @param port: base TCP port number
    @param session_f: function(client L{socket.socket}, address) called on each new connection
    @param seek_ports: attempts to bind ports (port, port+1, port+2, ..., port+seek_ports) until a port is free
    @raise socket.error: if none of the candidate ports could be bound
    """
    self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    self.session_f = session_f
    self.host = host
    self.port = port
    attempts_left = seek_ports + 1
    while True:
        try:
            self.sock.bind((host, self.port))
            break
        except socket.error:
            if attempts_left <= 1:
                # Out of candidates: release the descriptor instead of
                # leaving it to the garbage collector, then propagate.
                self.sock.close()
                raise
            self.port += 1
            attempts_left -= 1
    self.sock.listen(1)
    # A short accept() timeout lets the listening thread poll stopFlag.
    self.sock.settimeout(1.0)
    self.stopFlag = False
    self.thread = Thread(target=self.listenThread)
    self.thread.setDaemon(True)
    self.thread.start()
222 try:
223 self.stopFlag = True
224 self.thread.join()
225 except:
226 pass
228 """Requests the listening thread to stop and close the socket; does not .thread.join()."""
229 self.stopFlag = True
231 while not self.stopFlag:
232 try:
233 client, addr = self.sock.accept()
234 self.session_f(client, addr)
235 except socket.timeout:
236 pass
237 except:
238 traceback.print_exc()
239 self.sock.close()
240
241
def postJson(x, url, wantResult=True):
    """
    POSTs x to url with Content-Type application/json.

    @param x: payload; a str or unicode is sent as given, a L{JsonAnon} is sent
              as its repr(), anything else is serialized with json.dumps
    @param url: destination URL
    @param wantResult: if true, the response body is read and parsed as JSON
    @return: the parsed response (wrapped in JsonAnon when x is a JsonAnon)
             if wantResult, otherwise None
    """
    if isinstance(x, (str, unicode)):
        body = x
    elif isinstance(x, JsonAnon):
        body = repr(x)
    else:
        body = json.dumps(x)
    if isinstance(body, unicode):
        # Send bytes: a unicode body with non-ASCII characters cannot be
        # written to the socket, and its length is counted in characters.
        body = body.encode('utf-8')
    req = urllib2.Request(url)
    req.add_header('Content-Type', 'application/json')
    response = urllib2.urlopen(req, body)
    try:
        if not wantResult:
            return None
        result = json.loads(response.read())
    finally:
        # Always release the connection, whether or not the body was read.
        response.close()
    if isinstance(x, JsonAnon):
        result = JsonAnon(result)
    return result
258