/* * Copyright (C) 2019 Weber Yann * * This file is part of PyFCGI. * * PyFCGI is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published by * the Free Software Foundation, either version 3 of the License, or * any later version. * * PyFCGI is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with PyFCGI. If not, see . */ #include "pyworker.h" static int worker_piper_sigrcv = 0; int work(int wrk_id, int semid) { PyObject *entry_fun, *pystdout_flush, *pystderr_flush, *py_osmod; FCGX_Stream *in_stream, *out_stream, *err_stream; char **envp; int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status; int max_reqs; struct sembuf sop; struct sigaction act; struct timeval start, stop; sigset_t emptyset; char buf[PIPE_BUF]; size_t rep_sz; piper_args_t piper_args; char *piper_stack; max_reqs = PyFCGI_conf.max_reqs; piper_args.wrk_id = wrk_id; piper_args.act = &act; sop.sem_num = 0; sop.sem_flg = 0; // preparing sigaction for piper if(sigemptyset(&emptyset)) { err = errno; pyfcgi_log( LOG_ALERT, "sigemptyset fails in piper : %s", strerror(err)); exit(err); } act.sa_handler = worker_piper_sighandler; act.sa_mask = emptyset; //act.sa_flags = SA_RESTART; act.sa_flags = 0; act.sa_restorer = NULL; if( !(piper_stack = malloc(PIPER_STACK_SZ)) ) { err = errno; pyfcgi_log(LOG_ALERT, "Error while allocating piper stack : %s", strerror(err)); exit(err); } pyfcgi_log( LOG_INFO, "Worker %d started", wrk_id); update_python_path(); // add cwd to python path Py_Initialize(); // "start" python update_python_fd(pipe_out, pipe_err); piper_args.pystdout = pipe_out[0]; piper_args.pystderr = pipe_err[0]; fetch_pyflush(&pystdout_flush, &pystderr_flush); pyfcgi_log( LOG_INFO, "Worker[%d] Python started", wrk_id); //importing os py_osmod = PyImport_ImportModule("os"); if(!py_osmod) { pyfcgi_log(LOG_ALERT, "Unable to import os module"); log_expt(LOG_ALERT); Py_Exit(EXIT_PYERR); } // loading module entry_fun = import_entrypoint(); pyfcgi_log( LOG_INFO, "Worker[%d] Waiting request with %s.%s()", wrk_id, PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun); sop.sem_op = 1; //indicate worker as ready & idle if(semop(semid, &sop, 1) < 0) { err = errno; pyfcgi_log(LOG_ERR, "Worker[%d] error incrementing the semaphore : %s", wrk_id, strerror(err)); } if(!entry_fun) //but exit if import failed { pyfcgi_log(LOG_ALERT, "Unable to import entrypoint"); exit(PYFCGI_FATAL); } if(pipe(pipe_ctl) == -1) { err = errno; pyfcgi_log(LOG_ALERT, "Worker[%d] Pipe fails for piper ctl : %s", wrk_id, strerror(err)); exit(err); } piper_args.ctl_pipe = pipe_ctl[1]; count = 0; while ((!count || count != max_reqs) && FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0) { gettimeofday(&start, NULL); sop.sem_op = -1; // decrementing sem to show worker busy if(semop(semid, &sop, 1) < 0) { err = errno; pyfcgi_log(LOG_ERR, "Worker[%d] error decrementing the semaphore : %s", wrk_id, strerror(err)); } count++; piper_args.out = out_stream; //piper_args.req_id = count; worker_piper_sigrcv = 0; pid_t pid = clone(worker_piper, piper_stack + PIPER_STACK_SZ - 1, CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID | \ SIGCHLD | \ CLONE_FILES | CLONE_FS | CLONE_IO | CLONE_VM, &piper_args); if(pid < 0) { err = errno; pyfcgi_log(LOG_ALERT, "Worker[%d] req #%d Fork failed for piper : %s", wrk_id, count, strerror(err)); exit(err); } update_pyenv(py_osmod, envp); //close(pipe_ctl[1]); PyObject_CallObject(entry_fun, NULL); if(PyErr_Occurred()) { log_expt(LOG_ERR); } else { /* pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d funcall [OK]", wrk_id, count); */ } PyObject_CallObject(pystdout_flush, NULL); PyObject_CallObject(pystderr_flush, NULL); read(pipe_ctl[0], &buf, 1); // unblock when child ready FCGX_FClose(in_stream); FCGX_FClose(err_stream); //close(pipe_ctl[0]); kill(pid, WPIPER_SIG); //indicate child python call ended /* gettimeofday(&stop, NULL); pyfcgi_log(LOG_DEBUG, "W%dR%dtimer KIL %ld.%06ld us", wrk_id, count, stop.tv_sec - start.tv_sec, stop.tv_usec - start.tv_usec); */ waitpid(pid, &piper_status, 0); /* gettimeofday(&stop, NULL); pyfcgi_log(LOG_DEBUG, "W%dR%dtimer W8 %ld.%06ld us", wrk_id, count, stop.tv_sec - start.tv_sec, stop.tv_usec - start.tv_usec); */ if(WIFSIGNALED(piper_status)) { pyfcgi_log(LOG_ERR, "Woker[%d] req #%d piper terminated by sig %d", wrk_id, count, WTERMSIG(piper_status)); exit(40); } else if(WEXITSTATUS(piper_status)) { pyfcgi_log(LOG_ERR, "Woker[%d] req #%d piper exited with error status %d", wrk_id, count, WEXITSTATUS(piper_status)); exit(41); } FCGI_Finish(); if(ctl_get_rep_sz(pipe_ctl[0], &rep_sz)) { rep_sz = 0; } //Increase sem showing the worker is idle sop.sem_op = 1; if(semop(semid, &sop, 1) < 0) { err = errno; pyfcgi_log(LOG_ERR, "Worker[%d] error incrementing the semaphore : %s", wrk_id, strerror(err)); } gettimeofday(&stop, NULL); stop.tv_sec = stop.tv_sec - start.tv_sec; stop.tv_usec = stop.tv_usec - start.tv_usec; if(stop.tv_usec < 0) { stop.tv_usec += 1000000; stop.tv_sec -= 1; } pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds", wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec); } free(piper_stack); Py_Exit(count == max_reqs ?0:EXIT_PYERR); } /* void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr, int ctl_pipe, FCGX_Stream* out) */ int worker_piper(void *ptr) { piper_args_t *args; struct pollfd fds[2]; int err, ret, cont, poll_ret; short revents, closed, nfds; char buf[PIPE_BUF]; size_t out_sz; args = ptr; int wrk_id, pystdout, pystderr, ctl_pipe; //int req_id; FCGX_Stream *out; struct sigaction *act; wrk_id = args->wrk_id; //req_id = args->req_id; pystdout = args->pystdout; pystderr = args->pystderr; ctl_pipe = args->ctl_pipe; out = args->out; act = args->act; if(sigaction(WPIPER_SIG, act, NULL)) { err = errno; pyfcgi_log( LOG_ALERT, "Unable to sigaction in piper : %s", strerror(err)); exit(err); } write(ctl_pipe, " ", 1); //pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id); fds[0].fd = pystderr; fds[1].fd = pystdout; fds[0].events = fds[1].events = POLLIN; fds[0].revents = fds[1].revents = 0; nfds = 2; closed = out_sz = 0; cont = 2; while(cont) { poll_ret = poll(fds, nfds, 0); //pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret); if(poll_ret < 0) { err = errno; if(err == EINTR) { continue; } pyfcgi_log( LOG_WARNING, "Poll error : %s", strerror(err)); fds[0].revents = fds[1].revents = 0; continue; } if(!poll_ret && worker_piper_sigrcv) { cont--; continue; } if(poll_ret && (revents = fds[1].revents)) { /* pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d STDOUT evt !", wrk_id, req_id); */ poll_ret--; if(revents & POLLIN) { /* pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDOUT !", wrk_id, req_id); */ ret = read(pystdout, buf, PIPE_BUF); /* pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d read(stdout) ret %d", wrk_id, req_id, ret); */ if(ret < 0) { err = errno; if(err == EINTR) { continue; } pyfcgi_log( LOG_ERR, "Error reading python stdout : %s", strerror(err)); exit(err); } //buf[ret] = '\0'; ret = FCGX_PutStr(buf, ret, out); out_sz += ret; if(ret < PIPE_BUF && worker_piper_sigrcv) { FCGX_FClose(out); FCGI_Finish(); write(ctl_pipe, &out_sz, sizeof(size_t)); closed = 1; nfds = 1; } } //TODO handle other poll events } if(poll_ret && (revents = fds[0].revents)) { /* pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d STDERR evt !", wrk_id, req_id); */ if(revents & POLLIN) { /* pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDERR !", wrk_id, req_id); */ while(1) { ret = read(pystderr, buf, PIPE_BUF); if(ret < 0) { err = errno; if(err == EINTR) { continue; } pyfcgi_log( LOG_ERR, "Error reading python stdout : %s", strerror(err)); exit(err); } buf[ret] = '\0'; pyfcgi_log(LOG_ERR, "Worker[%d] Python error : %s", wrk_id, buf); break; } } //TODO handle other poll events } fds[0].revents = fds[1].revents = 0; } /* pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d piper exiting...", wrk_id, req_id); */ if(!closed) { FCGX_FClose(out); FCGI_Finish(); write(ctl_pipe, &out_sz, sizeof(size_t)); } return 0; } void worker_piper_sighandler(int signum) { worker_piper_sigrcv = 1; //pyfcgi_log(LOG_DEBUG, "SIG"); } int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz) { struct pollfd fds; int ret, err; fds.fd = ctl_pipe; fds.events = POLLIN; fds.revents = 0; if( (ret = poll(&fds, 1, 0)) < 0) { err = errno; pyfcgi_log(LOG_ERR, "Failed to poll ctl pipe : %s", strerror(err)); return -1; } if(!ret) { pyfcgi_log(LOG_ERR, "No data in ctl pipe for rep_sz..."); return -1; } ret = read(ctl_pipe, rep_sz, sizeof(size_t)); if(ret < 0) { pyfcgi_log(LOG_ERR, "Error reading ctl pipe : %s", strerror(errno)); return -1; } if(ret < sizeof(size_t)) { pyfcgi_log(LOG_ERR, "Incomplete read from ctl pipe, no rep_sz..."); return -1; } return 0; } static PyObject* _fetch_pyflush(const char *fdname) { PyObject *pyfd, *pyflush; pyfd = PySys_GetObject(fdname); if(!pyfd) { pyfcgi_log(LOG_ALERT, "Unable to fetch sys.%s", fdname); log_expt(LOG_ALERT); Py_Exit(EXIT_PYERR); } pyflush = PyObject_GetAttrString(pyfd, "flush"); Py_DECREF(pyfd); if(!pyflush) { pyfcgi_log(LOG_ALERT, "Unable to fetch sys.%s.flush", fdname); log_expt(LOG_ALERT); Py_Exit(EXIT_PYERR); } if(!PyCallable_Check(pyflush)) { pyfcgi_log(LOG_ALERT, "sys.%s.flush is not callable !", fdname); Py_Exit(EXIT_PYERR); } return pyflush; } void fetch_pyflush(PyObject** pystdout_flush, PyObject** pystderr_flush) { *pystdout_flush = _fetch_pyflush("stdout"); *pystderr_flush = _fetch_pyflush("stderr"); } void update_python_fd(int pipe_out[2], int pipe_err[2]) { int pri, err; char *err_fmt; PyObject *os_mod, *pyfdopen, *args, *new_fd; pri = LOG_ALERT; if(pipe2(pipe_out, O_DIRECT) == -1) { err = errno; err_fmt = "Unable to create pipe for python stdout : %s"; goto update_python_fd_err; } if(pipe2(pipe_err, O_DIRECT) == -1) { err = errno; err_fmt = "Unable to create pipe for python stderr : %s"; goto update_python_fd_err_pipeout; } os_mod = PyImport_ImportModule("os"); if(!os_mod) { if(PyErr_Occurred()) { log_expt(LOG_ALERT); } else { pyfcgi_log( LOG_ALERT, "Unable to import python os module, got NULL."); } err_fmt = NULL; goto update_python_fd_err_pipes; } pyfdopen = PyObject_GetAttrString(os_mod, "fdopen"); Py_DECREF(os_mod); if(!pyfdopen) { if(PyErr_Occurred()) { log_expt(LOG_ALERT); } else { pyfcgi_log( LOG_ALERT, "Unable to fetch os.fdopen() , got NULL."); } err_fmt = NULL; goto update_python_fd_err_pipes; } args = Py_BuildValue("is", pipe_out[1], "w"); if(!args) { pyfcgi_log( LOG_ERR, "Error building values with '%d', '%s' for stdout", pipe_out[1], "w"); log_expt(LOG_ALERT); err_fmt = NULL; goto update_python_fd_err_fdopen; } new_fd = PyObject_CallObject(pyfdopen, args); if(!new_fd || PyErr_Occurred()) { pyfcgi_log( LOG_ERR, "Error calling fdopen(%d, '%s')", pipe_out[1], "w"); log_expt(LOG_ALERT); err_fmt = NULL; goto update_python_fd_err_args; } Py_DECREF(args); if(PySys_SetObject("stdout", new_fd)) { pyfcgi_log(LOG_ERR, "Unable to set sys.stdout"); log_expt(LOG_ALERT); goto update_python_fd_err_newfd; } Py_DECREF(new_fd); args = Py_BuildValue("is", pipe_err[1], "w"); if(!args) { pyfcgi_log( LOG_ERR, "Error building values with '%d', '%s' for stderr", pipe_out[1], "w"); log_expt(LOG_ALERT); err_fmt = NULL; goto update_python_fd_err_fdopen; } new_fd = PyObject_CallObject(pyfdopen, args); if(!new_fd || PyErr_Occurred()) { pyfcgi_log( LOG_ERR, "Error calling fdopen(%d, '%s')", pipe_out[1], "w"); log_expt(LOG_ALERT); err_fmt = NULL; goto update_python_fd_err_args; } Py_DECREF(args); if(PySys_SetObject("stderr", new_fd)) { pyfcgi_log(LOG_ERR, "Unable to set sys.stderr"); log_expt(LOG_ALERT); goto update_python_fd_err_newfd; } Py_DECREF(new_fd); return; update_python_fd_err_newfd: Py_DECREF(new_fd); update_python_fd_err_args: Py_DECREF(args); update_python_fd_err_fdopen: Py_DECREF(fdopen); update_python_fd_err_pipes: close(pipe_err[0]); close(pipe_err[1]); update_python_fd_err_pipeout: close(pipe_out[0]); close(pipe_out[1]); update_python_fd_err: if(err_fmt) { pyfcgi_log(pri, err_fmt, strerror(err)); } exit(1); } void update_pyenv(PyObject *py_osmod, char **environ) { PyObject *pyenv, *pykey, *pyval; char *key, *value, **cur; cur = environ; pyenv = PyObject_GetAttrString(py_osmod, "environ"); if(!pyenv) { pyfcgi_log(LOG_WARNING, "Unable to get os.environ"); log_expt(LOG_ALERT); } else { Py_DECREF(pyenv); } pyenv = PyDict_New(); while(*cur) { key = value = *cur; while(*value && *value != '=') { value++; } if(!*value) { pyfcgi_log(LOG_WARNING, "Droping environment value without value : '%s'", key); cur++; continue; } value++; *(value-1) = '\0'; // dirty modification of **environ //pyfcgi_log(LOG_DEBUG, "PySetEnv '%s'='%s'", key, value); pykey = PyUnicode_DecodeLocale(key, "surrogateescape"); if(!pykey) { *(value-1) = '='; // **environ restore pyfcgi_log(LOG_ALERT, "Unable to parse environ key string '%s'", key); log_expt(LOG_ALERT); Py_Exit(EXIT_PYERR); } *(value-1) = '='; // **environ restore pyval = PyUnicode_DecodeFSDefault(value); if(!pyval) { pyfcgi_log(LOG_ALERT, "Unable to parse environ val string '%s'", value); log_expt(LOG_ALERT); Py_Exit(EXIT_PYERR); } if(PyDict_SetItem(pyenv, pykey, pyval) == -1) { pyfcgi_log(LOG_ERR, "Unable to set environ '%s'='%s'", key, value); log_expt(LOG_ERR); } Py_DECREF(pyval); Py_DECREF(pykey); cur++; } PyObject_SetAttrString(py_osmod, "environ", pyenv); }