diff --git a/src/pyworker.c b/src/pyworker.c index 595f3ac..69830db 100644 --- a/src/pyworker.c +++ b/src/pyworker.c @@ -25,13 +25,21 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs) { PyObject *entry_fun, *pystdout_flush, *pystderr_flush, *py_osmod; + FCGX_Stream *in_stream, *out_stream, *err_stream; + char **envp; int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status; struct sembuf sop; struct sigaction act; struct timeval start, stop; sigset_t emptyset; char buf[PIPE_BUF]; + size_t rep_sz; + piper_args_t piper_args; + char *piper_stack; + + piper_args.wrk_id = wrk_id; + piper_args.act = &act; sop.sem_num = 0; sop.sem_flg = 0; @@ -50,12 +58,23 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs) act.sa_restorer = NULL; + if( !(piper_stack = malloc(PIPER_STACK_SZ)) ) + { + err = errno; + syslog(LOG_ALERT, "Error while allocating piper stack : %s", + strerror(err)); + exit(err); + } + syslog( LOG_INFO, "Worker %d started", wrk_id); update_python_path(); // add cwd to python path Py_Initialize(); // "start" python update_python_fd(pipe_out, pipe_err); + piper_args.pystdout = pipe_out[0]; + piper_args.pystderr = pipe_err[0]; + fetch_pyflush(&pystdout_flush, &pystderr_flush); syslog( LOG_INFO, "Worker[%d] Python started", wrk_id); @@ -94,9 +113,10 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs) wrk_id, strerror(err)); exit(err); } + piper_args.ctl_pipe = pipe_ctl[1]; count = 0; - while ((!count || count != max_reqs) && FCGI_Accept() >= 0) + while ((!count || count != max_reqs) && FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0) { gettimeofday(&start, NULL); sop.sem_op = -1; // decrementing sem to show worker busy @@ -113,8 +133,20 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs) syslog( LOG_INFO, "Worker[%d] request %d", wrk_id, count); */ + piper_args.out = out_stream; + //piper_args.req_id = count; worker_piper_sigrcv = 0; + /* pid_t pid = fork(); + if(!pid) { exit(worker_piper(&piper_args)); } + */ + + + pid_t pid = clone(worker_piper, piper_stack + PIPER_STACK_SZ - 1, + CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID | \ + SIGCHLD | \ + CLONE_FILES | CLONE_FS | CLONE_IO | CLONE_VM, + &piper_args); if(pid < 0) { err = errno; @@ -123,23 +155,17 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs) wrk_id, count, strerror(err)); exit(err); } + /* if(!pid) { - if(sigaction(WPIPER_SIG, &act, NULL)) - { - err = errno; - syslog( LOG_ALERT, "Unable to sigaction in piper : %s", - strerror(err)); - exit(err); - } - write(pipe_ctl[1], " ", 1); - + worker_piper(wrk_id, count, pipe_out[0], pipe_err[0], - pipe_ctl[1]); + pipe_ctl[1], out_stream); //printf("Content-type: text/html\r\n\r\nHello world !\n"); exit(1); } - update_pyenv(py_osmod); + */ + update_pyenv(py_osmod, envp); //close(pipe_ctl[1]); PyObject_CallObject(entry_fun, NULL); if(PyErr_Occurred()) @@ -156,6 +182,8 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs) PyObject_CallObject(pystdout_flush, NULL); PyObject_CallObject(pystderr_flush, NULL); read(pipe_ctl[0], &buf, 1); // unblock when child ready + FCGX_FClose(in_stream); + FCGX_FClose(err_stream); //close(pipe_ctl[0]); kill(pid, WPIPER_SIG); //indicate child python call ended @@ -184,6 +212,10 @@ syslog(LOG_DEBUG, "W%dR%dtimer W8 %ld.%06ld us", wrk_id, count, stop.tv_sec - s } FCGI_Finish(); + if(ctl_get_rep_sz(pipe_ctl[0], &rep_sz)) + { + rep_sz = 0; + } //Increase sem showing the worker is idle sop.sem_op = 1; if(semop(semid, &sop, 1) < 0) @@ -201,32 +233,63 @@ syslog(LOG_DEBUG, "W%dR%dtimer W8 %ld.%06ld us", wrk_id, count, stop.tv_sec - s stop.tv_usec += 1000000; stop.tv_sec -= 1; } - syslog(LOG_DEBUG, "Worker[%d] request %d END [OK] in %ld.%06lds", - wrk_id, count, stop.tv_sec, stop.tv_usec); + syslog(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds", + wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec); } + free(piper_stack); Py_Exit(count == max_reqs ?0:EXIT_PYERR); } +/* void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr, - int ctl_pipe) + int ctl_pipe, FCGX_Stream* out) +*/ +int worker_piper(void *ptr) { + piper_args_t *args; struct pollfd fds[2]; int err, ret, cont, poll_ret; - short revents; + short revents, closed, nfds; char buf[PIPE_BUF]; + size_t out_sz; + + args = ptr; + int wrk_id, pystdout, pystderr, ctl_pipe; + //int req_id; + FCGX_Stream *out; + struct sigaction *act; + wrk_id = args->wrk_id; + //req_id = args->req_id; + pystdout = args->pystdout; + pystderr = args->pystderr; + ctl_pipe = args->ctl_pipe; + out = args->out; + act = args->act; + + if(sigaction(WPIPER_SIG, act, NULL)) + { + err = errno; + syslog( LOG_ALERT, "Unable to sigaction in piper : %s", + strerror(err)); + exit(err); + } + write(ctl_pipe, " ", 1); + //syslog(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id); - fds[0].fd = pystdout; - fds[1].fd = pystderr; + fds[0].fd = pystderr; + fds[1].fd = pystdout; fds[0].events = fds[1].events = POLLIN; fds[0].revents = fds[1].revents = 0; + nfds = 2; + closed = out_sz = 0; cont = 2; while(cont) { - poll_ret = poll(fds, 2, 0); + poll_ret = poll(fds, nfds, 0); //syslog(LOG_DEBUG, "Worker[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret); if(poll_ret < 0) { @@ -245,7 +308,7 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr, cont--; continue; } - if(poll_ret && (revents = fds[0].revents)) + if(poll_ret && (revents = fds[1].revents)) { /* syslog(LOG_DEBUG, "Worker[%d] req #%d STDOUT evt !", @@ -275,12 +338,21 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr, strerror(err)); exit(err); } - buf[ret] = '\0'; - printf("%s", buf); + //buf[ret] = '\0'; + ret = FCGX_PutStr(buf, ret, out); + out_sz += ret; + if(ret < PIPE_BUF && worker_piper_sigrcv) + { + FCGX_FClose(out); + FCGI_Finish(); + write(ctl_pipe, &out_sz, sizeof(size_t)); + closed = 1; + nfds = 1; + } } //TODO handle other poll events } - if(poll_ret && (revents = fds[1].revents)) + if(poll_ret && (revents = fds[0].revents)) { /* syslog(LOG_DEBUG, "Worker[%d] req #%d STDERR evt !", @@ -322,7 +394,13 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr, syslog(LOG_DEBUG, "Worker[%d] req #%d piper exiting...", wrk_id, req_id); */ - exit(0); + if(!closed) + { + FCGX_FClose(out); + FCGI_Finish(); + write(ctl_pipe, &out_sz, sizeof(size_t)); + } + return 0; } void worker_piper_sighandler(int signum) @@ -331,6 +409,42 @@ void worker_piper_sighandler(int signum) //syslog(LOG_DEBUG, "SIG"); } +int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz) +{ + struct pollfd fds; + int ret, err; + + fds.fd = ctl_pipe; + fds.events = POLLIN; + fds.revents = 0; + + if( (ret = poll(&fds, 1, 0)) < 0) + { + err = errno; + syslog(LOG_ERR, "Failed to poll ctl pipe : %s", + strerror(err)); + return -1; + } + if(!ret) + { + syslog(LOG_ERR, "No data in ctl pipe for rep_sz..."); + return -1; + } + ret = read(ctl_pipe, rep_sz, sizeof(size_t)); + if(ret < 0) + { + syslog(LOG_ERR, "Error reading ctl pipe : %s", + strerror(errno)); + return -1; + } + if(ret < sizeof(size_t)) + { + syslog(LOG_ERR, "Incomplete read from ctl pipe, no rep_sz..."); + return -1; + } + return 0; +} + PyObject* import_entrypoint(char* py_entrypoint) { PyObject *entry_fname, *entry_module, *entry_fun; @@ -642,7 +756,7 @@ update_python_fd_err: } -void update_pyenv(PyObject *py_osmod) +void update_pyenv(PyObject *py_osmod, char **environ) { PyObject *pyenv, *pykey, *pyval; char *key, *value, **cur; diff --git a/src/pyworker.h b/src/pyworker.h index 05f37a6..7f5e397 100644 --- a/src/pyworker.h +++ b/src/pyworker.h @@ -59,6 +59,7 @@ #include "config.h" +#include #include /* fcgi library; put it first*/ #define PY_SSIZE_T_CLEAN @@ -71,6 +72,7 @@ #include #include #include +#include #include #include #include @@ -79,10 +81,20 @@ #define EXIT_PYERR 42 #define WPIPER_SIG 30 #define PYENTRY_FUNNAME "entrypoint" +#define PIPER_STACK_SZ (1024 * 1024 * 4) -extern char **environ; +//extern char **environ; typedef unsigned long int pywrkid_t; +typedef struct piper_args_s piper_args_t; + +struct piper_args_s +{ + int wrk_id, pystdout, pystderr, ctl_pipe; + //int req_id; + FCGX_Stream *out; + struct sigaction *act; +}; /**@todo TODO on all request (when updating env ?) update stdin so * python will be able to read the request content */ @@ -108,12 +120,22 @@ int work(char*, int, int, int); * @note Exit after signal sent by parent & when poll indicate the pipes are * empty */ -void worker_piper(int, int, int, int, int); +/* +void worker_piper(int, int, int, int, int, FCGX_Stream*); +*/ +int worker_piper(void*); /**@brief worker piper sigaction handler */ void worker_piper_sighandler(int); +/**@brief Attempt to read the request size from ctl pipe + * @param int ctl pipe read fd + * @param size_t* rep_sz + * @return 0 if no error + */ +int ctl_get_rep_sz(int, size_t*); + /**@brief Import & return the python entrypoint callable */ PyObject* import_entrypoint(char* py_entrypoint); @@ -143,7 +165,8 @@ void update_python_fd(int[2], int[2]); * each request... * @param PyObject* os module */ -void update_pyenv(PyObject*); +void update_pyenv(PyObject*, char**); + void log_expt(int priority);