Browse Source

Using clone for piper & use ctl pipe to read response size

Yann Weber 4 years ago
parent
commit
2bdf83fc43
2 changed files with 165 additions and 28 deletions
  1. 139
    25
      src/pyworker.c
  2. 26
    3
      src/pyworker.h

+ 139
- 25
src/pyworker.c View File

@@ -25,13 +25,21 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
25 25
 {
26 26
 	PyObject *entry_fun, *pystdout_flush, *pystderr_flush,
27 27
 	         *py_osmod;
28
+	FCGX_Stream *in_stream, *out_stream, *err_stream;
29
+	char **envp;
28 30
 	int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status;
29 31
 	struct sembuf sop;
30 32
 	struct sigaction act;
31 33
 	struct timeval start, stop;
32 34
 	sigset_t emptyset;
33 35
 	char buf[PIPE_BUF];
36
+	size_t rep_sz;
37
+	piper_args_t piper_args;
38
+	char *piper_stack;
34 39
 
40
+
41
+	piper_args.wrk_id = wrk_id;
42
+	piper_args.act = &act;
35 43
 	sop.sem_num = 0;
36 44
 	sop.sem_flg = 0;
37 45
 
@@ -50,12 +58,23 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
50 58
 	act.sa_restorer = NULL;
51 59
 
52 60
 
61
+	if( !(piper_stack = malloc(PIPER_STACK_SZ)) )
62
+	{
63
+		err = errno;
64
+		syslog(LOG_ALERT, "Error while allocating piper stack : %s",
65
+		       strerror(err));
66
+		exit(err);
67
+	}
68
+
53 69
 	syslog(	LOG_INFO,
54 70
 		"Worker %d started", wrk_id);
55 71
 
56 72
 	update_python_path(); // add cwd to python path
57 73
 	Py_Initialize(); // "start" python
58 74
 	update_python_fd(pipe_out, pipe_err);
75
+	piper_args.pystdout = pipe_out[0];
76
+	piper_args.pystderr = pipe_err[0];
77
+
59 78
 	fetch_pyflush(&pystdout_flush, &pystderr_flush);
60 79
 	syslog(	LOG_INFO,
61 80
 		"Worker[%d] Python started", wrk_id);
@@ -94,9 +113,10 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
94 113
 		       wrk_id, strerror(err));
95 114
 		exit(err);
96 115
 	}
116
+	piper_args.ctl_pipe = pipe_ctl[1];
97 117
 
98 118
 	count = 0;
99
-	while ((!count || count != max_reqs) && FCGI_Accept() >= 0)
119
+	while ((!count || count != max_reqs) && FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
100 120
 	{
101 121
 		gettimeofday(&start, NULL);
102 122
 		sop.sem_op = -1; // decrementing sem to show worker busy
@@ -113,8 +133,20 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
113 133
 		syslog(	LOG_INFO,
114 134
 			"Worker[%d] request %d", wrk_id, count);
115 135
 		*/
136
+		piper_args.out = out_stream;
137
+		//piper_args.req_id = count;
116 138
 		worker_piper_sigrcv = 0;
139
+		/*
117 140
 		pid_t pid = fork();
141
+		if(!pid) { exit(worker_piper(&piper_args)); }
142
+		*/
143
+
144
+
145
+		pid_t pid = clone(worker_piper, piper_stack + PIPER_STACK_SZ - 1,
146
+		                  CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID | \
147
+				  SIGCHLD | \
148
+				  CLONE_FILES | CLONE_FS | CLONE_IO | CLONE_VM,
149
+				  &piper_args);
118 150
 		if(pid < 0)
119 151
 		{
120 152
 			err = errno;
@@ -123,23 +155,17 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
123 155
 			       wrk_id, count, strerror(err));
124 156
 			exit(err);
125 157
 		}
158
+		/*
126 159
 		if(!pid)
127 160
 		{
128
-			if(sigaction(WPIPER_SIG, &act, NULL))
129
-			{
130
-				err = errno;
131
-				syslog(	LOG_ALERT, "Unable to sigaction in piper : %s",
132
-					strerror(err));
133
-				exit(err);
134
-			}
135
-			write(pipe_ctl[1], " ", 1);
136
-	
161
+			
137 162
 			worker_piper(wrk_id, count, pipe_out[0], pipe_err[0],
138
-			             pipe_ctl[1]);
163
+			             pipe_ctl[1], out_stream);
139 164
 			//printf("Content-type: text/html\r\n\r\nHello world !\n");
140 165
 			exit(1);
141 166
 		}
142
-		update_pyenv(py_osmod);
167
+		*/
168
+		update_pyenv(py_osmod, envp);
143 169
 		//close(pipe_ctl[1]);
144 170
 		PyObject_CallObject(entry_fun, NULL);
145 171
 		if(PyErr_Occurred())
@@ -156,6 +182,8 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
156 182
 		PyObject_CallObject(pystdout_flush, NULL);
157 183
 		PyObject_CallObject(pystderr_flush, NULL);
158 184
 		read(pipe_ctl[0], &buf, 1); // unblock when child ready
185
+		FCGX_FClose(in_stream);
186
+		FCGX_FClose(err_stream);
159 187
 		//close(pipe_ctl[0]);
160 188
 
161 189
 		kill(pid, WPIPER_SIG); //indicate child python call ended
@@ -184,6 +212,10 @@ syslog(LOG_DEBUG, "W%dR%dtimer W8  %ld.%06ld us", wrk_id, count, stop.tv_sec - s
184 212
 		}
185 213
 
186 214
 		FCGI_Finish();
215
+		if(ctl_get_rep_sz(pipe_ctl[0], &rep_sz))
216
+		{
217
+			rep_sz = 0;
218
+		}
187 219
 		//Increase sem showing the worker is idle
188 220
 		sop.sem_op = 1;
189 221
 		if(semop(semid, &sop, 1) < 0)
@@ -201,32 +233,63 @@ syslog(LOG_DEBUG, "W%dR%dtimer W8  %ld.%06ld us", wrk_id, count, stop.tv_sec - s
201 233
 			stop.tv_usec += 1000000;
202 234
 			stop.tv_sec -= 1;
203 235
 		}
204
-		syslog(LOG_DEBUG, "Worker[%d] request %d END [OK] in %ld.%06lds",
205
-			wrk_id, count, stop.tv_sec, stop.tv_usec);
236
+		syslog(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
237
+			wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec);
206 238
 	}
239
+	free(piper_stack);
207 240
 	Py_Exit(count == max_reqs ?0:EXIT_PYERR);
208 241
 }
209 242
 
243
+/*
210 244
 void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
211
-	int ctl_pipe)
245
+	int ctl_pipe, FCGX_Stream* out)
246
+*/
247
+int worker_piper(void *ptr)
212 248
 {
249
+	piper_args_t *args;
213 250
 	struct pollfd fds[2];
214 251
 	int err, ret, cont, poll_ret;
215
-	short revents;
252
+	short revents, closed, nfds;
216 253
 	char buf[PIPE_BUF];
254
+	size_t out_sz;
255
+
256
+	args = ptr;
257
+	int wrk_id, pystdout, pystderr, ctl_pipe;
258
+	//int req_id;
259
+	FCGX_Stream *out;
260
+	struct sigaction *act;
261
+	wrk_id = args->wrk_id;
262
+	//req_id = args->req_id;
263
+	pystdout = args->pystdout;
264
+	pystderr = args->pystderr;
265
+	ctl_pipe = args->ctl_pipe;
266
+	out = args->out;
267
+	act = args->act;
268
+
269
+	if(sigaction(WPIPER_SIG, act, NULL))
270
+	{
271
+		err = errno;
272
+		syslog(	LOG_ALERT, "Unable to sigaction in piper : %s",
273
+			strerror(err));
274
+		exit(err);
275
+	}
276
+	write(ctl_pipe, " ", 1);
277
+
217 278
 
218 279
 	//syslog(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id);
219 280
 
220
-	fds[0].fd = pystdout;
221
-	fds[1].fd = pystderr;
281
+	fds[0].fd = pystderr;
282
+	fds[1].fd = pystdout;
222 283
 	fds[0].events = fds[1].events = POLLIN;
223 284
 	fds[0].revents = fds[1].revents = 0;
285
+	nfds = 2;
224 286
 
225 287
 
288
+	closed = out_sz = 0;
226 289
 	cont = 2;
227 290
 	while(cont)
228 291
 	{
229
-		poll_ret = poll(fds, 2, 0);
292
+		poll_ret = poll(fds, nfds, 0);
230 293
 //syslog(LOG_DEBUG, "Worker[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
231 294
 		if(poll_ret < 0)
232 295
 		{
@@ -245,7 +308,7 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
245 308
 			cont--;
246 309
 			continue;
247 310
 		}
248
-		if(poll_ret && (revents = fds[0].revents))
311
+		if(poll_ret && (revents = fds[1].revents))
249 312
 		{
250 313
 			/*
251 314
 			syslog(LOG_DEBUG, "Worker[%d] req #%d STDOUT evt !",
@@ -275,12 +338,21 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
275 338
 						strerror(err));
276 339
 					exit(err);
277 340
 				}
278
-				buf[ret] = '\0';
279
-				printf("%s", buf);
341
+				//buf[ret] = '\0';
342
+				ret = FCGX_PutStr(buf, ret, out);
343
+				out_sz += ret;
344
+				if(ret < PIPE_BUF && worker_piper_sigrcv)
345
+				{
346
+					FCGX_FClose(out);
347
+					FCGI_Finish();
348
+					write(ctl_pipe, &out_sz, sizeof(size_t));
349
+					closed = 1;
350
+					nfds = 1;
351
+				}
280 352
 			}
281 353
 			//TODO handle other poll events
282 354
 		}
283
-		if(poll_ret && (revents = fds[1].revents))
355
+		if(poll_ret && (revents = fds[0].revents))
284 356
 		{
285 357
 			/*
286 358
 			syslog(LOG_DEBUG, "Worker[%d] req #%d STDERR evt !",
@@ -322,7 +394,13 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
322 394
 	syslog(LOG_DEBUG, "Worker[%d] req #%d piper exiting...",
323 395
 	       wrk_id, req_id);
324 396
 	*/
325
-	exit(0);
397
+	if(!closed)
398
+	{
399
+		FCGX_FClose(out);
400
+		FCGI_Finish();
401
+		write(ctl_pipe, &out_sz, sizeof(size_t));
402
+	}
403
+	return 0;
326 404
 }
327 405
 
328 406
 void worker_piper_sighandler(int signum)
@@ -331,6 +409,42 @@ void worker_piper_sighandler(int signum)
331 409
 	//syslog(LOG_DEBUG, "SIG");
332 410
 }
333 411
 
412
+int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz)
413
+{
414
+	struct pollfd fds;
415
+	int ret, err;
416
+
417
+	fds.fd = ctl_pipe;
418
+	fds.events = POLLIN;
419
+	fds.revents = 0;
420
+
421
+	if( (ret = poll(&fds, 1, 0)) < 0)
422
+	{
423
+		err = errno;
424
+		syslog(LOG_ERR, "Failed to poll ctl pipe : %s",
425
+		       strerror(err));
426
+		return -1;
427
+	}
428
+	if(!ret)
429
+	{
430
+		syslog(LOG_ERR, "No data in ctl pipe for rep_sz...");
431
+		return -1;
432
+	}
433
+	ret = read(ctl_pipe, rep_sz, sizeof(size_t));
434
+	if(ret < 0)
435
+	{
436
+		syslog(LOG_ERR, "Error reading ctl pipe : %s",
437
+		       strerror(errno));
438
+		return -1;
439
+	}
440
+	if(ret < sizeof(size_t))
441
+	{
442
+		syslog(LOG_ERR, "Incomplete read from ctl pipe, no rep_sz...");
443
+		return -1;
444
+	}
445
+	return 0;
446
+}
447
+
334 448
 PyObject* import_entrypoint(char* py_entrypoint)
335 449
 {
336 450
 	PyObject *entry_fname, *entry_module, *entry_fun;
@@ -642,7 +756,7 @@ update_python_fd_err:
642 756
 }
643 757
 
644 758
 
645
-void update_pyenv(PyObject *py_osmod)
759
+void update_pyenv(PyObject *py_osmod, char **environ)
646 760
 {
647 761
 	PyObject *pyenv, *pykey, *pyval;
648 762
 	char *key, *value, **cur;

+ 26
- 3
src/pyworker.h View File

@@ -59,6 +59,7 @@
59 59
 
60 60
 #include "config.h"
61 61
 
62
+#include <fcgiapp.h>
62 63
 #include <fcgi_stdio.h> /* fcgi library; put it first*/
63 64
 
64 65
 #define PY_SSIZE_T_CLEAN
@@ -71,6 +72,7 @@
71 72
 #include <signal.h>
72 73
 #include <limits.h>
73 74
 #include <poll.h>
75
+#include <sched.h>
74 76
 #include <sys/types.h>
75 77
 #include <sys/ipc.h>
76 78
 #include <sys/sem.h>
@@ -79,10 +81,20 @@
79 81
 #define EXIT_PYERR 42
80 82
 #define WPIPER_SIG 30
81 83
 #define PYENTRY_FUNNAME "entrypoint"
84
+#define PIPER_STACK_SZ (1024 * 1024 * 4)
82 85
 
83
-extern char **environ;
86
+//extern char **environ;
84 87
 
85 88
 typedef unsigned long int pywrkid_t;
89
+typedef struct piper_args_s piper_args_t;
90
+
91
+struct piper_args_s
92
+{
93
+	int wrk_id, pystdout, pystderr, ctl_pipe;
94
+	//int req_id;
95
+	FCGX_Stream *out;
96
+	struct sigaction *act;
97
+};
86 98
 
87 99
 /**@todo TODO on all request (when updating env ?) update stdin so
88 100
  * python will be able to read the request content */
@@ -108,12 +120,22 @@ int work(char*, int, int, int);
108 120
  * @note Exit after signal sent by parent & when poll indicate the pipes are
109 121
  * empty
110 122
  */
111
-void worker_piper(int, int, int, int, int);
123
+/*
124
+void worker_piper(int, int, int, int, int, FCGX_Stream*);
125
+*/
126
+int worker_piper(void*);
112 127
 
113 128
 /**@brief worker piper sigaction handler
114 129
  */
115 130
 void worker_piper_sighandler(int);
116 131
 
132
+/**@brief Attempt to read the request size from ctl pipe
133
+ * @param int ctl pipe read fd
134
+ * @param size_t* rep_sz
135
+ * @return 0 if no error
136
+ */
137
+int ctl_get_rep_sz(int, size_t*);
138
+
117 139
 /**@brief Import & return the python entrypoint callable
118 140
  */
119 141
 PyObject* import_entrypoint(char* py_entrypoint);
@@ -143,7 +165,8 @@ void update_python_fd(int[2], int[2]);
143 165
  * each request...
144 166
  * @param PyObject* os module
145 167
  */
146
-void update_pyenv(PyObject*);
168
+void update_pyenv(PyObject*, char**);
169
+
147 170
 
148 171
 void log_expt(int priority);
149 172
 

Loading…
Cancel
Save