Kaynağa Gözat

Testing multiprocessing.pool.Pool as process handler for standalone server

Not really working....
Yann Weber 7 yıl önce
ebeveyn
işleme
60378ae4d5
1 değiştirilmiş dosya ile 58 ekleme ve 31 silme
  1. 58
    31
      lodel/plugins/multisite/main.py

+ 58
- 31
lodel/plugins/multisite/main.py Dosyayı Görüntüle

@@ -16,6 +16,10 @@ from io import BufferedWriter
16 16
 from lodel.context import LodelContext
17 17
 from lodel.context import ContextError
18 18
 
19
+
20
+from multiprocessing.pool import Pool
21
+import multiprocessing
22
+
19 23
 ##@brief Set the poll interval to detect shutdown requests (do not work)
20 24
 SHUTDOWN_POLL_INTERVAL = 0.1 # <-- No impact because of ForkingTCPServer bug
21 25
 
@@ -36,12 +40,7 @@ class LodelWSGIHandler(wsgiref.simple_server.WSGIRequestHandler):
36 40
     def handle(self):
37 41
         #Register a signal handler for sigint in the child process
38 42
         req_ref = self.request
39
-        def sigstop_handler_client(signal, frame):
40
-            req_ref.close()
41
-            print("Client %d stopping by signal" % os.getpid())
42
-            os._exit(0)
43
-        signal.signal(KILLING_CHILDS_SIGNAL, sigstop_handler_client)
44
-        #Dirty copy & past from Lib/http/server.py in Cpython sources       
43
+	#Dirty copy & past from Lib/http/server.py in Cpython sources
45 44
         try:
46 45
             self.raw_requestline = self.rfile.readline(65537)
47 46
             if len(self.raw_requestline) > 65536:
@@ -75,37 +74,64 @@ class LodelWSGIHandler(wsgiref.simple_server.WSGIRequestHandler):
75 74
         self.request.close()
76 75
         super().close()
77 76
 
77
+##@brief ForkMixIn using multiprocessing.pool.Pool
78
+class PoolMixIn:
79
+
80
+    pool_size = 10
81
+
82
+    def __init__(self, *args, **kwargs):
83
+        super().__init__(*args, **kwargs)
84
+        self.__pool = Pool(self.__class__.pool_size)
85
+        self.__results = list()
86
+
87
+    def __pool_callback(self, request, client_address):
88
+        try:
89
+            self.finish_request(request, client_address)
90
+            self.shutdown_request(request)
91
+            pass
92
+        except:
93
+            try:
94
+                self.handle_error(request, client_address)
95
+                self.shutdown_request(request)
96
+            finally:
97
+                pass
98
+
99
+    def collect_results(self):
100
+        new_res = list()
101
+        while len(self.__results) > 0:
102
+            cur = self.__results.pop()
103
+            try:
104
+                cur.get(0.0)
105
+            except multiprocessing.TimeoutError:
106
+                new_res.append(cur)
107
+        self.__results = new_res
108
+
109
+    def process_request(self, request, client_address):
110
+        self.collect_results()
111
+        res = self.__pool.apply_async(
112
+            self.__pool_callback, (self, request, client_address))
113
+        self.__results.append(res)
114
+        self.close_request(request)
115
+        return
116
+
117
+    def shutdown(self):
118
+        print("Terminating jobs")
119
+        self.__pool.terminate()
120
+        print("Waiting jobs to end...")
121
+        self.__pool.join()
122
+ 
123
+
78 124
 ##@brief WSGIServer implementing ForkingTCPServer.
79 125
 #
80 126
 #Same features than wsgiref.simple_server.WSGIServer but process each requests
81 127
 #in a child process
82 128
 class ForkingWSGIServer(
83
-        wsgiref.simple_server.WSGIServer, socketserver.ForkingTCPServer):
129
+        wsgiref.simple_server.WSGIServer, PoolMixIn):
84 130
     
85 131
     ##@brief static property indicating the max number of childs allowed
86 132
     max_children = 40
87
-
88
-    ##@brief Custom reimplementation of shutdown method in order to ensure
89
-    #that we close all listening sockets
90
-    #
91
-    #This method is here because of a bug (or a missing feature) : 
92
-    #The socketserver implementation force to call the shutdown method
93
-    #from another thread/process else it leads in a deadlock.
94
-    #The problem is that the implementation of shutdown set a private attribute
95
-    #__shutdown_request to true. So we cannot reimplement a method that will
96
-    #just set the flag to True, we have to manually collect each actives 
97
-    #childs. A patch is prepared and will be proposed for cpython upstream.
98
-    def shutdown(self):
99
-        if self.active_children is not None:
100
-            for pid in self.active_children.copy():
101
-                print("Killing : %d"%pid)
102
-                os.kill(pid, KILLING_CHILDS_SIGNAL)
103
-                try:
104
-                    pid, _ = os.waitpid(pid, 0)
105
-                    self.active_children.discard(pid)
106
-                except ChildProcessError:
107
-                    self.active_children.discard(pid)
108
-        self.server_close()
133
+    request_queue_size = 40
134
+    allow_reuse_address = True
109 135
 
110 136
 ##@brief utility function to extract site id from an url
111 137
 def site_id_from_url(url):
@@ -151,8 +177,9 @@ def wsgi_router(env, start_response):
151 177
 
152 178
 ##@brief Starts the server until a SIGINT is received
153 179
 def main_loop():
180
+    multiprocessing.set_start_method('forkserver')
154 181
     LodelContext.expose_modules(globals(), {'lodel.settings': ['Settings']})
155
-    ForkingWSGIServer.max_children = Settings.server.max_children
182
+    ForkingWSGIServer.pool_size = Settings.server.max_children
156 183
     listen_addr = Settings.server.listen_address
157 184
     listen_port = Settings.server.listen_port
158 185
     server = wsgiref.simple_server.make_server(
@@ -161,7 +188,7 @@ def main_loop():
161 188
     #Signal handler to close server properly on sigint
162 189
     def sigint_handler(signal, frame):
163 190
         print("Ctrl-c pressed, exiting")
164
-        server.shutdown()
191
+        #server.shutdown()
165 192
         server.server_close()
166 193
         exit(0)
167 194
     signal.signal(signal.SIGINT, sigint_handler)

Loading…
İptal
Kaydet