1 """
2 The L{forkpool.SCGIServer} adapts a wsgi application to a scgi service.
3
4 It works with multiple processes that are periodically cleaned up to prevent
5 memory leaks having an impact to the system.
6 """
7
8 import socket
9 import os
10 import select
11 import sys
12 import errno
13 import signal
14
16 """Wraps a socket to a wsgi-compliant file-like object."""
18 """@param sock: is a C{socket.socket()}"""
19 self.sock = sock
20 self.buff = ""
21 self.toread = toread
22
23 - def _recv(self, size=4096):
24 """
25 internal method for receiving and counting incoming data
26 @raise socket.error:
27 """
28 toread = min(size, self.toread)
29 if not toread:
30 return ""
31 try:
32 data = self.sock.recv(toread)
33 except socket.error, why:
34 if why[0] in (errno.ECONNRESET, errno.ENOTCONN, errno.ESHUTDOWN):
35 data = ""
36 else:
37 raise
38 self.toread -= len(data)
39 return data
40
42 """Does not close the socket, because it might still be needed. It
43 reads all data that should have been read as given by C{CONTENT_LENGTH}.
44 """
45 try:
46 while self.toread > 0:
47 if not self._recv(min(self.toread, 4096)):
48 return
49 except socket.error:
50 pass
51
52 - def read(self, size=None):
53 """
54 see pep333
55 @raise socket.error:
56 """
57 if size is None:
58 retl = []
59 data = self.buff
60 self.buff = ""
61 while True:
62 retl.append(data)
63 try:
64 data = self._recv()
65 except socket.error:
66 break
67 if not data:
68 break
69 return "".join(retl)
70 datalist = [self.buff]
71 datalen = len(self.buff)
72 while datalen < size:
73 try:
74 data = self._recv(min(4096, size - datalen))
75 except socket.error:
76 break
77 if not data:
78 break
79 datalist.append(data)
80 datalen += len(data)
81 self.buff = "".join(datalist)
82
83 if size <= len(self.buff):
84 ret, self.buff = self.buff[:size], self.buff[size:]
85 return ret
86 ret, self.buff = self.buff, ""
87 return ret
88
90 """
91 see pep333
92 @raise socket.error:
93 """
94 while True:
95 try:
96 split = self.buff.index('\n') + 1
97 if size is not None and split > size:
98 split = size
99 ret, self.buff = self.buff[:split], self.buff[split:]
100 return ret
101 except ValueError:
102 if size is not None:
103 if len(self.buff) < size:
104 data = self._recv(size - len(self.buff))
105 else:
106 ret, self.buff = self.buff[:size], self.buff[size:]
107 return ret
108 else:
109 data = self._recv(4096)
110 if not data:
111 ret, self.buff = self.buff, ""
112 return ret
113 self.buff += data
114
116 """
117 see pep333
118 @raise socket.error:
119 """
120 data = self.readline()
121 while data:
122 yield data
123 data = self.readline()
125 """see pep333"""
126 return self
128 """
129 see pep333
130 @raise socket.error:
131 """
132 data = self.read(4096)
133 if not data:
134 raise StopIteration
135 return data
137 """see pep333"""
138 pass
140 """see pep333"""
141 assert isinstance(data, str)
142 try:
143 self.sock.sendall(data)
144 except socket.error:
145
146 return
148 """see pep333"""
149 for line in lines:
150 self.write(line)
151
153 """Usage: create an L{SCGIServer} object and invoke the run method which
154 will then turn this process into an scgi server."""
156 """state: 0 means idle and 1 means working.
157 These values are also sent as strings '0' and '1' over the socket."""
159 self.pid = pid
160 self.sock = sock
161 self.state = state
162
163 - def __init__(self, wsgiapp, port, interface="localhost", error=sys.stderr,
164 minworkers=2, maxworkers=32, maxrequests=1000, config={}):
165 """
166 @param wsgiapp: is the WSGI application to be run.
167 @type port: int
168 @param port: is the tcp port to listen on
169 @type interface: str
170 @param interface: is the interface to bind to (default: C{"localhost"})
171 @param error: is a file-like object beeing passed as C{wsgi.error} in
172 environ
173 @type minworkers: int
174 @param minworkers: is the number of worker processes to spawn
175 @type maxworkers: int
176 @param maxworkers: is the maximum number of workers that can be spawned
177 on demand
178 @type maxrequests: int
179 @param maxrequests: is the number of requests a worker processes before
180 dying
181 @type config: {}
182 @param config: the environ dictionary is updated using these values for
183 each request.
184 """
185 assert hasattr(error, "write")
186 self.wsgiapp = wsgiapp
187 self.port = port
188 self.interface = interface
189 self.minworkers = minworkers
190 self.maxworkers = maxworkers
191 self.maxrequests = maxrequests
192 self.config = config
193 self.error = error
194 self.server = None
195
196 self.workers = {}
197 self.running = False
198 self.ischild = False
199
201 """
202 Changes the signal handler for the given signal to terminate the run()
203 loop.
204 @param sig: is the signal to handle
205 @returns: self
206 """
207 signal.signal(sig, self.shutdownhandler)
208 return self
209
211 """
212 Serve the wsgi application.
213 """
214 self.server = socket.socket()
215 self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
216 self.server.bind((self.interface, self.port))
217 self.server.listen(5)
218 self.running = True
219 while self.running:
220 while (len(self.workers) < self.minworkers or
221 (len(self.workers) < self.maxworkers and
222 not len([w for w in
223 self.workers.values() if w.state == 0]))):
224 self.spawnworker()
225 try:
226 rs, _, _ = select.select(self.workers.keys(), [], [])
227 except select.error, e:
228 if e[0] != errno.EINTR:
229 raise
230 rs = []
231 for s in rs:
232 try:
233 data = self.workers[s].sock.recv(1)
234 except socket.error:
235
236 data = ''
237 if data == '':
238 self.workers[s].sock.close()
239 del self.workers[s]
240 elif data in ('0', '1'):
241 self.workers[s].state = int(data)
242 else:
243 raise RuntimeError("unexpected data from worker")
244 try:
245 pid = 1
246 while pid > 0:
247 pid, _ = os.waitpid(0, os.WNOHANG)
248 except OSError:
249 pass
250 self.server.close()
251 self.server = None
252 self.killworkers()
253
255 """
256 Kills all worker children.
257 @param sig: is the signal used to kill the children
258 """
259 while self.workers:
260 _, state = self.workers.popitem()
261 state.sock.close()
262 os.kill(state.pid, sig)
263
264
266 """
267 Signal handler function for stopping the run() loop. It works by
268 setting a variable that run() evaluates in each loop. As a signal
269 interrupts accept the loop is terminated, the accepting socket is
270 closed and the workers are killed.
271 @param sig: ignored for usage with signal.signal
272 @param stackframe: ignored for usage with signal.signal
273 """
274 if self.ischild:
275 sys.exit()
276 else:
277 self.running = False
278
280 """
281 internal! spawns a single worker
282 """
283 srvsock, worksock = socket.socketpair()
284
285 pid = os.fork()
286 if pid == 0:
287 self.ischild = True
288
289 srvsock.close()
290 for worker in self.workers.values():
291 worker.sock.close()
292 del self.workers
293
294 try:
295 self.work(worksock)
296 except socket.error:
297 pass
298
299 sys.exit()
300 elif pid > 0:
301
302 worksock.close()
303
304 self.workers[srvsock.fileno()] = SCGIServer.\
305 WorkerState(pid, srvsock, 0)
306 else:
307 raise RuntimeError("fork failed")
308
309 - def work(self, worksock):
310 """
311 internal! serves maxrequests times
312 @raise socket.error:
313 """
314 for _ in range(self.maxrequests):
315 (con, addr) = self.server.accept()
316
317 worksock.sendall('1')
318 self.process(con)
319 worksock.sendall('0')
320
322 """
323 internal! processes a single request on the connection con.
324 """
325
326
327
328
329
330
331 try:
332 data = con.recv(7)
333 except socket.error:
334 con.close()
335 return
336 if not ':' in data:
337 con.close()
338 return
339 length, data = data.split(':', 1)
340 if not length.isdigit():
341 con.close()
342 return
343 length = int(length)
344
345 while len(data) != length + 1:
346 try:
347 t = con.recv(min(4096, length + 1 - len(data)))
348 except socket.error:
349 con.close()
350 return
351 if not t:
352 con.close()
353 return
354 data += t
355
356
357 data = data.split('\0')
358
359
360 if data.pop() != ',' or len(data) % 2 != 0:
361 con.close()
362 return
363
364 environ = self.config.copy()
365 while data:
366 key = data.pop(0)
367 value = data.pop(0)
368 environ[key] = value
369
370
371
372
373
374
375
376 response_head = [None, None, None]
377
378 def sendheaders():
379 assert response_head[0] is not None
380 if response_head[0] != True:
381 response_head[0] = True
382 try:
383 con.sendall('Status: %s\r\n%s\r\n\r\n' % (response_head[1],
384 '\r\n'.join(map("%s: %s".__mod__,
385 response_head[2]))))
386 except socket.error:
387 pass
388
389 def dumbsend(data):
390 sendheaders()
391 try:
392 con.sendall(data)
393 except socket.error:
394 pass
395
396 def start_response(status, headers, exc_info=None):
397 if exc_info and response_head[0]:
398 try:
399 raise exc_info[0], exc_info[1], exc_info[2]
400 finally:
401 exc_info = None
402 assert not response_head[0]
403 response_head[0] = False
404 response_head[1] = status
405 response_head[2] = headers
406 return dumbsend
407
408 if not environ.get("CONTENT_LENGTH", "bad").isdigit():
409 con.close()
410 return
411
412 sfw = SocketFileWrapper(con, int(environ["CONTENT_LENGTH"]))
413 environ.update({
414 "wsgi.version": (1, 0),
415 "wsgi.input": sfw,
416 "wsgi.errors": self.error,
417 "wsgi.url_scheme": "http",
418 "wsgi.multithread": False,
419 "wsgi.multiprocess": True,
420 "wsgi.run_once": False})
421 if environ.get("HTTPS", "no").lower() in ('yes', 'y', '1'):
422 environ["wsgi.url_scheme"] = "https"
423 if "HTTP_CONTENT_TYPE" in environ:
424 environ["CONTENT_TYPE"] = environ.pop("HTTP_CONTENT_TYPE")
425 if "HTTP_CONTENT_LENGTH" in environ:
426 del environ["HTTP_CONTENT_LENGTH"]
427
428 result = self.wsgiapp(environ, start_response)
429 assert hasattr(result, "__iter__")
430
431 assert response_head[0] is not None
432 result_iter = iter(result)
433 for data in result_iter:
434 assert isinstance(data, str)
435 dumbsend(data)
436 if response_head[0] != True:
437 sendheaders()
438 if hasattr(result, "close"):
439 result.close()
440 sfw.close()
441 con.close()
442