Package wsgitools :: Package scgi :: Module forkpool
[hide private]
[frames | no frames]

Source Code for Module wsgitools.scgi.forkpool

  1  """ 
  2  The L{forkpool.SCGIServer} adapts a wsgi application to a scgi service. 
  3   
  4  It works with multiple processes that are periodically cleaned up to prevent 
  5  memory leaks from having an impact on the system. 
  6  """ 
  7   
  8  import socket 
  9  import os 
 10  import select 
 11  import sys 
 12  import errno 
 13  import signal 
 14   
class SocketFileWrapper:
    """Wraps a socket to a wsgi-compliant file-like object.

    At most C{toread} bytes are received from the socket in total. Once that
    budget is used up, every read method reports end of file. All data is
    handled as byte strings (which is plain C{str} on Python 2).
    """

    def __init__(self, sock, toread):
        """
        @param sock: is a C{socket.socket()}
        @type toread: int
        @param toread: is the number of bytes that may still be received
                from C{sock} (the request's C{CONTENT_LENGTH})
        """
        self.sock = sock
        # data already received from the socket but not yet handed out
        self.buff = b""
        self.toread = toread

    def _recv(self, size=4096):
        """
        internal method for receiving and counting incoming data
        @type size: int
        @param size: is the maximum number of bytes to receive
        @returns: the received bytes; an empty string if nothing may or can
                be read anymore
        @raise socket.error: for every error except a connection that is
                reset, not connected or shut down (those count as EOF)
        """
        toread = min(size, self.toread)
        if toread <= 0:
            # never hand a zero or negative size to recv()
            return b""
        try:
            data = self.sock.recv(toread)
        except socket.error as why:
            if why.args and why.args[0] in (errno.ECONNRESET, errno.ENOTCONN,
                                            errno.ESHUTDOWN):
                data = b""
            else:
                raise
        self.toread -= len(data)
        return data

    def close(self):
        """Does not close the socket, because it might still be needed. It
        reads all data that should have been read as given by
        C{CONTENT_LENGTH}. Socket errors end the draining silently.
        """
        try:
            while self.toread > 0:
                if not self._recv(min(self.toread, 4096)):
                    return
        except socket.error:
            pass

    def read(self, size=None):
        """
        see pep333

        A socket error ends the read early; whatever was received up to that
        point is returned.
        @type size: int or None
        @param size: is the maximum number of bytes to return. C{None} or a
                negative value reads everything that is left.
        """
        chunks = [self.buff]
        if size is None or size < 0:
            self.buff = b""
            try:
                while True:
                    data = self._recv()
                    if not data:
                        break
                    chunks.append(data)
            except socket.error:
                pass
            return b"".join(chunks)

        have = len(self.buff)
        while have < size:
            try:
                data = self._recv(min(4096, size - have))
            except socket.error:
                break
            if not data:
                break
            chunks.append(data)
            have += len(data)
        data = b"".join(chunks)
        # slicing covers both the "enough data" and the "short read" case
        self.buff = data[size:]
        return data[:size]

    def readline(self, size=None):
        """
        see pep333
        @type size: int or None
        @param size: is the maximum number of bytes to return. C{None} or a
                negative value means no limit.
        @raise socket.error:
        """
        if size is not None and size < 0:
            size = None
        while True:
            pos = self.buff.find(b"\n")
            if pos >= 0:
                split = pos + 1
                if size is not None and split > size:
                    split = size
                ret, self.buff = self.buff[:split], self.buff[split:]
                return ret
            if size is None:
                data = self._recv(4096)
            elif len(self.buff) < size:
                data = self._recv(size - len(self.buff))
            else:
                # limit reached without seeing a newline
                ret, self.buff = self.buff[:size], self.buff[size:]
                return ret
            if not data:
                # EOF: hand out the unterminated rest
                ret, self.buff = self.buff, b""
                return ret
            self.buff += data

    def readlines(self, hint=None):
        """
        see pep333

        This is a generator yielding one line at a time.
        @type hint: int or None
        @param hint: if positive, iteration stops after the line that makes
                the total number of yielded bytes reach C{hint}
        @raise socket.error:
        """
        total = 0
        data = self.readline()
        while data:
            yield data
            total += len(data)
            if hint is not None and 0 < hint <= total:
                return
            data = self.readline()

    def __iter__(self):
        """see pep333"""
        return self

    def next(self):
        """
        see pep333

        Yields chunks of up to 4096 bytes.
        """
        data = self.read(4096)
        if not data:
            raise StopIteration
        return data

    # Python 3 spelling of the iterator protocol
    __next__ = next

    def flush(self):
        """see pep333"""
        pass

    def write(self, data):
        """see pep333"""
        assert isinstance(data, bytes)
        try:
            self.sock.sendall(data)
        except socket.error:
            # ignore all socket errors: there is no way to report
            return

    def writelines(self, lines):
        """see pep333"""
        for line in lines:
            self.write(line)
151
def _native(data):
    """Turns a byte string from the wire into a native C{str}. On Python 2
    byte strings already are C{str} and are returned unchanged."""
    if isinstance(data, str):
        return data
    return data.decode("iso-8859-1")


def _wire(text):
    """Turns a native C{str} into a byte string suitable for C{sendall}. On
    Python 2 a C{str} is returned unchanged."""
    if isinstance(text, bytes):
        return text
    return text.encode("iso-8859-1")


if sys.version_info[0] >= 3:
    def _reraise(exc_type, exc_value, exc_tb):
        """Raises the exception described by a C{sys.exc_info()} triple
        keeping its traceback."""
        if exc_value is None:
            exc_value = exc_type()
        raise exc_value.with_traceback(exc_tb)
else:
    # the three-argument raise statement is a syntax error on Python 3, so
    # it has to be hidden from the compiler
    exec("def _reraise(exc_type, exc_value, exc_tb):\n"
         "    raise exc_type, exc_value, exc_tb\n")


class SCGIServer:
    """Usage: create an L{SCGIServer} object and invoke the run method which
    will then turn this process into an scgi server."""

    class WorkerState:
        """state: 0 means idle and 1 means working.
        These values are also sent as strings '0' and '1' over the socket."""

        def __init__(self, pid, sock, state):
            self.pid = pid
            self.sock = sock
            self.state = state

    def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
                 minworkers=2, maxworkers=32, maxrequests=1000, config=None):
        """
        @param wsgiapp: is the WSGI application to be run.
        @type port: int
        @param port: is the tcp port to listen on
        @type interface: str
        @param interface: is the interface to bind to (default: C{"localhost"})
        @param error: is a file-like object being passed as C{wsgi.errors} in
                environ
        @type minworkers: int
        @param minworkers: is the number of worker processes to spawn
        @type maxworkers: int
        @param maxworkers: is the maximum number of workers that can be spawned
                on demand
        @type maxrequests: int
        @param maxrequests: is the number of requests a worker processes before
                dying
        @type config: {} or None
        @param config: the environ dictionary is updated using these values for
                each request. C{None} (the default) means no extra values.
        """
        assert hasattr(error, "write")
        self.wsgiapp = wsgiapp
        self.port = port
        self.interface = interface
        self.minworkers = minworkers
        self.maxworkers = maxworkers
        self.maxrequests = maxrequests
        # a fresh dict per instance: a shared default would leak
        # modifications from one server object into all others
        self.config = {} if config is None else config
        self.error = error
        self.server = None  # becomes a socket
        # maps filedescriptors to WorkerStates
        self.workers = {}
        self.running = False
        self.ischild = False

    def enable_sighandler(self, sig=signal.SIGTERM):
        """
        Changes the signal handler for the given signal to terminate the run()
        loop.
        @param sig: is the signal to handle
        @returns: self
        """
        signal.signal(sig, self.shutdownhandler)
        return self

    def run(self):
        """
        Serve the wsgi application.

        Binds the listening socket, then keeps the worker pool filled and
        tracks the idle/working state reported by each worker until
        C{self.running} is cleared. Afterwards the listening socket is closed
        and all workers are killed.

        NOTE(review): on Python >= 3.5 C{select.select} is restarted
        automatically after a signal (PEP 475), so a shutdown request would
        only be noticed when a worker reports next -- confirm before relying
        on signal based shutdown under Python 3.
        @raise RuntimeError: if a worker sends something other than its state
        """
        self.server = socket.socket()
        self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        self.server.bind((self.interface, self.port))
        self.server.listen(5)
        self.running = True
        while self.running:
            # spawn while below the minimum, or while below the maximum with
            # no idle worker left
            while (len(self.workers) < self.minworkers or
                   (len(self.workers) < self.maxworkers and
                    not any(w.state == 0 for w in self.workers.values()))):
                self.spawnworker()
            try:
                rs, _, _ = select.select(list(self.workers), [], [])
            except select.error as e:
                if e.args[0] != errno.EINTR:
                    raise
                rs = []
            for s in rs:
                try:
                    data = self.workers[s].sock.recv(1)
                except socket.error:
                    # we cannot handle errors here, so drop the connection.
                    data = b''
                if data == b'':
                    self.workers[s].sock.close()
                    del self.workers[s]
                elif data in (b'0', b'1'):
                    self.workers[s].state = int(data)
                else:
                    raise RuntimeError("unexpected data from worker")
            # reap all children that have terminated by now
            try:
                pid = 1
                while pid > 0:
                    pid, _ = os.waitpid(0, os.WNOHANG)
            except OSError:
                pass
        self.server.close()
        self.server = None
        self.killworkers()

    def killworkers(self, sig=signal.SIGTERM):
        """
        Kills all worker children. Workers that are already gone are skipped.
        @param sig: is the signal used to kill the children
        @raise OSError: if signalling fails for another reason than the
                process not existing
        """
        while self.workers:
            _, state = self.workers.popitem()
            state.sock.close()
            try:
                os.kill(state.pid, sig)
            except OSError as err:
                # a worker that exited on its own must not keep the
                # remaining workers from being signalled
                if err.errno != errno.ESRCH:
                    raise
        # TODO: handle working children with a timeout

    def shutdownhandler(self, sig=None, stackframe=None):
        """
        Signal handler function for stopping the run() loop. In the master it
        works by clearing a variable that run() evaluates in each loop; run()
        then closes the accepting socket and kills the workers. In a worker
        child it exits the process.
        @param sig: ignored for usage with signal.signal
        @param stackframe: ignored for usage with signal.signal
        """
        if self.ischild:
            sys.exit()
        else:
            self.running = False

    def spawnworker(self):
        """
        internal! spawns a single worker

        Returns in the master process only; the child serves requests and
        then exits.
        @raise OSError: if forking fails
        """
        import traceback

        srvsock, worksock = socket.socketpair()
        try:
            pid = os.fork()
        except OSError:
            # do not leak the socket pair if no child could be created
            srvsock.close()
            worksock.close()
            raise

        if pid == 0:
            self.ischild = True
            # close unneeded sockets
            srvsock.close()
            for worker in self.workers.values():
                worker.sock.close()
            del self.workers

            status = 0
            try:
                self.work(worksock)
            except socket.error:
                pass
            except Exception:
                # The child must never return into the master's call stack,
                # so report the failure and exit instead of propagating.
                self.error.write(traceback.format_exc())
                status = 1

            sys.exit(status)

        # master: close unneeded sockets
        worksock.close()
        self.workers[srvsock.fileno()] = SCGIServer.WorkerState(pid, srvsock,
                                                                0)

    def work(self, worksock):
        """
        internal! serves maxrequests times
        @param worksock: is the socket used to report the state to the master
        @raise socket.error:
        """
        for _ in range(self.maxrequests):
            con, _addr = self.server.accept()
            # we cannot handle socket.errors here.
            worksock.sendall(b'1')  # tell server we're working
            self.process(con)
            worksock.sendall(b'0')  # tell server we've finished

    def _read_headers(self, con):
        """
        internal! reads the netstring encoded header block of a request.

        The length prefix has to end with a colon within the first seven
        bytes, which limits the header block to 999999 bytes.
        @returns: a copy of C{self.config} updated with the request headers
                or C{None} if the request is malformed or cannot be read
        """
        try:
            # the prefix may arrive in several fragments
            prefix = b""
            while b":" not in prefix:
                if len(prefix) >= 7:
                    return None
                chunk = con.recv(7 - len(prefix))
                if not chunk:
                    return None
                prefix += chunk
            length, rest = prefix.split(b":", 1)
            if not length.isdigit():  # clear protocol violation
                return None
            wanted = int(length) + 1  # read one byte beyond: the ','
            if len(rest) > wanted:
                # The prefix read already went past the announced block. A
                # block that short cannot hold CONTENT_LENGTH anyway.
                return None
            chunks = [rest]
            have = len(rest)
            while have < wanted:
                chunk = con.recv(min(4096, wanted - have))
                if not chunk:  # request too short
                    return None
                chunks.append(chunk)
                have += len(chunk)
        except socket.error:
            return None

        # netstrings!
        items = b"".join(chunks).split(b"\0")
        # the byte beyond has to be a ','.
        # and the number of netstrings excluding the final ',' has to be even
        if items.pop() != b"," or len(items) % 2 != 0:
            return None

        environ = self.config.copy()
        for key, value in zip(items[::2], items[1::2]):
            environ[_native(key)] = _native(value)
        return environ

    def process(self, con):
        """
        internal! processes a single request on the connection con.

        Malformed requests are answered by closing the connection. The
        connection is closed in any case before returning or raising.
        """
        environ = self._read_headers(con)
        if environ is None:
            con.close()
            return

        content_length = environ.get("CONTENT_LENGTH", "bad")
        try:
            if not content_length.isdigit():
                raise ValueError(content_length)
            toread = int(content_length)
        except ValueError:
            con.close()
            return

        # elements:
        # 0 -> None: no headers set
        # 0 -> False: set but unsent
        # 0 -> True: sent
        # 1 -> status string
        # 2 -> header list
        response_head = [None, None, None]

        def sendheaders():
            assert response_head[0] is not None  # headers set
            if response_head[0] is not True:
                response_head[0] = True
                lines = ["Status: %s" % response_head[1]]
                lines.extend("%s: %s" % (name, value)
                             for name, value in response_head[2])
                # exactly one empty line ends the head, even without headers
                try:
                    con.sendall(_wire("\r\n".join(lines) + "\r\n\r\n"))
                except socket.error:
                    pass

        def dumbsend(data):
            sendheaders()
            try:
                con.sendall(data)
            except socket.error:
                pass

        def start_response(status, headers, exc_info=None):
            if exc_info and response_head[0]:
                # headers are already out: the error cannot be reported to
                # the client anymore
                try:
                    _reraise(exc_info[0], exc_info[1], exc_info[2])
                finally:
                    exc_info = None
            assert not response_head[0]  # unset or not sent
            response_head[0] = False  # set but nothing sent
            response_head[1] = status
            response_head[2] = headers
            return dumbsend

        sfw = SocketFileWrapper(con, toread)
        environ.update({
            "wsgi.version": (1, 0),
            "wsgi.input": sfw,
            "wsgi.errors": self.error,
            "wsgi.url_scheme": "http",
            "wsgi.multithread": False,
            "wsgi.multiprocess": True,
            "wsgi.run_once": False})
        if environ.get("HTTPS", "no").lower() in ('yes', 'y', '1'):
            environ["wsgi.url_scheme"] = "https"
        if "HTTP_CONTENT_TYPE" in environ:
            environ["CONTENT_TYPE"] = environ.pop("HTTP_CONTENT_TYPE")
        if "HTTP_CONTENT_LENGTH" in environ:
            del environ["HTTP_CONTENT_LENGTH"]  # TODO: better way?

        try:
            result = self.wsgiapp(environ, start_response)
            try:
                assert hasattr(result, "__iter__")
                # start_response may be called as late as the first
                # iteration step (generator applications), so the headers
                # are only checked once they are needed.
                for data in result:
                    assert isinstance(data, bytes)
                    if data:  # empty strings must not trigger the headers
                        dumbsend(data)
                if response_head[0] is not True:
                    sendheaders()
            finally:
                if hasattr(result, "close"):
                    result.close()
            sfw.close()
        finally:
            con.close()