Browse Source

Add timeout & watchdog for pool & workers

Yann Weber 4 years ago
parent
commit
bcbbb99770
8 changed files with 370 additions and 78 deletions
  1. 3
    2
      configure.ac
  2. 37
    1
      include/conf.h
  3. 3
    4
      include/pyworker.h
  4. 4
    4
      include/responder.h
  5. 166
    0
      src/conf.c
  6. 22
    1
      src/main.c
  7. 26
    26
      src/pyworker.c
  8. 109
    40
      src/responder.c

+ 3
- 2
configure.ac View File

@@ -51,10 +51,11 @@ AC_C_INLINE
51 51
 
52 52
 # Checks for libraries.
53 53
 AC_CHECK_LIB([fcgi], [FCGI_Accept])
54
+AC_CHECK_LIB([rt], [timer_create])
54 55
 PKG_CHECK_MODULES([CHECK], [check >= 0.9.4])
55 56
 
56 57
 # Checks for header files.
57
-AC_CHECK_HEADERS([fcntl.h limits.h stdlib.h string.h syslog.h unistd.h sem.h])
58
+AC_CHECK_HEADERS([fcntl.h limits.h stdlib.h string.h syslog.h unistd.h sem.h time.h])
58 59
 
59 60
 # Checks for typedefs, structures, and compiler characteristics.
60 61
 AC_TYPE_PID_T
@@ -64,7 +65,7 @@ AC_TYPE_SIZE_T
64 65
 AC_FUNC_FORK
65 66
 AC_FUNC_MALLOC
66 67
 AC_FUNC_REALLOC
67
-AC_CHECK_FUNCS([bzero getcwd gettimeofday memmove strdup strerror strndup semop clone get_current_dir_name])
68
+AC_CHECK_FUNCS([bzero getcwd gettimeofday memmove strdup strerror strndup semop clone get_current_dir_name timer_settime])
68 69
 
69 70
 AM_INIT_AUTOMAKE
70 71
 AC_CONFIG_FILES([Makefile

+ 37
- 1
include/conf.h View File

@@ -44,6 +44,7 @@
44 44
 
45 45
 /**@defgroup ret_status Function & process return status
46 46
  */
47
+#define PYFCGI_TIMEOUT 8
47 48
 #define PYFCGI_ERR 16
48 49
 /**@ingroup ret_status */
49 50
 #define PYFCGI_WORKER_FAIL 32
@@ -118,20 +119,35 @@ typedef struct pyfcgi_context_s pyfcgi_context_t;
118 119
 
119 120
 /**@brief Stores contextual informations about current process */
120 121
 struct pyfcgi_context_s {
122
+
123
+	/**@brief Stores current process PID */
121 124
 	pid_t pid;
125
+	/**@brief Stores parent process PID */
122 126
 	pid_t ppid;
123 127
 
128
+	/**@brief Stores the watchdog timer */
129
+	timer_t wd_timer;
130
+	/**@brief Security timer sending a sigkill */
131
+	timer_t wd_timerkill;
132
+	/**@brief Watchdog delay */
133
+	struct timespec wd_delay;
134
+	/**@brief Watchdog sig restorer */
135
+	struct sigaction wd_oldsig;
136
+	/**@brief watchdog flag */
137
+	short wd;
124 138
 
125 139
 	/**@brief array of worker pids (pool handler context) */
126 140
 	pid_t **wrk_pids;
127 141
 	/**@brief workers count */
128 142
 	unsigned int n_wrk;
129 143
 
144
+	/**@brief semid for worker<->pool semaphore */
145
+	int semid;
146
+
130 147
 	/**@brief Stores python_path (not dupped by python) */
131 148
 	wchar_t python_path[PATH_MAX];
132 149
 	/**@brief Stores venv_path for python home (not dupped by python) */
133 150
 	wchar_t venv_path[PATH_MAX];
134
-
135 151
 	/**@brief Stores a part of the environ (containing wsgi.* keys) */
136 152
 	PyObject *wsgi_dict;
137 153
 };
@@ -165,6 +181,12 @@ struct pyfcgi_conf_s
165 181
 	 * @ingroup conf_glob */
166 182
 	int max_reqs;
167 183
 
184
+	/**@brief Worker timeout in seconds (if 0 no timeout)*/
185
+	time_t worker_timeout;
186
+
187
+	/**@brief Pool timeout in seconds (if 0 no timeout)*/
188
+	time_t pool_timeout;
189
+
168 190
 	/**@brief 0 is silent 1 means logs to stderr */
169 191
 	short verbosity;
170 192
 
@@ -197,4 +219,18 @@ int check_entrypoint_import();
197 219
 
198 220
 int parse_optlog(const char*);
199 221
 
222
+/**@brief Initialize the watchdog
223
+ * @param void (*wd_sig_cleaner)(int) pointer on a signal handler (or NULL to
224
+ * use @ref pyfcgi_wd_default_sighandler)
225
+ * @param const struct timespec* delay a pointer on the watchdog timeout
226
+ * @return 0 if no error else -1
227
+ */
228
+int pyfcgi_wd_init(void (*wd_sig_cleaner)(int), const struct timespec *delay);
229
+int pyfcgi_wd_arm();
230
+int pyfcgi_wd_pause();
231
+int pyfcgi_wd_stop();
232
+
233
+/**@brief Watchdog default signal handler */
234
+void pyfcgi_wd_default_sighandler(int signum);
235
+
200 236
 #endif

+ 3
- 4
include/pyworker.h View File

@@ -113,10 +113,9 @@ extern PyObject* response_headers;
113 113
  * @ref response_headers globals.
114 114
  * @param char* python_entrypoint a path to a python entrypoint
115 115
  * @param int worker uid
116
- * @param int semid for FCGI access
117 116
  * @return 0 if exit avec max requests
118 117
  */
119
-int work333(int, int);
118
+int work333(int);
120 119
 
121 120
 /**@brief the function that initialize the alternate python worker
122 121
  * @ingroup worker_process
@@ -125,10 +124,9 @@ int work333(int, int);
125 124
  * FCGI. This function clones for each request, running worker_piper()
126 125
  * @param char* python_entrypoint a path to a python entrypoint
127 126
  * @param int worker uid
128
- * @param int semid for FCGI access
129 127
  * @return 0 if exit avec max requests
130 128
  */
131
-int work(int, int);
129
+int work(int);
132 130
 
133 131
 /**@brief function for a subprocess designed to read stdin & stdout from
134 132
  * python & forward them to syslog or FCGI
@@ -168,6 +166,7 @@ void worker_log_pipes(int, int, PyObject*[2]);
168 166
 int ctl_get_rep_sz(int, size_t*);
169 167
 
170 168
 void worker_sighandler(int);
169
+void worker_sigalrmhandler(int signum);
171 170
 
172 171
 #endif
173 172
 

+ 4
- 4
include/responder.h View File

@@ -58,8 +58,6 @@
58 58
 #include "pyworker.h"
59 59
 
60 60
 
61
-extern int pyfcgi_semid;
62
-
63 61
 void init_context();
64 62
 
65 63
 /**@brief The responder loop
@@ -81,10 +79,9 @@ int responder_loop();
81 79
  * @ingroup work_master_proc
82 80
  * Spawn a new worker process and prepare ENV & request forwarding
83 81
  * @param int worker uid
84
- * @param int semid for FCGI access
85 82
  * @return child PID
86 83
  */
87
-pid_t spawn(int, int);
84
+pid_t spawn(int);
88 85
 
89 86
 /**@brief Generate a new semaphore from given key
90 87
  * @ingroup work_master_proc
@@ -101,4 +98,7 @@ void clean_exit(int);
101 98
 /**@brief Handle signals and forward it to workers */
102 99
 void pool_sighandler(int signum);
103 100
 
101
+/**@brief Handle watchdog alarm signal */
102
+void pool_wd_sighandler(int signum);
103
+
104 104
 #endif

+ 166
- 0
src/conf.c View File

@@ -46,6 +46,8 @@ void default_conf()
46 46
 	PyFCGI_conf.max_reqs = 1000;
47 47
 	PyFCGI_conf.pep333 = 1;
48 48
 	PyFCGI_conf.verbosity = 0;
49
+	PyFCGI_conf.pool_timeout = 5;
50
+	PyFCGI_conf.worker_timeout = 3;
49 51
 }
50 52
 
51 53
 int parse_args(int argc, char *argv[])
@@ -202,3 +204,167 @@ int parse_optlog(const char* logspec)
202 204
 	return 0;
203 205
 }
204 206
 
207
+int pyfcgi_wd_init(void (*wd_sig_cleaner)(int), const struct timespec *delay)
208
+{
209
+	struct sigaction act;
210
+	struct sigevent sev;
211
+	pyfcgi_context_t *context;
212
+	int err;
213
+
214
+	context = &(PyFCGI_conf.context);
215
+
216
+	PyFCGI_conf.context.wd_delay.tv_sec = delay->tv_sec;
217
+	PyFCGI_conf.context.wd_delay.tv_nsec = delay->tv_nsec;
218
+	pyfcgi_log(LOG_DEBUG, "Set watchdog with %ds timeout", PyFCGI_conf.context.wd_delay.tv_sec);
219
+
220
+	// Creating new timer with default sigevent (SIGEV_SIGNAL with SIGALRM)
221
+	if(timer_create(CLOCK_REALTIME, NULL, &(context->wd_timer)) < 0)
222
+	{
223
+		err = errno;
224
+		pyfcgi_log(LOG_ALERT, "Unable to create watchdog timer : %s",
225
+			strerror(err));
226
+		errno = err;
227
+		return -1;
228
+	}
229
+	// Creating a new timer sending SIGKILL after 1.5 delay
230
+	sev.sigev_notify = SIGEV_SIGNAL;
231
+	sev.sigev_signo = SIGKILL;
232
+	if(timer_create(CLOCK_REALTIME, &sev, &(context->wd_timerkill)) < 0)
233
+	{
234
+		err = errno;
235
+		pyfcgi_log(LOG_ALERT, "Unable to create kill watchdog timer : %s",
236
+			strerror(err));
237
+		errno = err;
238
+		return -1;
239
+	}
240
+
241
+	// Registering SIGALRM signal handler
242
+	act.sa_handler = wd_sig_cleaner?wd_sig_cleaner:pyfcgi_wd_default_sighandler;
243
+	sigemptyset(&act.sa_mask);
244
+	act.sa_flags = 0;
245
+	act.sa_restorer = NULL;
246
+	if(sigaction(SIGALRM, &act, &(context->wd_oldsig)) < 0)
247
+	{
248
+		err = errno;
249
+		pyfcgi_log(LOG_ALERT, "Unable to set the signal handler for watchdog timer : %s",
250
+			strerror(err));
251
+		errno = err;
252
+		return -1;
253
+	}
254
+	context->wd = 1;
255
+	return 0;
256
+}
257
+
258
+int pyfcgi_wd_arm()
259
+{
260
+	pyfcgi_context_t *context;
261
+	struct itimerspec timeout;
262
+	int err, res;
263
+
264
+	context = &(PyFCGI_conf.context);
265
+
266
+	if(!context->wd) { return 0; }
267
+
268
+	timeout.it_value = context->wd_delay;
269
+	timeout.it_interval.tv_sec = 0;
270
+	timeout.it_interval.tv_nsec = 0;
271
+
272
+	context = &(PyFCGI_conf.context);
273
+	res = 0;
274
+
275
+	if(timer_settime(context->wd_timer, 0, &timeout, NULL) < 0)
276
+	{
277
+		err = errno;
278
+		pyfcgi_log(LOG_ALERT, "Unable to arm watchdog : %s",
279
+			strerror(err));
280
+		res = -1;
281
+	}
282
+
283
+	timeout.it_value.tv_sec = (time_t)(timeout.it_value.tv_sec * 1.5);
284
+	timeout.it_value.tv_nsec = (long)(timeout.it_value.tv_nsec * 1.5);
285
+	timeout.it_interval = timeout.it_value;
286
+
287
+	if(timer_settime(context->wd_timerkill, 0, &timeout, NULL) < 0)
288
+	{
289
+		err = errno;
290
+		pyfcgi_log(LOG_ALERT, "Unable to pause killer watchdog : %s",
291
+			strerror(err));
292
+		res = -1;
293
+	}
294
+	return res;
295
+}
296
+
297
+int pyfcgi_wd_pause()
298
+{
299
+	pyfcgi_context_t *context;
300
+	struct itimerspec zero;
301
+	int err, res;
302
+
303
+	context = &(PyFCGI_conf.context);
304
+	if(!context->wd) { return 0; }
305
+
306
+	memset(&zero, 0, sizeof(struct itimerspec));
307
+
308
+	res = 0;
309
+
310
+	if(timer_settime(context->wd_timer, TIMER_ABSTIME, &zero, NULL) < 0)
311
+	{
312
+		err = errno;
313
+		pyfcgi_log(LOG_ALERT, "Unable to pause watchdog : %s",
314
+			strerror(err));
315
+		res = -1;
316
+	}
317
+	if(timer_settime(context->wd_timerkill, TIMER_ABSTIME, &zero, NULL) < 0)
318
+	{
319
+		err = errno;
320
+		pyfcgi_log(LOG_ALERT, "Unable to pause killer watchdog : %s",
321
+			strerror(err));
322
+		res = -1;
323
+	}
324
+	return res;
325
+}
326
+
327
+int pyfcgi_wd_stop()
328
+{	
329
+	int err, res;
330
+	pyfcgi_context_t *context;
331
+
332
+	context = &(PyFCGI_conf.context);
333
+	if(!context->wd) { return 0; }
334
+	res = 0;
335
+
336
+	if(timer_delete(context->wd_timer) < 0)
337
+	{
338
+		err = errno;
339
+		pyfcgi_log(LOG_WARNING, "Unable to delete watchdog timer : %s",
340
+			strerror(err));
341
+		res = -1;
342
+	}
343
+	if(timer_delete(context->wd_timerkill) < 0)
344
+	{
345
+		err = errno;
346
+		pyfcgi_log(LOG_WARNING, "Unable to delete killer watchdog timer : %s",
347
+			strerror(err));
348
+		res = -1;
349
+	}
350
+	if(sigaction(SIGALRM, &(context->wd_oldsig), NULL) < 0)
351
+	{
352
+		err = errno;
353
+		pyfcgi_log(LOG_WARNING, "Unable to restore signal handler : %s",
354
+			strerror(err));
355
+		res = -1;
356
+	}
357
+	context->wd = 0;
358
+pyfcgi_log(LOG_DEBUG, "Watchdog stopped");
359
+	return res;
360
+}
361
+
362
+void pyfcgi_wd_default_sighandler(int signum)
363
+{
364
+	pyfcgi_log(LOG_ALERT, "Timeout after %d.%09ds",
365
+		PyFCGI_conf.context.wd_delay.tv_sec,
366
+		PyFCGI_conf.context.wd_delay.tv_nsec);
367
+	pyfcgi_wd_stop();
368
+	exit(PYFCGI_TIMEOUT);
369
+}
370
+

+ 22
- 1
src/main.c View File

@@ -56,7 +56,12 @@ pid_t pool_handler_pid;
56 56
 void sighandler(int signum)
57 57
 {
58 58
 	int ret;
59
-	if(signum == SIGINT)
59
+	if(signum == SIGALRM)
60
+	{
61
+		pyfcgi_log(LOG_WARNING, "Master process received SIGALRM !");
62
+		return;
63
+	}
64
+	else if(signum == SIGINT)
60 65
 	{
61 66
 		pyfcgi_log(LOG_INFO,
62 67
 			"Master process received ctrl+c, exiting...");
@@ -77,6 +82,13 @@ void sighandler(int signum)
77 82
 	exit(0);
78 83
 }
79 84
 
85
/**@brief Signal handler that only logs the received signal
 * @param signum the received signal number
 */
void debug_sighandler(int signum)
{
	const char *signame;

	signame = strsignal(signum);
	pyfcgi_log(LOG_WARNING, "Master process received signal %s(%d)",
		signame, signum);
}
91
+
80 92
 int main(int argc, char **argv)
81 93
 {
82 94
 	int child_ret;
@@ -94,6 +106,15 @@ int main(int argc, char **argv)
94 106
 		perror("Sigaction error");
95 107
 		exit(4);
96 108
 	}
109
+	sigfillset(&act.sa_mask);
110
+	sigdelset(&act.sa_mask, SIGINT);
111
+	sigdelset(&act.sa_mask, SIGTERM);
112
+	act.sa_handler = debug_sighandler;
113
+	if(sigaction(SIGALRM, &act, NULL))
114
+	{
115
+		perror("Sigaction2 error");
116
+		exit(4);
117
+	}
97 118
 
98 119
 
99 120
 	pool_handler_pid = 0;

+ 26
- 26
src/pyworker.c View File

@@ -20,12 +20,10 @@
20 20
 #include "pyworker.h"
21 21
 
22 22
 
23
-/**@brief Indicate that a worker is idle
24
- * @param int semid */
25
-static inline void worker_set_idle(int);
26
-/**@brief Indicate that a worker is busy
27
- * @param int semid */
28
-static inline void worker_set_busy(int);
23
+/**@brief Indicate that a worker is idle */
24
+static inline void worker_set_idle();
25
+/**@brief Indicate that a worker is busy */
26
+static inline void worker_set_busy();
29 27
 
30 28
 /**@brief Process results from a pep333 worker
31 29
  * @param FCGX_Stream* out stream from libFCGI
@@ -36,7 +34,7 @@ static inline int work333_send_result(FCGX_Stream*, PyObject* ret);
36 34
 
37 35
 static int worker_piper_sigrcv = 0;
38 36
 
39
-int work333(int wrk_id, int semid)
37
+int work333(int wrk_id)
40 38
 {
41 39
 	PyObject *entry_fun, *pyflush[2], *py_osmod, *entry_ret, *environ,
42 40
 		*start_response, *args;
@@ -44,14 +42,10 @@ int work333(int wrk_id, int semid)
44 42
 	char **envp;
45 43
 	int count, pipe_out[2], pipe_err[2];
46 44
 	int max_reqs;
47
-	char ident[128];
48 45
 	struct timeval start, stop;
49 46
 
50 47
 	max_reqs = PyFCGI_conf.max_reqs;
51 48
 
52
-	snprintf(ident, 128, "Worker[%d]", wrk_id);
53
-	pyfcgi_logger_set_ident(ident);
54
-
55 49
 	pyfcgi_log(LOG_INFO, "Worker started with PEP333 App");
56 50
 
57 51
 	pyinit();
@@ -67,7 +61,7 @@ int work333(int wrk_id, int semid)
67 61
 	pyfcgi_log(LOG_INFO, "Waiting request with %s.%s()",
68 62
 		PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
69 63
 
70
-	worker_set_idle(semid); //before failing on import
64
+	worker_set_idle(); //before failing on import
71 65
 
72 66
 	if(!entry_fun) //but exit if import failed
73 67
 	{
@@ -83,8 +77,9 @@ int work333(int wrk_id, int semid)
83 77
 	while ((!count || count != max_reqs) &&
84 78
 		FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
85 79
 	{
80
+		pyfcgi_wd_arm();
86 81
 		gettimeofday(&start, NULL);
87
-		worker_set_busy(semid);
82
+		worker_set_busy();
88 83
 		count++;
89 84
 		environ = update_pyenv(py_osmod, envp);
90 85
 
@@ -126,7 +121,7 @@ int work333(int wrk_id, int semid)
126 121
 		FCGX_FClose(in_stream);
127 122
 		FCGX_FClose(err_stream);
128 123
 		FCGI_Finish();
129
-		worker_set_idle(semid);
124
+		worker_set_idle();
130 125
 		gettimeofday(&stop, NULL);
131 126
 		stop.tv_sec = stop.tv_sec - start.tv_sec;
132 127
 		stop.tv_usec = stop.tv_usec - start.tv_usec;
@@ -135,6 +130,7 @@ int work333(int wrk_id, int semid)
135 130
 			stop.tv_usec += 1000000;
136 131
 			stop.tv_sec -= 1;
137 132
 		}
133
+		pyfcgi_wd_pause();
138 134
 		pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
139 135
 			wrk_id, count, libpyfcgi.rep_sz, stop.tv_sec, stop.tv_usec);
140 136
 	}
@@ -146,7 +142,7 @@ static inline int work333_send_result(FCGX_Stream *out, PyObject* ret)
146 142
 	return 0;
147 143
 }
148 144
 
149
-int work(int wrk_id, int semid)
145
+int work(int wrk_id)
150 146
 {
151 147
 	PyObject *entry_fun, *pystdout_flush, *pystderr_flush, *py_osmod,
152 148
 		*environ;
@@ -162,10 +158,6 @@ int work(int wrk_id, int semid)
162 158
 	piper_args_t piper_args;
163 159
 	char *piper_stack;
164 160
 
165
-	char ident[128];
166
-	snprintf(ident, 128, "Worker[%d]", wrk_id);
167
-	pyfcgi_logger_set_ident(ident);
168
-
169 161
 	max_reqs = PyFCGI_conf.max_reqs;
170 162
 	piper_args.wrk_id = wrk_id;
171 163
 	piper_args.act = &act;
@@ -215,7 +207,7 @@ int work(int wrk_id, int semid)
215 207
 		PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
216 208
 
217 209
 
218
-	worker_set_idle(semid); //before failing on import
210
+	worker_set_idle(); //before failing on import
219 211
 
220 212
 	if(!entry_fun) //but exit if import failed
221 213
 	{
@@ -237,8 +229,9 @@ int work(int wrk_id, int semid)
237 229
 	count = 0;
238 230
 	while ((!count || count != max_reqs) && FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
239 231
 	{
232
+		pyfcgi_wd_arm();
240 233
 		gettimeofday(&start, NULL);
241
-		worker_set_busy(semid);
234
+		worker_set_busy();
242 235
 
243 236
 		count++;
244 237
 		piper_args.out = out_stream;
@@ -304,7 +297,7 @@ int work(int wrk_id, int semid)
304 297
 			rep_sz = 0;
305 298
 		}
306 299
 		//Increase sem showing the worker is idle
307
-		worker_set_idle(semid);
300
+		worker_set_idle();
308 301
 		gettimeofday(&stop, NULL);
309 302
 		stop.tv_sec = stop.tv_sec - start.tv_sec;
310 303
 		stop.tv_usec = stop.tv_usec - start.tv_usec;
@@ -313,6 +306,7 @@ int work(int wrk_id, int semid)
313 306
 			stop.tv_usec += 1000000;
314 307
 			stop.tv_sec -= 1;
315 308
 		}
309
+		pyfcgi_wd_pause();
316 310
 		pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
317 311
 			wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec);
318 312
 	}
@@ -573,7 +567,7 @@ int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz)
573 567
 	return 0;
574 568
 }
575 569
 
576
-static void worker_set_idle(int semid)
570
+static void worker_set_idle()
577 571
 {
578 572
 	int err;
579 573
 	struct sembuf sop;
@@ -582,7 +576,7 @@ static void worker_set_idle(int semid)
582 576
 	sop.sem_flg = 0;
583 577
 	sop.sem_op = 1;
584 578
 
585
-	if(semop(semid, &sop, 1) < 0)
579
+	if(semop(PyFCGI_conf.context.semid, &sop, 1) < 0)
586 580
 	{
587 581
 		err = errno;
588 582
 		pyfcgi_log(LOG_ERR, "error incrementing the semaphore : %s",
@@ -590,7 +584,7 @@ static void worker_set_idle(int semid)
590 584
 	}
591 585
 }
592 586
 
593
-static void worker_set_busy(int semid)
587
+static void worker_set_busy()
594 588
 {
595 589
 	int err;
596 590
 	struct sembuf sop;
@@ -599,7 +593,7 @@ static void worker_set_busy(int semid)
599 593
 	sop.sem_flg = 0;
600 594
 	sop.sem_op = -1;
601 595
 
602
-	if(semop(semid, &sop, 1) < 0)
596
+	if(semop(PyFCGI_conf.context.semid, &sop, 1) < 0)
603 597
 	{
604 598
 		err = errno;
605 599
 		pyfcgi_log(LOG_ERR, "error incrementing the semaphore : %s",
@@ -612,3 +606,9 @@ void worker_sighandler(int signum)
612 606
 	pyfcgi_log(LOG_INFO, "%s signal received, exiting...", strsignal(signum));
613 607
 	exit(0);
614 608
 }
609
+
610
+void worker_sigalrmhandler(int signum)
611
+{
612
+	pyfcgi_log(LOG_WARNING, "Timeout, exiting...");
613
+	exit(PYFCGI_TIMEOUT);
614
+}

+ 109
- 40
src/responder.c View File

@@ -17,11 +17,12 @@
17 17
  * along with PyFCGI.  If not, see <http://www.gnu.org/licenses/>.
18 18
  */
19 19
 #include "responder.h"
20
-int pyfcgi_semid = 0;
21 20
 
22 21
 void init_context()
23 22
 {
24
-	pyfcgi_semid = 0;
23
+	PyFCGI_conf.context.semid = 0;
24
+	PyFCGI_conf.context.pid = getpid();
25
+	PyFCGI_conf.context.ppid = getppid();
25 26
 }
26 27
 
27 28
 int responder_loop()
@@ -32,7 +33,10 @@ int responder_loop()
32 33
 	int status;
33 34
 	pid_t ret;
34 35
 	struct sembuf sop;
36
+	/**@brief poll timeout */
35 37
 	struct timespec timeout;
38
+	/**@brief watchdog timeout */
39
+	struct timespec pool_timeout;
36 40
 	short idle;
37 41
 	struct sigaction act;
38 42
 
@@ -56,10 +60,20 @@ int responder_loop()
56 60
 	idle = 0;
57 61
 
58 62
 	pyfcgi_logger_set_ident("pool");
63
+
64
+	if(PyFCGI_conf.pool_timeout)
65
+	{
66
+		pool_timeout.tv_nsec = 0;
67
+		pool_timeout.tv_sec = PyFCGI_conf.pool_timeout;
68
+		pyfcgi_wd_init(pool_wd_sighandler, &pool_timeout);
69
+	}
70
+
59 71
 	pyfcgi_log(LOG_INFO, "Preparing workers");
60 72
 
61 73
 	init_context();
62 74
 
75
+	pyfcgi_wd_arm();
76
+
63 77
 	PyFCGI_conf.context.wrk_pids = &wrk_pids;
64 78
 	PyFCGI_conf.context.n_wrk = 0;
65 79
 	wrk_pids = malloc(sizeof(int) * PyFCGI_conf.max_wrk);
@@ -73,14 +87,14 @@ int responder_loop()
73 87
 	}
74 88
 	bzero(wrk_pids, sizeof(int) * PyFCGI_conf.max_wrk);
75 89
 
76
-	semid = new_semaphore();
90
+	PyFCGI_conf.context.semid = semid = new_semaphore();
77 91
 	
78 92
 	wanted_n = PyFCGI_conf.min_wrk;
79 93
 	n_wrk = 0;
80 94
 	// prespawning minimum worker count
81 95
 	for(n_wrk=0; n_wrk < wanted_n; n_wrk++)
82 96
 	{
83
-		wrk_pids[n_wrk] = spawn(n_wrk, semid);
97
+		wrk_pids[n_wrk] = spawn(n_wrk);
84 98
 		PyFCGI_conf.context.n_wrk = n_wrk;
85 99
 	}
86 100
 	//Wait at least for a process to be ready
@@ -90,6 +104,7 @@ int responder_loop()
90 104
 	// spawn new one if needed, etc.
91 105
 	while(1)
92 106
 	{
107
+		pyfcgi_wd_arm();
93 108
 		PyFCGI_conf.context.n_wrk = n_wrk;
94 109
 		if( (ret = waitpid(0, &status, WNOHANG)) )
95 110
 		{
@@ -97,10 +112,6 @@ int responder_loop()
97 112
 			{
98 113
 				//TODO : error
99 114
 			}
100
-			if(!ret)
101
-			{
102
-				continue;
103
-			}
104 115
 			for(n=0; n<n_wrk; n++)
105 116
 			{
106 117
 				if(wrk_pids[n] == ret)
@@ -117,15 +128,18 @@ int responder_loop()
117 128
 			}
118 129
 			idle=0;
119 130
 			sop.sem_op = -1;
120
-			ret = semop(semid, &sop, 1);
131
+			ret = semtimedop(semid, &sop, 1, &timeout);
121 132
 			sop.sem_op = 0;
122 133
 			if(ret < 0)
123 134
 			{
124 135
 				err = errno;
125
-				pyfcgi_log(LOG_ALERT,
126
-				       "Unable to dec sem after child exit : %s",
127
-				       strerror(err));
128
-				clean_exit(err);
136
+				if(err != EAGAIN) //can fail if wrokers timeout
137
+				{
138
+					pyfcgi_log(LOG_ALERT,
139
+					       "Unable to dec sem after child exit : %s",
140
+					       strerror(err));
141
+					clean_exit(err);
142
+				}
129 143
 			}
130 144
 			if(status)
131 145
 			{
@@ -164,22 +178,35 @@ int responder_loop()
164 178
 				       "Worker[%d] PID %d exited normally",
165 179
 				       n, wrk_pids[n]);
166 180
 			}
167
-			// child stopped, looking for it
168
-			if(wanted_n < n_wrk)
169
-			{	// need to shift the list and dec n_wrk
170
-				pyfcgi_log(LOG_DEBUG, "GC Workers");
171
-pyfcgi_log( LOG_DEBUG, "GC want %d have %d", wanted_n, n_wrk);
172
-				memmove(wrk_pids+n, wrk_pids+n+1,
173
-				        sizeof(pid_t) * (n_wrk - n));
174
-				n_wrk--;
181
+
182
+			// respawn on same slot
183
+			pyfcgi_log(LOG_INFO, "respawn #%d", n);
184
+			wrk_pids[n] = spawn(n);
185
+
186
+			
187
+		}
188
+		// Stopping & deleting useless childs
189
+		if(wanted_n < n_wrk)
190
+		{	// need to shift the list and dec n_wrk
191
+			pyfcgi_log(LOG_DEBUG, "GC Workers");
192
+			n_wrk--;
193
+			kill(wrk_pids[n_wrk], SIGTERM);
194
+			nanosleep(&timeout, NULL);
195
+			kill(wrk_pids[n_wrk], SIGKILL);
196
+			nanosleep(&timeout, NULL);
197
+			if( (ret = waitpid(wrk_pids[n_wrk], &status, WNOHANG)) < 0 )
198
+			{
199
+				pyfcgi_log(LOG_ERR, "Unable to kill child %d (PID %d)",
200
+					n_wrk, wrk_pids[n_wrk]);
175 201
 			}
176 202
 			else
177
-			{	// respawn on same slot
178
-				pyfcgi_log(LOG_INFO, "respawn #%d", n);
179
-				wrk_pids[n] = spawn(n, semid);
180
-				continue;
203
+			{
204
+				pyfcgi_log(LOG_INFO, "worker[%d](%d) killed",
205
+					n_wrk, wrk_pids[n_wrk]);
181 206
 			}
207
+			continue;
182 208
 		}
209
+
183 210
 		ret = semtimedop(semid, &sop, 1, &timeout);
184 211
 //pyfcgi_log( LOG_DEBUG, "semtimeop ret=%d want %d have %d", ret, wanted_n, n_wrk);
185 212
 		if(ret < 0)
@@ -203,18 +230,26 @@ pyfcgi_log( LOG_DEBUG, "GC want %d have %d", wanted_n, n_wrk);
203 230
 			pyfcgi_log(LOG_ERR, "Unable to read semaphore : %s",
204 231
 			       strerror(err));
205 232
 		}
206
-		if(!ret && n_wrk < PyFCGI_conf.max_wrk)
233
+		if(!ret)
207 234
 		{
208
-			idle=0;
209
-			pyfcgi_log( LOG_DEBUG,
210
-				"All workers busy, spawning a new one");
211
-			n = n_wrk;
212
-			n_wrk++;
213
-			wanted_n = n_wrk;
214
-			wrk_pids[n] = spawn(n, semid);
235
+			if(n_wrk < PyFCGI_conf.max_wrk)
236
+			{
237
+				idle=0;
238
+				pyfcgi_log( LOG_DEBUG,
239
+					"All workers busy, spawning a new one");
240
+				n = n_wrk;
241
+				n_wrk++;
242
+				wanted_n = n_wrk;
243
+				wrk_pids[n] = spawn(n);
244
+			}
245
+			else
246
+			{
247
+				nanosleep(&timeout, NULL);
248
+			}
215 249
 		}
216 250
 	}
217 251
 	
252
+	pyfcgi_wd_arm();
218 253
 	//Debug wait & exit
219 254
 	for(; n_wrk != 0; n_wrk--)
220 255
 	{
@@ -224,15 +259,18 @@ pyfcgi_log( LOG_DEBUG, "GC want %d have %d", wanted_n, n_wrk);
224 259
 		PyFCGI_conf.context.n_wrk = n_wrk;
225 260
 	}
226 261
 		//printf("Content-Type: text/html\r\n\r\nHello world !\n");
262
+	pyfcgi_wd_stop();
227 263
 	pyfcgi_log(LOG_INFO,"Child workers stoped, stopping responder");
228 264
 	exit(0);
229 265
 }
230 266
 
231
-pid_t spawn(int wrk_id, int semid)
267
+pid_t spawn(int wrk_id)
232 268
 {
233 269
 	pid_t res;
234 270
 	struct timespec timeout;
271
+	struct timespec wd_timeout;
235 272
 	struct sigaction act;
273
+	char ident[128];
236 274
 
237 275
 	timeout.tv_sec = 0;
238 276
 	timeout.tv_nsec = 100000000;
@@ -254,19 +292,29 @@ pid_t spawn(int wrk_id, int semid)
254 292
 	else if(!res)
255 293
 	{
256 294
 		// Child process
295
+		snprintf(ident, 128, "Worker[%d]", wrk_id);
296
+		pyfcgi_logger_set_ident(ident);
297
+		// Set handler for SIGINT & SIGTERM
257 298
 		if(sigaction(SIGINT, &act, NULL))
258 299
 		{
259 300
 			perror("Sigaction error for pool process");
260 301
 			exit(PYFCGI_FATAL);
261 302
 		}
303
+		// Set watchdog
304
+		if(PyFCGI_conf.worker_timeout)
305
+		{
306
+			wd_timeout.tv_nsec = 0;
307
+			wd_timeout.tv_sec = PyFCGI_conf.worker_timeout;
308
+			pyfcgi_wd_init(worker_sigalrmhandler, &wd_timeout);
309
+		}
262 310
 
263 311
 		if(PyFCGI_conf.pep333)
264 312
 		{
265
-			exit(work333(wrk_id, semid));
313
+			exit(work333(wrk_id));
266 314
 		}
267 315
 		else
268 316
 		{
269
-			exit(work(wrk_id, semid));
317
+			exit(work(wrk_id));
270 318
 		}
271 319
 	}
272 320
 	// Sleep to avoid spawning like hell thinking all workers are
@@ -292,20 +340,21 @@ int new_semaphore(key_t semkey)
292 340
 			strerror(err));
293 341
 		clean_exit(err);
294 342
 	}
295
-	if(pyfcgi_semid)
343
+	if(PyFCGI_conf.context.semid)
296 344
 	{
297 345
 		pyfcgi_log(	LOG_WARNING,
298 346
 			"The semid context was not zero when calling new_semaphore, attempt to closeing it.");
299
-		semctl(pyfcgi_semid, 0, IPC_RMID);
347
+		semctl(PyFCGI_conf.context.semid, 0, IPC_RMID);
300 348
 
301 349
 	}
302
-	pyfcgi_semid = semid;
350
+	PyFCGI_conf.context.semid = semid;
303 351
 	return semid;
304 352
 }
305 353
 
306 354
 void clean_exit(int status)
307 355
 {
308
-	if(pyfcgi_semid && semctl(pyfcgi_semid, 0, IPC_RMID) == -1)
356
+	if(PyFCGI_conf.context.semid &&
357
+		semctl(PyFCGI_conf.context.semid, 0, IPC_RMID) == -1)
309 358
 	{
310 359
 		/*silentely fails (when exiting from sighandler)
311 360
 		pyfcgi_log(	LOG_CRIT,
@@ -342,3 +391,23 @@ void pool_sighandler(int signum)
342 391
 	}
343 392
 	clean_exit(0);
344 393
 }
394
+
395
+void pool_wd_sighandler(int signum)
396
+{
397
+	unsigned int i;
398
+	pyfcgi_log(LOG_ALERT, "Worker pool timeout ! Attempt to kill all childs");
399
+	for(i=0; i<PyFCGI_conf.context.n_wrk; i++)
400
+	{
401
+		pyfcgi_log(LOG_ALERT, "Child[%d] PID %d", i, (*PyFCGI_conf.context.wrk_pids)[i]);
402
+		kill((*PyFCGI_conf.context.wrk_pids)[i], SIGALRM);
403
+	}
404
+	while(PyFCGI_conf.context.n_wrk)
405
+	{
406
+		kill((*PyFCGI_conf.context.wrk_pids)[PyFCGI_conf.context.n_wrk], SIGALRM);
407
+		PyFCGI_conf.context.n_wrk--;
408
+	}
409
+	pyfcgi_wd_stop();
410
+	kill(PyFCGI_conf.context.pid, SIGTERM);
411
+	clean_exit(PYFCGI_TIMEOUT);
412
+	exit(PYFCGI_TIMEOUT);
413
+}

Loading…
Cancel
Save