Tests about a simple python3 fastcgi runner using libfcgi and the Python-C API.
python
c
wsgi
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

pyworker.c 14KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614
  1. /*
  2. * Copyright (C) 2019 Weber Yann
  3. *
  4. * This file is part of PyFCGI.
  5. *
  6. * PyFCGI is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * any later version.
  10. *
  11. * PyFCGI is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with PyFCGI. If not, see <http://www.gnu.org/licenses/>.
  18. */
  19. #include "pyworker.h"
  20. /**@brief Indicate that a worker is idle
  21. * @param int semid */
  22. static inline void worker_set_idle(int);
  23. /**@brief Indicate that a worker is busy
  24. * @param int semid */
  25. static inline void worker_set_busy(int);
  26. /**@brief Process results from a pep333 worker
  27. * @param FCGX_Stream* out stream from libFCGI
  28. * @param PyObject* application function returned value
  29. */
  30. static inline int work333_send_result(FCGX_Stream*, PyObject* ret);
  31. static int worker_piper_sigrcv = 0;
  32. int work333(int wrk_id, int semid)
  33. {
  34. PyObject *entry_fun, *pyflush[2], *py_osmod, *entry_ret, *environ,
  35. *start_response, *args;
  36. FCGX_Stream *in_stream, *out_stream, *err_stream;
  37. char **envp;
  38. int count, pipe_out[2], pipe_err[2];
  39. int max_reqs;
  40. char ident[128];
  41. struct timeval start, stop;
  42. max_reqs = PyFCGI_conf.max_reqs;
  43. snprintf(ident, 128, "Worker[%d]", wrk_id);
  44. pyfcgi_logger_set_ident(ident);
  45. pyfcgi_log(LOG_INFO, "Worker started with PEP333 App");
  46. pyinit();
  47. pyfcgi_log(LOG_DEBUG, "Python started");
  48. update_python_fd(pipe_out, pipe_err);
  49. fetch_pyflush(&(pyflush[0]), &(pyflush[1]));
  50. //importing os
  51. py_osmod = python_osmod();
  52. // loading module
  53. entry_fun = import_entrypoint();
  54. pyfcgi_log(LOG_INFO, "Waiting request with %s.%s()",
  55. PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
  56. worker_set_idle(semid); //before failing on import
  57. if(!entry_fun) //but exit if import failed
  58. {
  59. pyfcgi_log(LOG_ALERT, "Unable to import entrypoint");
  60. exit(PYFCGI_FATAL);
  61. }
  62. start_response = get_start_response();
  63. // requests accepting loop
  64. count = 0;
  65. while ((!count || count != max_reqs) &&
  66. FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
  67. {
  68. gettimeofday(&start, NULL);
  69. worker_set_busy(semid);
  70. count++;
  71. environ = update_pyenv(py_osmod, envp);
  72. libpyfcgi_clean_response();
  73. libpyfcgi.out = out_stream;
  74. libpyfcgi.in = in_stream;
  75. args = Py_BuildValue("OO", environ, start_response);
  76. // Call application function
  77. entry_ret = PyObject_CallObject(entry_fun, args);
  78. if(entry_ret && entry_ret != Py_None )
  79. {
  80. Py_INCREF(entry_ret);
  81. }
  82. if(PyErr_Occurred())
  83. {
  84. pyfcgi_log(LOG_ERR, "PEP333 entrypoint function triggered an exception");
  85. log_expt(LOG_ERR);
  86. }
  87. // able to process returned value
  88. // Simulate python call of libpyfcgi.write_body()
  89. if(entry_ret && entry_ret != Py_None)
  90. {
  91. _pyfcgi_write_body(entry_ret);
  92. if(PyErr_Occurred())
  93. {
  94. log_expt(LOG_ERR);
  95. }
  96. Py_DECREF(entry_ret);
  97. }
  98. // clean stuffs
  99. Py_DECREF(args);
  100. Py_DECREF(environ);
  101. // flush & logs pystdout & pystderr
  102. worker_log_pipes(pipe_out[0], pipe_err[0], pyflush);
  103. FCGX_FClose(out_stream);
  104. FCGX_FClose(in_stream);
  105. FCGX_FClose(err_stream);
  106. FCGI_Finish();
  107. worker_set_idle(semid);
  108. gettimeofday(&stop, NULL);
  109. stop.tv_sec = stop.tv_sec - start.tv_sec;
  110. stop.tv_usec = stop.tv_usec - start.tv_usec;
  111. if(stop.tv_usec < 0)
  112. {
  113. stop.tv_usec += 1000000;
  114. stop.tv_sec -= 1;
  115. }
  116. pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
  117. wrk_id, count, libpyfcgi.rep_sz, stop.tv_sec, stop.tv_usec);
  118. }
  119. return 0;
  120. }
  121. static inline int work333_send_result(FCGX_Stream *out, PyObject* ret)
  122. {
  123. return 0;
  124. }
  125. int work(int wrk_id, int semid)
  126. {
  127. PyObject *entry_fun, *pystdout_flush, *pystderr_flush, *py_osmod,
  128. *environ;
  129. FCGX_Stream *in_stream, *out_stream, *err_stream;
  130. char **envp;
  131. int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status;
  132. int max_reqs;
  133. struct sigaction act;
  134. struct timeval start, stop;
  135. sigset_t emptyset;
  136. char buf[PIPE_BUF];
  137. size_t rep_sz;
  138. piper_args_t piper_args;
  139. char *piper_stack;
  140. char ident[128];
  141. snprintf(ident, 128, "Worker[%d]", wrk_id);
  142. pyfcgi_logger_set_ident(ident);
  143. max_reqs = PyFCGI_conf.max_reqs;
  144. piper_args.wrk_id = wrk_id;
  145. piper_args.act = &act;
  146. // preparing sigaction for piper
  147. if(sigemptyset(&emptyset))
  148. {
  149. err = errno;
  150. pyfcgi_log( LOG_ALERT, "sigemptyset fails in piper : %s",
  151. strerror(err));
  152. exit(err);
  153. }
  154. act.sa_handler = worker_piper_sighandler;
  155. act.sa_mask = emptyset;
  156. //act.sa_flags = SA_RESTART;
  157. act.sa_flags = 0;
  158. act.sa_restorer = NULL;
  159. if( !(piper_stack = malloc(PIPER_STACK_SZ)) )
  160. {
  161. err = errno;
  162. pyfcgi_log(LOG_ALERT, "Error while allocating piper stack : %s",
  163. strerror(err));
  164. exit(err);
  165. }
  166. pyfcgi_log(LOG_INFO, "Worker %d started", wrk_id);
  167. update_python_path(); // add cwd to python path
  168. Py_Initialize(); // "start" python
  169. update_python_fd(pipe_out, pipe_err);
  170. piper_args.pystdout = pipe_out[0];
  171. piper_args.pystderr = pipe_err[0];
  172. fetch_pyflush(&pystdout_flush, &pystderr_flush);
  173. pyfcgi_log( LOG_INFO,
  174. "Worker[%d] Python started", wrk_id);
  175. //importing os
  176. py_osmod = python_osmod();
  177. // loading module
  178. entry_fun = import_entrypoint();
  179. pyfcgi_log( LOG_INFO,
  180. "Worker[%d] Waiting request with %s.%s()", wrk_id,
  181. PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
  182. worker_set_idle(semid); //before failing on import
  183. if(!entry_fun) //but exit if import failed
  184. {
  185. pyfcgi_log(LOG_ALERT, "Unable to import entrypoint");
  186. exit(PYFCGI_FATAL);
  187. }
  188. if(pipe(pipe_ctl) == -1)
  189. {
  190. err = errno;
  191. pyfcgi_log(LOG_ALERT,
  192. "Worker[%d] Pipe fails for piper ctl : %s",
  193. wrk_id, strerror(err));
  194. exit(err);
  195. }
  196. piper_args.ctl_pipe = pipe_ctl[1];
  197. count = 0;
  198. while ((!count || count != max_reqs) && FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
  199. {
  200. gettimeofday(&start, NULL);
  201. worker_set_busy(semid);
  202. count++;
  203. piper_args.out = out_stream;
  204. //piper_args.req_id = count;
  205. worker_piper_sigrcv = 0;
  206. // if(!PyFCGI_conf.pep333)
  207. /**@todo avoid clone on each request... (using named pipes ?) */
  208. pid_t pid = clone(worker_piper, piper_stack + PIPER_STACK_SZ - 1,
  209. CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID | \
  210. SIGCHLD | \
  211. CLONE_FILES | CLONE_FS | CLONE_IO | CLONE_VM,
  212. &piper_args);
  213. if(pid < 0)
  214. {
  215. err = errno;
  216. pyfcgi_log(LOG_ALERT,
  217. "Worker[%d] req #%d Fork failed for piper : %s",
  218. wrk_id, count, strerror(err));
  219. exit(err);
  220. }
  221. environ = update_pyenv(py_osmod, envp);
  222. Py_DECREF(environ);
  223. //close(pipe_ctl[1]);
  224. PyObject_CallObject(entry_fun, NULL);
  225. if(PyErr_Occurred())
  226. {
  227. log_expt(LOG_ERR);
  228. }
  229. else
  230. {
  231. /*
  232. pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d funcall [OK]",
  233. wrk_id, count);
  234. */
  235. }
  236. PyObject_CallObject(pystdout_flush, NULL);
  237. PyObject_CallObject(pystderr_flush, NULL);
  238. read(pipe_ctl[0], &buf, 1); // unblock when child ready
  239. FCGX_FClose(in_stream);
  240. FCGX_FClose(err_stream);
  241. //close(pipe_ctl[0]);
  242. kill(pid, WPIPER_SIG); //indicate child python call ended
  243. waitpid(pid, &piper_status, 0);
  244. if(WIFSIGNALED(piper_status))
  245. {
  246. pyfcgi_log(LOG_ERR,
  247. "Woker[%d] req #%d piper terminated by sig %d",
  248. wrk_id, count, WTERMSIG(piper_status));
  249. exit(40);
  250. }
  251. else if(WEXITSTATUS(piper_status))
  252. {
  253. pyfcgi_log(LOG_ERR,
  254. "Woker[%d] req #%d piper exited with error status %d",
  255. wrk_id, count, WEXITSTATUS(piper_status));
  256. exit(41);
  257. }
  258. FCGI_Finish();
  259. if(ctl_get_rep_sz(pipe_ctl[0], &rep_sz))
  260. {
  261. rep_sz = 0;
  262. }
  263. //Increase sem showing the worker is idle
  264. worker_set_idle(semid);
  265. gettimeofday(&stop, NULL);
  266. stop.tv_sec = stop.tv_sec - start.tv_sec;
  267. stop.tv_usec = stop.tv_usec - start.tv_usec;
  268. if(stop.tv_usec < 0)
  269. {
  270. stop.tv_usec += 1000000;
  271. stop.tv_sec -= 1;
  272. }
  273. pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
  274. wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec);
  275. }
  276. free(piper_stack);
  277. Py_Exit(count == max_reqs ?0:EXIT_PYERR);
  278. }
  279. /*
  280. void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
  281. int ctl_pipe, FCGX_Stream* out)
  282. */
  283. int worker_piper(void *ptr)
  284. {
  285. piper_args_t *args;
  286. struct pollfd fds[2];
  287. int err, ret, cont, poll_ret;
  288. short revents, closed, nfds;
  289. char buf[PIPE_BUF];
  290. size_t out_sz;
  291. args = ptr;
  292. int wrk_id, pystdout, pystderr, ctl_pipe;
  293. //int req_id;
  294. FCGX_Stream *out;
  295. struct sigaction *act;
  296. wrk_id = args->wrk_id;
  297. //req_id = args->req_id;
  298. pystdout = args->pystdout;
  299. pystderr = args->pystderr;
  300. ctl_pipe = args->ctl_pipe;
  301. out = args->out;
  302. act = args->act;
  303. if(sigaction(WPIPER_SIG, act, NULL))
  304. {
  305. err = errno;
  306. pyfcgi_log( LOG_ALERT, "Unable to sigaction in piper : %s",
  307. strerror(err));
  308. exit(err);
  309. }
  310. write(ctl_pipe, " ", 1);
  311. //pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id);
  312. fds[0].fd = pystderr;
  313. fds[1].fd = pystdout;
  314. fds[0].events = fds[1].events = POLLIN;
  315. fds[0].revents = fds[1].revents = 0;
  316. nfds = 2;
  317. closed = out_sz = 0;
  318. cont = 2;
  319. while(cont)
  320. {
  321. poll_ret = poll(fds, nfds, 0);
  322. //pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
  323. if(poll_ret < 0)
  324. {
  325. err = errno;
  326. if(err == EINTR)
  327. {
  328. continue;
  329. }
  330. pyfcgi_log( LOG_WARNING, "Poll error : %s",
  331. strerror(err));
  332. fds[0].revents = fds[1].revents = 0;
  333. continue;
  334. }
  335. if(!poll_ret && worker_piper_sigrcv)
  336. {
  337. cont--;
  338. continue;
  339. }
  340. if(poll_ret && (revents = fds[1].revents))
  341. {
  342. /*
  343. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d STDOUT evt !",
  344. wrk_id, req_id);
  345. */
  346. poll_ret--;
  347. if(revents & POLLIN)
  348. {
  349. /*
  350. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDOUT !",
  351. wrk_id, req_id);
  352. */
  353. ret = read(pystdout, buf, PIPE_BUF);
  354. /*
  355. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d read(stdout) ret %d",
  356. wrk_id, req_id, ret);
  357. */
  358. if(ret < 0)
  359. {
  360. err = errno;
  361. if(err == EINTR)
  362. {
  363. continue;
  364. }
  365. pyfcgi_log( LOG_ERR,
  366. "Error reading python stdout : %s",
  367. strerror(err));
  368. exit(err);
  369. }
  370. //buf[ret] = '\0';
  371. ret = FCGX_PutStr(buf, ret, out);
  372. out_sz += ret;
  373. if(ret < PIPE_BUF && worker_piper_sigrcv)
  374. {
  375. FCGX_FClose(out);
  376. FCGI_Finish();
  377. write(ctl_pipe, &out_sz, sizeof(size_t));
  378. closed = 1;
  379. nfds = 1;
  380. }
  381. }
  382. //TODO handle other poll events
  383. }
  384. if(poll_ret && (revents = fds[0].revents))
  385. {
  386. /*
  387. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d STDERR evt !",
  388. wrk_id, req_id);
  389. */
  390. if(revents & POLLIN)
  391. {
  392. /*
  393. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDERR !",
  394. wrk_id, req_id);
  395. */
  396. while(1)
  397. {
  398. ret = read(pystderr, buf, PIPE_BUF);
  399. if(ret < 0)
  400. {
  401. err = errno;
  402. if(err == EINTR)
  403. {
  404. continue;
  405. }
  406. pyfcgi_log( LOG_ERR,
  407. "Error reading python stdout : %s",
  408. strerror(err));
  409. exit(err);
  410. }
  411. buf[ret] = '\0';
  412. pyfcgi_log(LOG_ERR,
  413. "Worker[%d] Python error : %s",
  414. wrk_id, buf);
  415. break;
  416. }
  417. }
  418. //TODO handle other poll events
  419. }
  420. fds[0].revents = fds[1].revents = 0;
  421. }
  422. /*
  423. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d piper exiting...",
  424. wrk_id, req_id);
  425. */
  426. if(!closed)
  427. {
  428. FCGX_FClose(out);
  429. FCGI_Finish();
  430. write(ctl_pipe, &out_sz, sizeof(size_t));
  431. }
  432. return 0;
  433. }
  434. void worker_log_pipes(int pipe_std, int pipe_err, PyObject *flush[2])
  435. {
  436. size_t ret;
  437. char buff[4096];
  438. int i, poll_ret;
  439. int loglevels[2] = {LOG_INFO, LOG_WARNING};
  440. struct pollfd fds[2];
  441. short isflush[2] = {0,0};
  442. char *outnames[2] = {"sys.stdout", "sys.stderr"};
  443. fds[0].fd = pipe_std;
  444. fds[1].fd = pipe_err;
  445. fds[0].events = fds[1].events = POLLIN;
  446. fds[0].revents = fds[1].revents = 0;
  447. for(i=0; i<2; i++)
  448. {
  449. do
  450. {
  451. poll_ret = poll(&(fds[i]), 1, 0);
  452. if(poll_ret)
  453. {
  454. ret = read(pipe_std, buff, 4096);
  455. if(ret == -1)
  456. {
  457. pyfcgi_log(LOG_ERR, "Error reading python %s pipe : %s",
  458. outnames[i], strerror(errno));
  459. exit(PYFCGI_WORKER_FAIL);
  460. }
  461. pyfcgi_log(loglevels[i], "Pyton %s : '%s'",
  462. outnames[i], buff);
  463. }
  464. if(!isflush[i] && (!poll_ret || ret < 4096))
  465. {
  466. isflush[i] = 1;
  467. PyObject_CallObject(flush[i], NULL);
  468. if(PyErr_Occurred())
  469. {
  470. pyfcgi_log(LOG_ERR, "Exception while flushing %s",
  471. outnames[i]);
  472. log_expt(LOG_ERR);
  473. exit(PYFCGI_WORKER_FAIL);
  474. }
  475. }
  476. }while(poll_ret && !isflush[i]);
  477. }
  478. }
  479. void worker_piper_sighandler(int signum)
  480. {
  481. worker_piper_sigrcv = 1;
  482. //pyfcgi_log(LOG_DEBUG, "SIG");
  483. }
  484. int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz)
  485. {
  486. struct pollfd fds;
  487. int ret, err;
  488. fds.fd = ctl_pipe;
  489. fds.events = POLLIN;
  490. fds.revents = 0;
  491. if( (ret = poll(&fds, 1, 0)) < 0)
  492. {
  493. err = errno;
  494. pyfcgi_log(LOG_ERR, "Failed to poll ctl pipe : %s",
  495. strerror(err));
  496. return -1;
  497. }
  498. if(!ret)
  499. {
  500. pyfcgi_log(LOG_ERR, "No data in ctl pipe for rep_sz...");
  501. return -1;
  502. }
  503. ret = read(ctl_pipe, rep_sz, sizeof(size_t));
  504. if(ret < 0)
  505. {
  506. pyfcgi_log(LOG_ERR, "Error reading ctl pipe : %s",
  507. strerror(errno));
  508. return -1;
  509. }
  510. if(ret < sizeof(size_t))
  511. {
  512. pyfcgi_log(LOG_ERR, "Incomplete read from ctl pipe, no rep_sz...");
  513. return -1;
  514. }
  515. return 0;
  516. }
  517. static void worker_set_idle(int semid)
  518. {
  519. int err;
  520. struct sembuf sop;
  521. sop.sem_num = 0;
  522. sop.sem_flg = 0;
  523. sop.sem_op = 1;
  524. if(semop(semid, &sop, 1) < 0)
  525. {
  526. err = errno;
  527. pyfcgi_log(LOG_ERR, "error incrementing the semaphore : %s",
  528. strerror(err));
  529. }
  530. }
  531. static void worker_set_busy(int semid)
  532. {
  533. int err;
  534. struct sembuf sop;
  535. sop.sem_num = 0;
  536. sop.sem_flg = 0;
  537. sop.sem_op = -1;
  538. if(semop(semid, &sop, 1) < 0)
  539. {
  540. err = errno;
  541. pyfcgi_log(LOG_ERR, "error incrementing the semaphore : %s",
  542. strerror(err));
  543. }
  544. }
  545. void worker_sighandler(int signum)
  546. {
  547. pyfcgi_log(LOG_INFO, "%s signal received, exiting...", strsignal(signum));
  548. exit(0);
  549. }