/*
* Copyright (C) 2019 Weber Yann
*
* This file is part of PyFCGI.
*
* PyFCGI is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* PyFCGI is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with PyFCGI. If not, see <https://www.gnu.org/licenses/>.
*/
#include "responder.h"
/**@brief Release every IPC resource (worker-state and request semaphores,
 * stats semaphore and shared memory) then terminate the process.
 * @param status process exit code passed straight to exit()
 * @ingroup work_master_proc */
static void clean_exit(int status)
{
	const int ipc_mask = IPC_WSTATE | IPC_WREQS | IPC_SEMST | IPC_SHMST;

	pyfcgi_IPC_close(ipc_mask);
	pyfcgi_IPC_destroy(ipc_mask);
	exit(status);
}
pid_t spawn_pool_handler()
{
pid_t res;
struct sigaction act;
act.sa_handler = pyfcgi_sighandler_drop;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_restorer = NULL;
res = fork();
if(res < 0)
{
pyfcgi_log(LOG_ALERT, "Failed to fork pool_handler : %s",
strerror(errno));
return -1;
}
if(!res)
{
if(sigaction(SIGINT, &act, NULL))
{
pyfcgi_log(LOG_WARNING,
"Unable to sigaction SIGINT handler : %s",
strerror(errno));
}
responder_loop();
exit((unsigned char)-1);
}
return res;
}
/**@brief Initialize the pool-handler process context.
 *
 * Records our PID/PPID, creates every IPC component and posts the stats
 * semaphore once so readers can acquire it. Any failure is fatal and
 * exits through clean_exit().
 * @ingroup work_master_proc */
void init_context()
{
	int ret;

	// Remember who we are (and who spawned us) for logging/signalling.
	PyFCGI_conf.context.pid = getpid();
	PyFCGI_conf.context.ppid = getppid();

	ret = pyfcgi_IPC_create(IPC_WSTATE | IPC_WREQS | IPC_SEMST | IPC_SHMST);
	if(ret < 0)
	{
		pyfcgi_log(LOG_ALERT, "Pool handler process is unable to init IPC components");
		sleep(1); // give the logger a chance to flush before dying
		clean_exit(PYFCGI_FATAL);
	}
	// Make the stats semaphore available (initial token).
	if(sem_post(PyFCGI_SEM(SEM_STATS).sem) < 0)
	{
		pyfcgi_log(LOG_ALERT, "Unable to POST stat semaphore : %s",
			strerror(errno));
		clean_exit(PYFCGI_FATAL);
	}
}
/**@brief Main loop of the pool-handler process.
 *
 * Installs the SIGTERM handler, prespawns min_wrk workers, then loops
 * forever: updates the stats shared memory once per second, reaps dead
 * children, grows the pool when every worker is busy and shrinks it
 * (down to min_wrk) after a sustained idle period. Exits the process
 * directly; the return value is never produced.
 * @return never returns normally (calls exit())
 * @ingroup work_master_proc */
int responder_loop()
{
unsigned int n_wrk, wanted_n, n;
pid_t *wrk_pids;
int err;
int status;
pid_t ret;
/**@brief poll timeout */
struct timespec timeout;
/**@brief watchdog timeout */
struct timespec pool_timeout;
time_t idle_start, busy_start;
short idle, busy;
struct sigaction act;
char *statusstr;
time_t last_update, now;
// SIGTERM triggers pool_sighandler (forward to workers + clean exit);
// SIGTERM is also masked during the handler itself.
act.sa_handler = pool_sighandler;
sigemptyset(&act.sa_mask);
sigaddset(&act.sa_mask, SIGTERM);
act.sa_flags = 0;
act.sa_restorer = NULL;
if(sigaction(SIGTERM, &act, NULL))
{
pyfcgi_log(LOG_ALERT,
"Sigaction error for SIGTERM pool process : %s",
strerror(errno));
exit(PYFCGI_FATAL);
}
// 100ms poll period for idle checks and child-kill grace time.
timeout.tv_sec = 0;
timeout.tv_nsec = 100000000;
idle = busy = 0;
pyfcgi_logger_set_ident("Workpool");
// Optional watchdog : pool_wd_sighandler fires if the loop stalls
// longer than pool_timeout seconds without re-arming.
if(PyFCGI_conf.pool_timeout)
{
pool_timeout.tv_nsec = 0;
pool_timeout.tv_sec = PyFCGI_conf.pool_timeout;
pyfcgi_wd_init(pool_wd_sighandler, &pool_timeout);
}
pyfcgi_log(LOG_INFO, "Preparing workers");
init_context();
pyfcgi_wd_arm();
PyFCGI_conf.context.wrk_pids = &wrk_pids;
PyFCGI_conf.context.n_wrk = 0;
// NOTE(review): sizeof(int) assumed equal to sizeof(pid_t) here — holds
// on common Linux ABIs but sizeof(*wrk_pids) would be safer; confirm.
wrk_pids = malloc(sizeof(int) * PyFCGI_conf.max_wrk);
if(!wrk_pids)
{
err = errno;
pyfcgi_log( LOG_ALERT,
"Unable to allocate memory for childs PID : %s",
strerror(err));
clean_exit(err);
}
bzero(wrk_pids, sizeof(int) * PyFCGI_conf.max_wrk);
wanted_n = PyFCGI_conf.min_wrk;
n_wrk = 0;
// prespawning minimum worker count
for(n_wrk=0; n_wrk < wanted_n; n_wrk++)
{
wrk_pids[n_wrk] = spawn(n_wrk);
PyFCGI_conf.context.n_wrk = n_wrk;
}
//Wait at least for a process to be ready
while(!pyfcgi_pool_idle(&timeout));
last_update = 0;
// main loop, taking care to restart terminated workers,
// spawn new one if needed, etc.
while(1)
{
pyfcgi_wd_arm();
PyFCGI_conf.context.n_wrk = n_wrk;
// Refresh stats shared memory at most once per second.
if(last_update != (now = time(NULL)))
{
pyfcgi_pool_shm_update(n_wrk);
last_update = now;
}
// Non-blocking reap of any dead child.
if( (ret = waitpid(0, &status, WNOHANG)) )
{
if(ret < 0)
{
//TODO : error
}
// NOTE(review): the lines below are not valid C — this span appears
// garbled/truncated in this copy of the file (likely a loop over
// wrk_pids handling the reaped child, followed by the idle-timeout
// test "time(NULL) - idle_start > worker_gc_timeout", was lost).
// Verify against the upstream PyFCGI repository before editing.
for(n=0; n PyFCGI_conf.worker_gc_timeout &&
wanted_n > PyFCGI_conf.min_wrk
&& n_wrk - wanted_n < 2)
{
wanted_n--;
idle = 0;
}
}
else
{
// Pool busy : remember when the busy streak started and grow the
// pool (up to max_wrk) once it has lasted more than a second.
idle = 0;
if(!busy)
{
busy = 1;
busy_start = time(NULL);
}
else if(time(NULL) - busy_start > 0 &&
wanted_n < PyFCGI_conf.max_wrk)
{
pyfcgi_log( LOG_DEBUG,
"All workers busy, spawning a new one");
n = n_wrk;
n_wrk++;
wanted_n = n_wrk;
wrk_pids[n] = spawn(n);
// Unless fast-spawn is enabled, reset the busy timer so the
// next grow decision waits another full period.
if(!PyFCGI_conf.worker_fast_spawn)
{
busy_start = time(NULL);
}
}
}
// Stopping & deleting useless childs
if(wanted_n < n_wrk && idle)
{ // need to shift the list and dec n_wrk
busy = 0;
n_wrk--;
// Ask the last worker to terminate, give it ~100ms, then SIGKILL
// if it did not exit.
kill(wrk_pids[n_wrk], SIGTERM);
nanosleep(&timeout, NULL);
if( (ret = waitpid(wrk_pids[n_wrk], &status, WNOHANG)) < 0 )
{
pyfcgi_log(LOG_ERR, "Pool idle since %ds but unable to kill child %d (PID %d)",
PyFCGI_conf.worker_gc_timeout,
n_wrk, wrk_pids[n_wrk]);
kill(wrk_pids[n_wrk], SIGKILL);
}
else
{
pyfcgi_log(LOG_INFO, "Pool idle since %ds : worker[%d](%d) killed",
PyFCGI_conf.worker_gc_timeout,
n_wrk, wrk_pids[n_wrk]);
}
idle = 0;
continue;
}
nanosleep(&timeout, NULL);
}
// Unreachable shutdown path (the while(1) above never breaks) :
// wait for every child then exit. Kept for debugging.
pyfcgi_wd_arm();
//Debug wait & exit
for(; n_wrk != 0; n_wrk--)
{
waitpid(wrk_pids[n_wrk], &status, 0);
pyfcgi_log(LOG_DEBUG, "Child %d stopped with status %d",
wrk_pids[n_wrk], status);
PyFCGI_conf.context.n_wrk = n_wrk;
}
//printf("Content-Type: text/html\r\n\r\nHello world !\n");
pyfcgi_wd_stop();
pyfcgi_log(LOG_INFO,"Child workers stoped, stopping responder");
exit(0);
}
pid_t spawn(int wrk_id)
{
pid_t res;
struct timespec wd_timeout;
struct sigaction act;
char ident[128];
act.sa_handler = worker_sighandler;
sigemptyset(&act.sa_mask);
act.sa_flags = 0;
act.sa_restorer = NULL;
res = fork();
if(res == -1)
{
pyfcgi_log(LOG_ERR, "Fork fails for worker #%d : %s",
wrk_id, strerror(errno));
return -1;
}
else if(!res)
{
// Child process
PyFCGI_conf.context.ppid = PyFCGI_conf.context.pid;
PyFCGI_conf.context.pid = getpid();
snprintf(ident, 128, "Worker%2d", wrk_id);
pyfcgi_logger_set_ident(ident);
// Init IPC components
if(pyfcgi_IPC_init(IPC_WSTATE | IPC_WREQS) < 0)
{
pyfcgi_log(LOG_ALERT, "Unable to initialize semaphore when spawning process...");
exit(PYFCGI_FATAL);
}
// Set handler for SIGINT & SIGTERM
/*
if(sigaction(SIGINT, &(PyFCGI_conf.context.master_old_sigint),
NULL))
{
pyfcgi_log(LOG_ALERT,
"Sigaction error for worker process when restoring SIGINT handler: %s",
strerror(errno));
exit(PYFCGI_FATAL);
}
*/
if(sigaction(SIGTERM, &act, NULL))
{
pyfcgi_log(LOG_ALERT,
"Sigaction error for worker process : %s",
strerror(errno));
exit(PYFCGI_FATAL);
}
// Set watchdog
if(PyFCGI_conf.worker_timeout)
{
wd_timeout.tv_nsec = 0;
wd_timeout.tv_sec = PyFCGI_conf.worker_timeout;
pyfcgi_wd_init(worker_sigalrmhandler, &wd_timeout);
}
if(PyFCGI_conf.pep333)
{
exit(work333(wrk_id));
}
else
{
exit(work(wrk_id));
}
}
pyfcgi_IPC_init(IPC_WSTATE | IPC_WREQS | IPC_SEMST);
// Sleep to avoid spawning like hell thinking all workers are
// busy. Let some time to this one to go up...
// TODO: find a better way to avoid spawning to max_wrk
//nanosleep(&timeout, NULL);
pyfcgi_log( LOG_INFO,
"Worker #%d spawned with PID %d", wrk_id, res);
return res;
}
/**@brief Read the current value of the WSTATE semaphore (used elsewhere
 * as the pool idle indicator).
 * @return the semaphore value; a sem_getvalue() failure is fatal and
 * exits through clean_exit()
 * @ingroup work_master_proc */
int pyfcgi_pool_state()
{
	int value;

	if(sem_getvalue(PyFCGI_SEM(SEM_WSTATE).sem, &value) < 0)
	{
		int err = errno;
		pyfcgi_log(LOG_ALERT, "Unable to read WSTATE semaphore value : %s",
			strerror(err));
		clean_exit(PYFCGI_FATAL);
	}
	return value;
}
/**@brief Check whether at least one worker is idle, waiting at most
 * @p timeout for one to become idle.
 *
 * Performs a sem_timedwait() on the WSTATE semaphore (which idle workers
 * post) and immediately posts the token back on success so the value is
 * only probed, not consumed.
 * @param timeout relative wait duration, converted to the absolute
 * CLOCK_REALTIME deadline sem_timedwait() requires
 * @return 1 when the pool is idle, 0 when busy (ETIMEDOUT/EAGAIN); on
 * EINVAL it sleeps 1s and reports idle as a best-effort fallback; any
 * other semaphore error is fatal and exits through clean_exit()
 * @ingroup work_master_proc */
int pyfcgi_pool_idle(const struct timespec *timeout)
{
	int err;
	struct timespec abs_timeout;

	// CLOCK_REALTIME_COARSE is a cheap (Linux-specific) read of the
	// realtime clock sem_timedwait() deadlines are measured against.
	if(clock_gettime(CLOCK_REALTIME_COARSE, &abs_timeout) < 0)
	{
		//clock error
		pyfcgi_log(LOG_WARNING, "Unable to fetch asbtime for WSTATE sem_timedwait : %s",
			strerror(errno));
	}
	// abs_timeout = now + *timeout, normalizing tv_nsec into [0, 1e9).
	// Fixes two bugs in the previous version : the no-carry branch
	// discarded the current time's tv_nsec (shortening the deadline),
	// and the carry branch subtracted 999999999 instead of 1000000000.
	abs_timeout.tv_sec += timeout->tv_sec;
	abs_timeout.tv_nsec += timeout->tv_nsec;
	if(abs_timeout.tv_nsec >= 1000000000L)
	{
		abs_timeout.tv_nsec -= 1000000000L;
		abs_timeout.tv_sec += 1;
	}
	if(sem_timedwait(PyFCGI_SEM(SEM_WSTATE).sem, &abs_timeout) < 0)
	{
		err = errno;
		switch(err)
		{
			case ETIMEDOUT:
			case EAGAIN:
				return 0; //busy
			case EINVAL:
				// Bad deadline : back off and assume idle.
				sleep(1);
				return 1;
			default:
				pyfcgi_log(LOG_ALERT, "Unable to wait WSTATE sem : %s",
					strerror(err));
				clean_exit(PYFCGI_FATAL);
		}
	}
	sem_post(PyFCGI_SEM(SEM_WSTATE).sem); //Hope no worker fails to set busy...
	return 1; //idle
}
/**@brief SIGTERM handler of the pool-handler process : log, forward
 * termination to the workers, then clean_exit().
 *
 * NOTE(review): this span is garbled/truncated in this copy of the file.
 * The loop header below is not valid C, and partway through the text
 * switches to what looks like the body of a separate shm-update function
 * (it references `data`, `nworker` and the SEM_STATS retry loop, none of
 * which are declared here). The boundary between pool_sighandler and
 * that second function was lost — verify against the upstream PyFCGI
 * repository before editing.
 * @param signum signal number received (logged via strsignal())
 * @ingroup work_master_proc */
void pool_sighandler(int signum)
{
unsigned int i, retry;
int status, ret;
struct timespec req;
pyfcgi_log(LOG_NOTICE, "Received signal %s, cleaning & exiting...",
strsignal(signum));
// Nothing to forward when no worker was ever spawned.
if(PyFCGI_conf.context.n_wrk < 1) { clean_exit(0); }
// NOTE(review): invalid/truncated line — presumably a loop killing each
// worker PID, followed by the start of the shm-update function and its
// SEM_STATS wait with a bounded retry count.
for(i=0; i= 5)
{
pyfcgi_log(LOG_ALERT,
"Deadlock on SEM_STATS");
clean_exit(PYFCGI_FATAL);
}
nanosleep(&req, NULL);
continue;
}
pyfcgi_log(LOG_ALERT,
"Unable to wait stats semaphore : %s",
strerror(err));
clean_exit(PYFCGI_FATAL);
}
break;
}
// Publish the worker count and current WSTATE value into the stats
// shared memory segment, then release the stats semaphore.
data = (pyfcgi_stats_shm_t*)PyFCGI_conf.shm.ptr;
data->nworker = nworker;
if(sem_getvalue(PyFCGI_SEM(SEM_WSTATE).sem, &(data->pool_load)) < 0)
{
data->pool_load = -1; // sentinel : load unavailable
}
if(sem_post(PyFCGI_SEM(SEM_STATS).sem) < 0)
{
pyfcgi_log(LOG_ALERT, "Unable to post sem at shm update : %s",
strerror(errno));
clean_exit(PYFCGI_FATAL);
}
}