Implement on demand commits

This commit is contained in:
Yann Weber 2019-07-01 00:00:35 +02:00
commit d38ab9dacb
5 changed files with 199 additions and 44 deletions

View file

@ -29,7 +29,7 @@
#include "responder.h"
#define IDENT_FMT "pyfcgi[%d]"
#define MAX_REQS 5
#define MAX_REQS 50
#define EARLY_ERR(err_str) write(2, err_str, strlen(err_str))
@ -76,8 +76,8 @@ int main(int argc, char **argv)
}
else if(!child)
{
responder_loop(py_entrypoint, MAX_REQS, 1, 1);
exit(-1);
responder_loop(py_entrypoint, MAX_REQS, 1, 20);
exit((unsigned char)-1);
}
waitpid(child, &child_ret, 0);
if(child_ret)

View file

@ -21,36 +21,18 @@
static int worker_piper_sigrcv = 0;
pid_t spawn(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
{
pid_t res;
res = fork();
if(res == -1)
{
syslog( LOG_ERR, "Fork fails for worker #%d : %s",
wrk_id, strerror(errno));
return -1;
}
else if(!res)
{
// Child process
exit(work(py_entrypoint, wrk_id, semid, max_reqs));
}
syslog( LOG_INFO,
"Worker #%d spawned with PID %d", wrk_id, res);
return res;
}
int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
{
PyObject *entry_fun, *pystdout_flush, *pystderr_flush,
*py_osmod;
int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status;
struct sembuf sop;
struct sigaction act;
sigset_t emptyset;
char buf[PIPE_BUF];
sop.sem_num = 0;
sop.sem_flg = 0;
// preparing sigaction for piper
if(sigemptyset(&emptyset))
@ -96,12 +78,22 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
count = 0;
while ((!count || count != max_reqs) && FCGI_Accept() >= 0)
{
sop.sem_op = -1; // decrementing sem to show worker busy
if(semop(semid, &sop, 1) < 0)
{
err = errno;
syslog(LOG_ERR,
"Worker[%d] error decrementing the semaphore : %s",
wrk_id, strerror(err));
}
count++;
/*
syslog( LOG_INFO,
"Worker[%d] request %d", wrk_id, count);
*/
worker_piper_sigrcv = 0;
pipe(pipe_ctl); //TODO : check for pipe error
//PyOS_BeforeFork();
pid_t pid = fork();
if(pid < 0)
{
@ -128,7 +120,6 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
//printf("Content-type: text/html\r\n\r\nHello world !\n");
exit(1);
}
//PyOS_AfterFork_Parent();
update_pyenv(py_osmod);
//TODO : check if pipe_ctl lock is really needed anymore
close(pipe_ctl[1]);
@ -139,14 +130,16 @@ int work(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
}
else
{
/*
syslog(LOG_DEBUG, "Worker[%d] request %d funcall [OK]",
wrk_id, count);
*/
}
PyObject_CallObject(pystdout_flush, NULL);
PyObject_CallObject(pystderr_flush, NULL);
read(pipe_ctl[0], &buf, 1); // unblock when child ready
syslog(LOG_DEBUG, "PIPER UNLOCK");
kill(pid, WPIPER_SIG);
kill(pid, WPIPER_SIG); //indicate child python call ended
waitpid(pid, &piper_status, 0);
if(piper_status)
{
@ -154,6 +147,17 @@ syslog(LOG_DEBUG, "PIPER UNLOCK");
"Woker[%d] req #%d piper exited with error status %d",
wrk_id, count, piper_status);
}
//Increase sem showing the worker is idle
sop.sem_op = 1;
if(semop(semid, &sop, 1) < 0)
{
err = errno;
syslog(LOG_ERR,
"Worker[%d] error incrementing the semaphore : %s",
wrk_id, strerror(err));
}
syslog(LOG_DEBUG, "Worker[%d] request %d END [OK]",
wrk_id, count);
}
@ -168,7 +172,7 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
short revents;
char buf[PIPE_BUF];
syslog(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id);
//syslog(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id);
fds[0].fd = pystdout;
fds[1].fd = pystderr;
@ -180,7 +184,7 @@ void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
while(cont)
{
poll_ret = poll(fds, 2, 10);
syslog(LOG_DEBUG, "Worler[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
//syslog(LOG_DEBUG, "Worker[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
if(poll_ret < 0)
{
err = errno;
@ -200,16 +204,22 @@ syslog(LOG_DEBUG, "Worler[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
}
if(poll_ret && (revents = fds[0].revents))
{
/*
syslog(LOG_DEBUG, "Worker[%d] req #%d STDOUT evt !",
wrk_id, req_id);
*/
poll_ret--;
if(revents & POLLIN)
{
/*
syslog(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDOUT !",
wrk_id, req_id);
*/
ret = read(pystdout, buf, PIPE_BUF);
/*
syslog(LOG_DEBUG, "Worker[%d] req #%d read(stdout) ret %d",
wrk_id, req_id, ret);
*/
if(ret < 0)
{
err = errno;
@ -229,12 +239,16 @@ syslog(LOG_DEBUG, "Worler[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
}
if(poll_ret && (revents = fds[1].revents))
{
/*
syslog(LOG_DEBUG, "Worker[%d] req #%d STDERR evt !",
wrk_id, req_id);
*/
if(revents & POLLIN)
{
/*
syslog(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDERR !",
wrk_id, req_id);
*/
while(1)
{
ret = read(pystderr, buf, PIPE_BUF);
@ -261,8 +275,10 @@ syslog(LOG_DEBUG, "Worler[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
}
fds[0].revents = fds[1].revents = 0;
}
/*
syslog(LOG_DEBUG, "Worker[%d] req #%d piper exiting...",
wrk_id, req_id);
*/
exit(0);
}
@ -620,7 +636,7 @@ void update_pyenv(PyObject *py_osmod)
}
value++;
*(value-1) = '\0'; // dirty modification of **environ
syslog(LOG_DEBUG, "PySetEnv '%s'='%s'", key, value);
//syslog(LOG_DEBUG, "PySetEnv '%s'='%s'", key, value);
pykey = PyUnicode_DecodeLocale(key, "surrogateescape");
if(!pykey)
{

View file

@ -34,6 +34,8 @@
#include <limits.h>
#include <poll.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
#include <sys/wait.h>
#define EXIT_PYERR 42
@ -44,16 +46,6 @@ extern char **environ;
typedef unsigned long int pywrkid_t;
/**@brief Spawn a worker given an entrypoint
*
* Spawn a new worker process and prepare ENV & request forwarding
* @param char* python_entrypoint a path to a python entrypoint
* @param int worker uid
* @param int semid for FCGI access
* @param int max request before worker restart
* @return child PID
*/
pid_t spawn(char*, int, int, int);
/**@brief the function that initialize the python worker
* @param char* python_entrypoint a path to a python entrypoint

View file

@ -28,10 +28,19 @@ void init_context()
int responder_loop(char *py_entrypoint, unsigned int max_reqs,
unsigned int min_wrk, unsigned int max_wrk)
{
unsigned int n_wrk;
int *wrk_pids;
unsigned int n_wrk, wanted_n, n;
pid_t *wrk_pids;
int semid, err;
int status;
pid_t ret;
struct sembuf sop;
struct timespec timeout;
sop.sem_num = 0;
sop.sem_op = 0;
sop.sem_flg = 0;
timeout.tv_sec = 0;
timeout.tv_nsec = 100000000;
syslog(LOG_INFO, "Preparing workers");
@ -50,10 +59,99 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
semid = new_semaphore();
for(n_wrk=0; n_wrk != min_wrk; n_wrk++)
wanted_n = min_wrk;
n_wrk = 0;
// prespawning minimum worker count
for(n_wrk=0; n_wrk < wanted_n; n_wrk++)
{
wrk_pids[n_wrk] = spawn(py_entrypoint, n_wrk, semid, max_reqs);
}
// main loop, taking care to restart terminated workers,
// spawn new one if needed, etc.
while(1)
{
if( (ret = waitpid(0, &status, WNOHANG)) )
{
if(ret < 0)
{
//TODO : error
}
if(!ret)
{
continue;
}
for(n=0; n<n_wrk; n++)
{
if(wrk_pids[n] == ret)
{
break;
}
}
if(n == n_wrk)
{
syslog(LOG_WARNING,
"Child %d stopped but was notregistered",
ret);
continue;
}
if(status)
{
syslog(LOG_WARNING,
"Worker[%d] exited with status %d",
n, status);
}
else
{
syslog(LOG_INFO,
"Worker[%d] PID %d exited normally",
n, wrk_pids[n]);
}
// child stopped, looking for it
if(wanted_n < n_wrk)
{ // need to shift the list and dec n_wrk
memmove(wrk_pids+n, wrk_pids+n+1,
sizeof(pid_t) * (n_wrk - n));
n_wrk--;
}
else
{ // respawn on same slot
wrk_pids[n] = spawn(py_entrypoint, n,
semid, max_reqs);
}
}
if(n_wrk == max_wrk)
{
nanosleep(&timeout, NULL);
continue;
}
ret = semtimedop(semid, &sop, 1, &timeout);
if(ret < 0)
{
err = errno;
if(err == EAGAIN)
{
// workers idle
if(wanted_n > min_wrk)
{
wanted_n--;
}
continue;
}
syslog(LOG_ERR, "Unable to read semaphore : %s",
strerror(err));
}
if(!ret)
{
syslog( LOG_DEBUG,
"All workers busy, spawning a new one");
n = n_wrk;
n_wrk++;
wanted_n = n_wrk;
wrk_pids[n] = spawn(py_entrypoint, n,
semid, max_reqs);
}
}
//Debug wait & exit
for(n_wrk=0; n_wrk != min_wrk; n_wrk++)
@ -67,6 +165,41 @@ int responder_loop(char *py_entrypoint, unsigned int max_reqs,
exit(0);
}
pid_t spawn(char* py_entrypoint, int wrk_id, int semid, int max_reqs)
{
pid_t res;
struct sembuf sop;
int err;
sop.sem_num = 0;
sop.sem_op = 1;
sop.sem_flg = 0;
if(semop(semid, &sop, 1) < 0)
{
err = errno;
syslog(LOG_ALERT,
"Failed to semop before spawning a child : %s",
strerror(errno));
clean_exit(err);
}
res = fork();
if(res == -1)
{
syslog( LOG_ERR, "Fork fails for worker #%d : %s",
wrk_id, strerror(errno));
return -1;
}
else if(!res)
{
// Child process
exit(work(py_entrypoint, wrk_id, semid, max_reqs));
}
syslog( LOG_INFO,
"Worker #%d spawned with PID %d", wrk_id, res);
return res;
}
int new_semaphore(key_t semkey)
{

View file

@ -18,6 +18,7 @@
*/
#ifndef _RESPONDER__H___
#define _RESPONDER__H___
#include "config.h"
#include <fcgi_stdio.h> /* fcgi library; put it first*/
@ -26,6 +27,7 @@
#include <string.h>
#include <syslog.h>
#include <errno.h>
#include <time.h>
#include <sys/types.h>
#include <sys/ipc.h>
#include <sys/sem.h>
@ -51,6 +53,18 @@ void init_context();
int responder_loop(char *py_entrypoint, unsigned int max_reqs,
unsigned int min_wrk, unsigned int max_wrk);
/**@brief Spawn a worker given an entrypoint
*
* Spawn a new worker process and prepare ENV & request forwarding
* @param char* python_entrypoint a path to a python entrypoint
* @param int worker uid
* @param int semid for FCGI access
* @param int max request before worker restart
* @return child PID
*/
pid_t spawn(char*, int, int, int);
/**@brief Generate a new semaphore from given key
* @note set pyfcgi_semid
* @return int semid