/*
 * Copyright (C) 2019 Weber Yann
 *
 * This file is part of PyFCGI.
 *
 * PyFCGI is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * any later version.
 *
 * PyFCGI is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with PyFCGI.  If not, see <http://www.gnu.org/licenses/>.
 */
#include "pyworker.h"

/**@brief Set to 1 by worker_sigalrmhandler(), reset to 0 before each
 * PEP333 request */
static volatile sig_atomic_t _wtimeout;
/**@brief 1 if worker idle else 0 */
static short _worker_idle;

/**@brief Indicate that a worker is idle */
static inline void worker_set_idle();
/**@brief Indicate that a worker is busy */
static inline void worker_set_busy();

/**@brief Process results from a pep333 worker
 * @param out out stream from libFCGI
 * @param ret application function returned value
 */
static inline int work333_send_result(FCGX_Stream* out, PyObject* ret);

/**@brief Set to 1 by worker_piper_sighandler(), reset to 0 by work()
 * before each clone() */
static volatile sig_atomic_t worker_piper_sigrcv = 0;

/**@brief PEP333 worker main loop
 *
 * Starts Python, imports libpyfcgi and the configured entrypoint, then
 * accepts FastCGI requests with FCGX_Accept_r(). For each request the
 * entrypoint is called with (environ, start_response) and its returned
 * value is handed to _pyfcgi_write_body().
 *
 * The loop stops when FCGX_Accept_r() fails or when the number of handled
 * requests equals PyFCGI_conf.max_reqs (a max_reqs of 0 never matches, the
 * counter being incremented before the comparison).
 *
 * @param wrk_id worker identifier (only used in log messages)
 * @return 0 when the accept loop ends. The process exits with
 * PYFCGI_FATAL on initialisation failure and PYFCGI_TIMEOUT when a
 * request timed out.
 */
int work333(int wrk_id)
{
	PyObject *entry_fun, *py_osmod, *entry_ret, *environ, *start_response,
		*args;
	FCGX_Stream *in_stream, *out_stream, *err_stream;
	char **envp;
	int count;
	int max_reqs;
	struct timeval start, stop;
	FCGX_Request *request;

	max_reqs = PyFCGI_conf.max_reqs;

	pyfcgi_log(LOG_INFO, "Worker started with PEP333 App");

	pyinit();
	pyfcgi_log(LOG_DEBUG, "Python started");

	//importing os
	py_osmod = python_osmod();

	//importing libpyfcgi
	if(!pyinit_libpyfcgi())
	{
		exit(PYFCGI_FATAL);
	}
	pyfcgi_log(LOG_DEBUG, "libpyfcgi imported");

	// loading module
	entry_fun = import_entrypoint();

	pyfcgi_log(LOG_INFO, "Waiting request with %s.%s()",
		PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
	if(!entry_fun) //but exit if import failed
	{
		pyfcgi_log(LOG_ALERT, "Unable to import entrypoint");
		exit(PYFCGI_FATAL);
	}

	start_response = get_start_response();

	// Initialise FCGI request
	request = &(PyFCGI_conf.context.fcgi_request);
	if(FCGX_InitRequest(request, PyFCGI_conf.context.fcgi_socket,
		FCGI_FAIL_ACCEPT_ON_INTR) == -1)
	{
		pyfcgi_log(LOG_ALERT, "Unable to init FCGI request");
		exit(PYFCGI_FATAL);
	}

	// force the idle flag down so worker_set_idle() posts the semaphore
	_worker_idle = 0;
	worker_set_idle();

	// requests accepting loop
	count = 0;
	while ((!count || count != max_reqs) && FCGX_Accept_r(request) == 0)
	{
		in_stream = request->in;
		out_stream = request->out;
		err_stream = request->err;
		envp = request->envp;

		pyfcgi_wd_arm();
		// increment request counter discarding OF
		sem_post(PyFCGI_SEM(SEM_WREQS).sem);
		gettimeofday(&start, NULL);

		worker_set_busy();
		count++;

		environ = update_pyenv(py_osmod, envp);

		_wtimeout = 0;
		libpyfcgi_clean_response();
		libpyfcgi.out = out_stream;
		libpyfcgi.in = in_stream;
		args = Py_BuildValue("OO", environ, start_response);

		// Call application function
		entry_ret = PyObject_CallObject(entry_fun, args);
		/* NOTE(review): PyObject_CallObject() already returns a new
		 * reference; this extra INCREF is only balanced by the single
		 * DECREF below, which looks like one leaked reference per
		 * request unless _pyfcgi_write_body() consumes one - confirm
		 * against libpyfcgi before changing. */
		if(entry_ret && entry_ret != Py_None )
		{
			Py_INCREF(entry_ret);
		}

		if(_wtimeout)
		{
			PyObject *type, *exc, *tb;
			PyObject *old_type, *old_exc, *new_tb;
			// Changing exception to a TimeoutError, but keeping
			// the traceback to display
			PyErr_Fetch(&old_type, &old_exc, &tb);
			PyErr_SetString(PyExc_TimeoutError, "Request timeout");
			PyErr_Fetch(&type, &exc, &new_tb);
			// PyErr_Fetch() hands out owned references : drop the
			// ones that are not given back to PyErr_Restore()
			Py_XDECREF(old_type);
			Py_XDECREF(old_exc);
			Py_XDECREF(new_tb);
			PyErr_Restore(type, exc, tb);
			log_expt(LOG_ERR);
			libpyfcgi_timeout(); // Sending headers if possible
		}
		if(PyErr_Occurred())
		{
			pyfcgi_log(LOG_ERR, "PEP333 entrypoint function triggered an exception");
			log_expt(LOG_ERR);
		}
		// able to process returned value
		// Simulate python call of libpyfcgi.write_body()
		if(entry_ret && entry_ret != Py_None)
		{
			_pyfcgi_write_body(entry_ret);
			if(PyErr_Occurred())
			{
				log_expt(LOG_ERR);
			}
			Py_DECREF(entry_ret);
		}

		// clean stuffs (args/environ are NULL when their creation failed)
		Py_XDECREF(args);
		Py_XDECREF(environ);

		FCGX_FClose(out_stream);
		FCGX_FClose(in_stream);
		FCGX_FClose(err_stream);
		FCGX_Finish_r(request);

		// compute the elapsed time, borrowing a second when needed
		gettimeofday(&stop, NULL);
		stop.tv_sec = stop.tv_sec - start.tv_sec;
		stop.tv_usec = stop.tv_usec - start.tv_usec;
		if(stop.tv_usec < 0)
		{
			stop.tv_usec += 1000000;
			stop.tv_sec -= 1;
		}
		pyfcgi_wd_pause();
		if(_wtimeout)
		{
			exit(PYFCGI_TIMEOUT);
		}
		pyfcgi_log(LOG_DEBUG,
			"Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
			wrk_id, count, libpyfcgi.rep_sz,
			stop.tv_sec, stop.tv_usec);
		worker_set_idle();
	}
	worker_set_busy();
	return 0;
}

/**@brief Placeholder : does nothing with its arguments
 * @param out out stream from libFCGI (unused)
 * @param ret application function returned value (unused)
 * @return always 0
 */
static inline int work333_send_result(FCGX_Stream *out, PyObject* ret)
{
	return 0;
}

/**@brief Worker main loop for entrypoints writing on Python stdout
 *
 * Starts Python with its stdout & stderr redirected to pipes, imports the
 * configured entrypoint then accepts requests with FCGX_Accept(). For each
 * request a "piper" child is cloned (see worker_piper()) to forward Python
 * stdout to the FastCGI out stream while the entrypoint is called without
 * arguments. The piper is told that the call ended using WPIPER_SIG and
 * reports the response size through the control pipe.
 *
 * @param wrk_id worker identifier
 * @return does not return : ends with Py_Exit() (status 0 if max_reqs
 * requests were handled, else EXIT_PYERR) or exit() on error
 */
int work(int wrk_id)
{
	PyObject *entry_fun, *pystdout_flush, *pystderr_flush,
		*py_osmod, *environ, *call_ret;
	FCGX_Stream *in_stream, *out_stream, *err_stream;
	char **envp;
	int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status;
	int max_reqs;
	struct sigaction act;
	struct timeval start, stop;
	sigset_t emptyset;
	char buf[PIPE_BUF];
	size_t rep_sz;
	piper_args_t piper_args;
	char *piper_stack;

	max_reqs = PyFCGI_conf.max_reqs;

	piper_args.wrk_id = wrk_id;
	piper_args.act = &act;

	// preparing sigaction for piper
	if(sigemptyset(&emptyset))
	{
		err = errno;
		pyfcgi_log(	LOG_ALERT, "sigemptyset fails in piper : %s",
			strerror(err));
		exit(err);
	}
	act.sa_handler = worker_piper_sighandler;
	act.sa_mask = emptyset;
	//act.sa_flags = SA_RESTART;
	act.sa_flags = 0;
	act.sa_restorer = NULL;

	if( !(piper_stack = malloc(PIPER_STACK_SZ)) )
	{
		err = errno;
		pyfcgi_log(LOG_ALERT, "Error while allocating piper stack : %s",
			strerror(err));
		exit(err);
	}

	pyfcgi_log(LOG_INFO, "Worker %d started", wrk_id);

	update_python_path(); // add cwd to python path
	Py_Initialize(); // "start" python
	update_python_fd(pipe_out, pipe_err);
	piper_args.pystdout = pipe_out[0];
	piper_args.pystderr = pipe_err[0];

	fetch_pyflush(&pystdout_flush, &pystderr_flush);
	pyfcgi_log(	LOG_INFO,
		"Worker[%d] Python started", wrk_id);

	//importing os
	py_osmod = python_osmod();

	// loading module
	entry_fun = import_entrypoint();

	pyfcgi_log(	LOG_INFO,
		"Worker[%d] Waiting request with %s.%s()", wrk_id,
		PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);

	if(!entry_fun) //but exit if import failed
	{
		pyfcgi_log(LOG_ALERT, "Unable to import entrypoint");
		exit(PYFCGI_FATAL);
	}

	if(pipe(pipe_ctl) == -1)
	{
		err = errno;
		pyfcgi_log(LOG_ALERT,
			"Worker[%d] Pipe fails for piper ctl : %s",
			wrk_id, strerror(err));
		exit(err);
	}
	piper_args.ctl_pipe = pipe_ctl[1];

	// force the idle flag down so worker_set_idle() posts the semaphore
	_worker_idle = 0;
	worker_set_idle();

	count = 0;
	while ((!count || count != max_reqs)
		&& FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
	{
		pyfcgi_wd_arm();
		// increment request counter discarding OF
		sem_post(PyFCGI_SEM(SEM_WREQS).sem);
		gettimeofday(&start, NULL);

		worker_set_busy();
		count++;

		piper_args.out = out_stream;
		//piper_args.req_id = count;
		worker_piper_sigrcv = 0;

		/**@todo avoid clone on each request... (using named pipes ?) */
		// The stack grows downward : hand over the top of the
		// allocation (the former "- 1" gave an odd, misaligned address)
		pid_t pid = clone(worker_piper, piper_stack + PIPER_STACK_SZ,
			CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID |
			SIGCHLD |
			CLONE_FILES | CLONE_FS | CLONE_IO | CLONE_VM,
			&piper_args);
		if(pid < 0)
		{
			err = errno;
			pyfcgi_log(LOG_ALERT,
				"Worker[%d] req #%d Fork failed for piper : %s",
				wrk_id, count, strerror(err));
			exit(err);
		}

		environ = update_pyenv(py_osmod, envp);
		Py_XDECREF(environ);

		// PyObject_CallObject() returns a new reference (or NULL) :
		// the results are not used but have to be released
		call_ret = PyObject_CallObject(entry_fun, NULL);
		if(PyErr_Occurred())
		{
			log_expt(LOG_ERR);
		}
		Py_XDECREF(call_ret);

		call_ret = PyObject_CallObject(pystdout_flush, NULL);
		Py_XDECREF(call_ret);
		call_ret = PyObject_CallObject(pystderr_flush, NULL);
		Py_XDECREF(call_ret);

		read(pipe_ctl[0], buf, 1); // unblock when child ready

		FCGX_FClose(in_stream);
		FCGX_FClose(err_stream);

		kill(pid, WPIPER_SIG); //indicate child python call ended
		waitpid(pid, &piper_status, 0);
		if(WIFSIGNALED(piper_status))
		{
			pyfcgi_log(LOG_ERR,
				"Woker[%d] req #%d piper terminated by sig %d",
				wrk_id, count, WTERMSIG(piper_status));
			exit(40);
		}
		else if(WEXITSTATUS(piper_status))
		{
			pyfcgi_log(LOG_ERR,
				"Woker[%d] req #%d piper exited with error status %d",
				wrk_id, count, WEXITSTATUS(piper_status));
			exit(41);
		}

		FCGI_Finish();
		if(ctl_get_rep_sz(pipe_ctl[0], &rep_sz))
		{
			rep_sz = 0;
		}

		//Increase sem showing the worker is idle
		worker_set_idle();

		// compute the elapsed time, borrowing a second when needed
		gettimeofday(&stop, NULL);
		stop.tv_sec = stop.tv_sec - start.tv_sec;
		stop.tv_usec = stop.tv_usec - start.tv_usec;
		if(stop.tv_usec < 0)
		{
			stop.tv_usec += 1000000;
			stop.tv_sec -= 1;
		}
		pyfcgi_wd_pause();
		pyfcgi_log(LOG_DEBUG,
			"Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
			wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec);
	}
	worker_set_busy();
	free(piper_stack);
	Py_Exit(count == max_reqs ?0:EXIT_PYERR);
}

/**@brief Piper child entry point (given to clone() by work())
 *
 * Installs the WPIPER_SIG handler, writes one byte on the control pipe to
 * tell the parent it is ready, then polls Python stdout & stderr pipes :
 * stdout data is forwarded to the FastCGI out stream, stderr data is
 * logged. Once the signal is received and nothing is left to read, the
 * total size sent is written on the control pipe.
 *
 * @param ptr a pointer on a piper_args_t
 * @return 0 on success, exit() with an errno value on fatal errors
 */
int worker_piper(void *ptr)
{
	piper_args_t *args;
	struct pollfd fds[2];
	int err, ret, cont, poll_ret;
	short revents, closed, nfds;
	char buf[PIPE_BUF];
	size_t out_sz;

	args = ptr;

	int wrk_id, pystdout, pystderr, ctl_pipe;
	FCGX_Stream *out;
	struct sigaction *act;

	wrk_id = args->wrk_id;
	pystdout = args->pystdout;
	pystderr = args->pystderr;
	ctl_pipe = args->ctl_pipe;
	out = args->out;
	act = args->act;

	if(sigaction(WPIPER_SIG, act, NULL))
	{
		err = errno;
		pyfcgi_log(	LOG_ALERT,
			"Unable to sigaction in piper : %s",
			strerror(err));
		exit(err);
	}
	write(ctl_pipe, " ", 1); // tells the parent the handler is set

	fds[0].fd = pystderr;
	fds[1].fd = pystdout;
	fds[0].events = fds[1].events = POLLIN;
	fds[0].revents = fds[1].revents = 0;
	nfds = 2;

	closed = out_sz = 0;
	// the loop ends after 2 empty polls seen with the signal received
	cont = 2;
	while(cont)
	{
		// zero timeout : poll() returns immediately
		poll_ret = poll(fds, nfds, 0);
		if(poll_ret < 0)
		{
			err = errno;
			if(err == EINTR)
			{
				continue;
			}
			pyfcgi_log(	LOG_WARNING,
				"Poll error : %s",
				strerror(err));
			fds[0].revents = fds[1].revents = 0;
			continue;
		}
		if(!poll_ret && worker_piper_sigrcv)
		{
			cont--;
			continue;
		}
		if(poll_ret && (revents = fds[1].revents))
		{
			poll_ret--;
			if(revents & POLLIN)
			{
				ret = read(pystdout, buf, PIPE_BUF);
				if(ret < 0)
				{
					err = errno;
					if(err == EINTR)
					{
						continue;
					}
					pyfcgi_log(	LOG_ERR,
						"Error reading python stdout : %s",
						strerror(err));
					exit(err);
				}
				ret = FCGX_PutStr(buf, ret, out);
				// out_sz is unsigned : do not add an error value
				if(ret > 0)
				{
					out_sz += ret;
				}
				if(ret < PIPE_BUF && worker_piper_sigrcv)
				{
					FCGX_FClose(out);
					FCGI_Finish();
					write(ctl_pipe, &out_sz, sizeof(size_t));
					closed = 1;
					// from now on only fds[0] (stderr) is polled
					nfds = 1;
				}
			}
			//TODO handle other poll events
		}
		if(poll_ret && (revents = fds[0].revents))
		{
			if(revents & POLLIN)
			{
				while(1)
				{
					// keep one byte for the trailing '\0'
					ret = read(pystderr, buf, PIPE_BUF - 1);
					if(ret < 0)
					{
						err = errno;
						if(err == EINTR)
						{
							continue;
						}
						pyfcgi_log(	LOG_ERR,
							"Error reading python stderr : %s",
							strerror(err));
						exit(err);
					}
					buf[ret] = '\0';
					pyfcgi_log(LOG_ERR,
						"Worker[%d] Python error : %s",
						wrk_id, buf);
					break;
				}
			}
			//TODO handle other poll events
		}
		fds[0].revents = fds[1].revents = 0;
	}
	if(!closed)
	{
		FCGX_FClose(out);
		FCGI_Finish();
		write(ctl_pipe, &out_sz, sizeof(size_t));
	}
	return 0;
}

/**@brief Log the content of Python stdout & stderr pipes
 *
 * For each pipe : reads and logs available data, then calls the
 * corresponding flush callable once, as soon as a poll reports nothing
 * to read or a read does not fill the buffer.
 *
 * @param pipe_std read end of the pipe plugged on sys.stdout
 * @param pipe_err read end of the pipe plugged on sys.stderr
 * @param flush the flush callables ( {stdout flush, stderr flush} )
 * @note exit with PYFCGI_WORKER_FAIL on read error or when a flush
 * raises an exception
 */
void worker_log_pipes(int pipe_std, int pipe_err, PyObject *flush[2])
{
	int ret;
	char buff[4096];
	// one byte is kept for the trailing '\0' needed by the "%s" format
	const int max_read = (int)sizeof(buff) - 1;
	int i, poll_ret;
	int loglevels[2] = {LOG_INFO, LOG_WARNING};
	struct pollfd fds[2];
	short isflush[2] = {0,0};
	char *outnames[2] = {"sys.stdout", "sys.stderr"};

	fds[0].fd = pipe_std;
	fds[1].fd = pipe_err;
	fds[0].events = fds[1].events = POLLIN;
	fds[0].revents = fds[1].revents = 0;

	for(i=0; i<2; i++)
	{
		ret = 0;
		do
		{
			poll_ret = poll(&(fds[i]), 1, 0);
			if(poll_ret)
			{
				// read the pipe being handled (was always pipe_std)
				ret = read(fds[i].fd, buff, max_read);
				if(ret < 0)
				{
					pyfcgi_log(LOG_ERR,
						"Error reading python %s pipe : %s",
						outnames[i], strerror(errno));
					exit(PYFCGI_WORKER_FAIL);
				}
				buff[ret] = '\0';
				pyfcgi_log(loglevels[i], "Pyton %s : '%s'",
					outnames[i], buff);
			}
			if(!isflush[i] && (!poll_ret || ret < max_read))
			{
				isflush[i] = 1;
				// the result is not used but has to be released
				Py_XDECREF(PyObject_CallObject(flush[i], NULL));
				if(PyErr_Occurred())
				{
					pyfcgi_log(LOG_ERR,
						"Exception while flushing %s",
						outnames[i]);
					log_expt(LOG_ERR);
					exit(PYFCGI_WORKER_FAIL);
				}
			}
		}while(poll_ret && !isflush[i]);
	}
}

/**@brief Signal handler installed by worker_piper() for WPIPER_SIG
 * @param signum received signal number (unused)
 */
void worker_piper_sighandler(int signum)
{
	worker_piper_sigrcv = 1;
}

/**@brief Read the response size from the piper control pipe
 *
 * Does not block : the pipe is polled with a zero timeout first.
 *
 * @param ctl_pipe read end of the control pipe
 * @param rep_sz where the size is stored
 * @return 0 if a complete size_t was read, -1 on poll/read error, when no
 * data is available or when the read is incomplete (errors are logged)
 */
int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz)
{
	struct pollfd fds;
	int ret, err;

	fds.fd = ctl_pipe;
	fds.events = POLLIN;
	fds.revents = 0;

	if( (ret = poll(&fds, 1, 0)) < 0)
	{
		err = errno;
		pyfcgi_log(LOG_ERR, "Failed to poll ctl pipe : %s",
			strerror(err));
		return -1;
	}
	if(!ret)
	{
		pyfcgi_log(LOG_ERR, "No data in ctl pipe for rep_sz...");
		return -1;
	}
	ret = read(ctl_pipe, rep_sz, sizeof(size_t));
	if(ret < 0)
	{
		pyfcgi_log(LOG_ERR, "Error reading ctl pipe : %s",
			strerror(errno));
		return -1;
	}
	// ret is known to be >= 0 here, the cast is safe
	if((size_t)ret < sizeof(size_t))
	{
		pyfcgi_log(LOG_ERR, "Incomplete read from ctl pipe, no rep_sz...");
		return -1;
	}
	return 0;
}

/**@brief Post the WSTATE semaphore and flag the worker as idle
 *
 * Does nothing if the worker is already flagged idle. On sem_post()
 * failure the error is logged and the flag is left untouched.
 */
static void worker_set_idle()
{
	int err;
	if(_worker_idle)
	{
		return;
	}
	if(sem_post(PyFCGI_SEM(SEM_WSTATE).sem) < 0)
	{
		err = errno;
		pyfcgi_log(LOG_ERR, "error incrementing the WSTATE semaphore : %s",
			strerror(err));
		return;
	}
	_worker_idle = 1;
}

/**@brief Decrement the WSTATE semaphore and flag the worker as busy
 *
 * Does nothing if the worker is already flagged busy. sem_trywait() is
 * attempted up to 10 times, sleeping 0.0001s after each EAGAIN. If the
 * semaphore can still not be decremented, SIGTERM is sent to the pids
 * stored in PyFCGI_conf.context (ppid & pid). Any other sem_trywait()
 * error sends SIGTERM to ppid and leaves the idle flag untouched.
 */
static void worker_set_busy()
{
	int err;
	short retry;
	struct timespec timeout;

	timeout.tv_sec = 0;
	timeout.tv_nsec = 100000; //0.0001s

	if(!_worker_idle)
	{
		return;
	}
	/**@todo The pool handler WILL decrement the sem to figure if the pool
	 * is busy -__- Using sem_wait make sure that the worker will be able
	 * to set busy, but it can also freeze if not able to set busy....
	 * sem_timedwait require to get abstime -_- The better way is
	 * maybe to nanosleep 0.01s and retry a sem_trywait... SysV sem are
	 * better T_T
	 */
	retry = 10;
	do
	{
		err = 0;
		if(sem_trywait(PyFCGI_SEM(SEM_WSTATE).sem) < 0)
		{
			err = errno;
			if(err == EAGAIN)
			{
				nanosleep(&timeout, NULL);
				err = 1; // flag "try again"
			}
			else
			{
				pyfcgi_log(LOG_ERR, "error decrementing the WSTATE semaphore : %s",
					strerror(err));
				kill(PyFCGI_conf.context.ppid, SIGTERM);
				return;
			}
		}
		retry--;
	// logical AND : the former bitwise "err & retry" stopped as soon as
	// retry was even, i.e. after 2 attempts instead of 10
	}while(err && retry);
	if(err)
	{
		pyfcgi_log(LOG_ALERT, "Unable to set busy ! WSTATE sem is allready 0 !!!");
		_worker_idle = 0;
		kill(PyFCGI_conf.context.ppid, SIGTERM);
		kill(PyFCGI_conf.context.pid, SIGTERM);
		return;
	}
	_worker_idle = 0;
}

/**@brief Termination signal handler : set the worker busy, close the
 * WSTATE & WREQS IPC and exit(0)
 * @param signum received signal number (logged using strsignal())
 * @note NOTE(review): pyfcgi_log() and exit() are called from a signal
 * handler ; they are presumably not async-signal-safe - to be confirmed
 */
void worker_sighandler(int signum)
{
	pyfcgi_log(LOG_INFO, "%s signal received, exiting...",
		strsignal(signum));
	worker_set_busy();
	pyfcgi_IPC_close(IPC_WSTATE | IPC_WREQS);
	exit(0);
}

/**@brief SIGALRM handler : flag the current request as timed out
 *
 * Sets _wtimeout, asks Python to raise an interrupt if the worker is
 * busy, then sets the worker busy and closes the WSTATE & WREQS IPC.
 * The process is not terminated here : work333() exits with
 * PYFCGI_TIMEOUT once the request cleanup is done.
 *
 * @param signum received signal number (unused)
 */
void worker_sigalrmhandler(int signum)
{
	_wtimeout = 1;
	if(!_worker_idle)
	{
		// Call PyErr_SetInterrupt() in order to stop python
		PyErr_SetInterrupt();
	}
	worker_set_busy();
	pyfcgi_IPC_close(IPC_WSTATE | IPC_WREQS);
	return;
}