Package qubx :: Module sock
[hide private]
[frames] | no frames]

Source Code for Module qubx.sock

  1  """Network base classes. 
  2   
  3  Copyright 2007-2015 Research Foundation State University of New York  
  4  This file is part of QUB Express.                                           
  5   
  6  QUB Express is free software; you can redistribute it and/or modify           
  7  it under the terms of the GNU General Public License as published by  
  8  the Free Software Foundation, either version 3 of the License, or     
  9  (at your option) any later version.                                   
 10   
 11  QUB Express is distributed in the hope that it will be useful,                
 12  but WITHOUT ANY WARRANTY; without even the implied warranty of        
 13  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the         
 14  GNU General Public License for more details.                          
 15   
 16  You should have received a copy of the GNU General Public License,    
 17  named LICENSE.txt, in the QUB Express program directory.  If not, see         
 18  <http://www.gnu.org/licenses/>.                                       
 19   
 20  """ 
 21   
 22  from __future__ import absolute_import 
 23   
 24  from threading import * 
 25  import cStringIO 
 26  import Queue 
 27  import socket 
 28  import select 
 29  import traceback 
 30   
 31  import json 
 32  import urllib2 
 33  from qubx.util_types import JsonAnon 
 34   
35 -class Stop(Exception):
36 """ 37 Raised and caught internally to break out of a read when Socket.stop() is called 38 """
39 - def __init__(self):
40 Exception.__init__(self, 'stop requested')
41
42 -class Socket(object):
43 """ 44 Socket(host, port, async=True) 45 46 Common base class for sockets that communcate by structured blocks. 47 e.g. L{RemoteConnect}. 48 A Socket can manage its own reading-thread and enqueue messages for user-thread processing. 49 50 To specialize, override readMsg, sendMsg, msgNonNull, and msgIsNull. 51 52 @ivar sock: a socket.socket 53 """
54 - def __init__(self, sock, async=True):
55 self.sock = sock 56 self.sock.settimeout(1.0) 57 self.stopFlag = False 58 self.stopped = False 59 self.async = async 60 if async: 61 self.rq = Queue.Queue() 62 self.thread = Thread(target=self.readerThread) 63 self.thread.setDaemon(True) 64 self.thread.start()
65 - def __del__(self):
66 try: 67 if async: 68 self.stopFlag = True 69 self.thread.join() 70 else: 71 self.sock.close() 72 except: 73 pass
74 - def recvMsg(self, block=True, timeout=None): # async
75 """ 76 :: 77 if async: 78 return the next message from the queue, possibly blocking until timeout. 79 if the timeout is reached, raise Queue.empty 80 if not async: 81 return readMsg() 82 """ 83 if self.async: 84 return self.rq.get(block, timeout) # raises Queue.Empty if timeout or not block 85 else: # ignoring block, timeout 86 return self.readMsg()
87 - def readerThread(self):
88 msg = self.msgNonNull() 89 while (not self.msgIsNull(msg)) and (not self.stopFlag): 90 msg = self.readMsg() 91 if not self.msgIsNull(msg): 92 self.rq.put( msg ) 93 self.sock.close() 94 self.stopped = True
95 - def stop(self):
96 """If async, stops the reader thread.""" 97 self.stopFlag = True
98 - def done(self):
99 """Returns True if the socket is closed and rq is empty.""" 100 return self.stopped and not self.rq.qsize()
101 - def readBlock(self, count):
102 """ 103 Returns a block of count bytes as a string, 104 or raises qubsock.Stop, 105 or any relevant socket.error from select or recv. 106 """ 107 stopRequested = lambda: self.async and self.stopFlag 108 block = '' 109 while len(block) < count: 110 if stopRequested(): 111 raise Stop() 112 try: 113 readers, writers, errs = select.select((self.sock,), (), (self.sock,), 1.0) 114 if errs: 115 raise socket.error() 116 if readers: 117 sub = self.sock.recv(count-len(block)) 118 if len(sub) == 0: 119 raise socket.error() 120 block += sub 121 except socket.timeout: 122 pass 123 return block
124 - def readLine(self, terminator='\n'):
125 """ 126 Returns the next line as a string, including the trailing terminator, 127 or raises qubsock.Stop, 128 or any relevant socket.error from select or recv. 129 """ 130 stopRequested = lambda: self.async and self.stopFlag 131 block = cStringIO.StringIO() 132 while True: 133 if stopRequested(): 134 raise Stop() 135 try: 136 readers, writers, errs = select.select((self.sock,), (), (self.sock,), 1.0) 137 if errs: 138 raise socket.error() 139 if readers: 140 sub = self.sock.recv(1) 141 if len(sub) == 0: 142 raise socket.error() 143 block.write(sub) 144 if sub == terminator: 145 break 146 except socket.timeout: 147 pass 148 return block.getvalue()
149 - def sendBlock(self, block):
150 """Sends block as a string of bytes; returns True on success, False if the socket closed.""" 151 blocklen = len(block) 152 sent = 0 153 while sent < blocklen: 154 try: 155 sent1 = self.sock.send(block[sent:]) 156 if sent1 == 0: # hangup 157 self.stop() 158 return False 159 sent += sent1 160 except socket.error: 161 self.stop() 162 return False 163 except socket.timeout: 164 pass 165 return True
166 - def msgNonNull(self):
167 """Override this method to return a (simple) non-null message (for use as place-holder).""" 168 raise Exception('unimplemented')
169 - def msgIsNull(self, msg):
170 """Override this method to return whether a message is null.""" 171 raise Exception('unimplemented')
172 - def sendMsg(self, msg):
173 """Override this method to send a message out self.sock.""" 174 raise Exception('unimplemented')
175 - def readMsg(self):
176 """ 177 Override this method to read a structured message using readBlock and return it. 178 Returns a null message to indicate end-of-stream (handle all exceptions). 179 """ 180 raise Exception('unimplemented')
181
182 -def connect_as_client(host, port):
183 """Returns a new client L{socket.socket} connected to host, port.""" 184 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 185 sock.connect((host, port)) 186 return sock
187
188 -class TCPServer(object):
189 """Listens for connections on its own thread; calls session_f(each new L{socket.socket}). 190 191 @ivar host: 192 @ivar port: 193 """
194 - def __init__(self, host, port, session_f, seek_ports=0):
195 """ 196 @param host: 197 @param port: base TCP port number 198 @param session_f: function(L{socket.socket}) called on each new connection 199 @param seek_ports: attempts to bind ports (port, port+1, port+2, ..., port+seek_ports) until a port is free 200 """ 201 self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 202 self.session_f = session_f 203 self.host = host 204 self.port = port 205 ports2check = seek_ports + 1 206 while True: 207 try: 208 self.sock.bind((host, self.port)) 209 break 210 except socket.error: 211 if ports2check <= 1: 212 raise # exit on socket.error when ports2check are exhausted 213 self.port += 1 214 ports2check -= 1 215 self.sock.listen(1) 216 self.sock.settimeout(1.0) 217 self.stopFlag = False 218 self.thread = Thread(target=self.listenThread) 219 self.thread.setDaemon(True) 220 self.thread.start()
221 - def __del__(self):
222 try: 223 self.stopFlag = True 224 self.thread.join() 225 except: 226 pass
227 - def stop(self):
228 """Requests the listening thread to stop and close the socket; does not .thread.join().""" 229 self.stopFlag = True
230 - def listenThread(self):
231 while not self.stopFlag: 232 try: 233 client, addr = self.sock.accept() 234 self.session_f(client, addr) 235 except socket.timeout: 236 pass 237 except: 238 traceback.print_exc() 239 self.sock.close()
240 241
242 -def postJson(x, url, wantResult=True):
243 if isinstance(x, str) or isinstance(x, unicode): 244 s = x 245 elif isinstance(x, JsonAnon): 246 s = repr(x) 247 else: 248 s = json.dumps(x) 249 req = urllib2.Request(url) 250 req.add_header('Content-Type', 'application/json') 251 response = urllib2.urlopen(req, s) 252 if wantResult: 253 resp = response.read() 254 result = json.loads(resp) 255 if isinstance(x, JsonAnon): 256 result = JsonAnon(result) 257 return result
258