Tests about a simple python3 fastcgi runner using libfcgi and the Python-C API.
python
c
wsgi
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

pyworker.c 13KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557
  1. /*
  2. * Copyright (C) 2019 Weber Yann
  3. *
  4. * This file is part of PyFCGI.
  5. *
  6. * PyFCGI is free software: you can redistribute it and/or modify
  7. * it under the terms of the GNU Affero General Public License as published by
  8. * the Free Software Foundation, either version 3 of the License, or
  9. * any later version.
  10. *
  11. * PyFCGI is distributed in the hope that it will be useful,
  12. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  13. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  14. * GNU Affero General Public License for more details.
  15. *
  16. * You should have received a copy of the GNU Affero General Public License
  17. * along with PyFCGI. If not, see <http://www.gnu.org/licenses/>.
  18. */
  #include "pyworker.h"
  #include <signal.h>
  20. /**@brief Indicate that a worker is idle
  21. * @param int semid */
  22. static inline void worker_set_idle(int);
  23. /**@brief Indicate that a worker is busy
  24. * @param int semid */
  25. static inline void worker_set_busy(int);
  26. /**@brief Process results from a pep333 worker
  27. * @param FCGX_Stream* out stream from libFCGI
  28. * @param PyObject* application function returned value
  29. */
  30. static inline int work333_send_result(FCGX_Stream*, PyObject* ret);
  /* Set to 1 by worker_piper_sighandler() and polled by the piper loop.
   * It is written from a signal handler, hence volatile sig_atomic_t. */
  static volatile sig_atomic_t worker_piper_sigrcv = 0;
  32. int work333(int wrk_id, int semid)
  33. {
  34. PyObject *entry_fun, *pyflush[2], *py_osmod, *entry_ret, *environ,
  35. *start_response, *args;
  36. FCGX_Stream *in_stream, *out_stream, *err_stream;
  37. char **envp;
  38. int count, pipe_out[2], pipe_err[2], err;
  39. int max_reqs;
  40. char ident[128];
  41. struct timeval start, stop;
  42. max_reqs = PyFCGI_conf.max_reqs;
  43. snprintf(ident, 128, "Worker[%d]", wrk_id);
  44. pyfcgi_logger_set_ident(ident);
  45. pyfcgi_log(LOG_INFO, "Worker started with PEP333 App");
  46. pyinit();
  47. pyfcgi_log(LOG_DEBUG, "Python started");
  48. update_python_fd(pipe_out, pipe_err);
  49. fetch_pyflush(&(pyflush[0]), &(pyflush[1]));
  50. //importing os
  51. py_osmod = python_osmod();
  52. // loading module
  53. entry_fun = import_entrypoint();
  54. pyfcgi_log(LOG_INFO, "Waiting request with %s.%s()",
  55. PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
  56. worker_set_idle(semid); //before failing on import
  57. if(!entry_fun) //but exit if import failed
  58. {
  59. pyfcgi_log(LOG_ALERT, "Unable to import entrypoint");
  60. exit(PYFCGI_FATAL);
  61. }
  62. start_response = get_start_response();
  63. // requests accepting loop
  64. count = 0;
  65. while ((!count || count != max_reqs) &&
  66. FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
  67. {
  68. gettimeofday(&start, NULL);
  69. worker_set_busy(semid);
  70. pyfcgi_log(LOG_DEBUG, "Working !!");
  71. count++;
  72. environ = update_pyenv(py_osmod, envp);
  73. libpyfcgi_clean_response();
  74. libpyfcgi.out = out_stream;
  75. args = Py_BuildValue("OO", environ, start_response);
  76. pyfcgi_log(LOG_DEBUG, "Calling entrypoint :D");
  77. entry_ret = PyObject_CallObject(entry_fun, args);
  78. Py_INCREF(entry_ret);
  79. if(PyErr_Occurred())
  80. {
  81. log_expt(LOG_ERR);
  82. }
  83. // able to process returned value
  84. // Simulate python call of libpyfcgi.write_body()
  85. pyfcgi_log(LOG_DEBUG, "Writing body !");
  86. _pyfcgi_write_body(entry_ret);
  87. if(PyErr_Occurred())
  88. {
  89. log_expt(LOG_ERR);
  90. }
  91. pyfcgi_log(LOG_DEBUG, "Cleaning...");
  92. // clean stuffs
  93. Py_DECREF(args);
  94. Py_DECREF(environ);
  95. Py_DECREF(entry_ret);
  96. // flush & logs pystdout & pystderr
  97. PyObject_CallObject(pyflush[0], NULL);
  98. PyObject_CallObject(pyflush[1], NULL);
  99. FCGX_FClose(out_stream);
  100. FCGX_FClose(in_stream);
  101. FCGX_FClose(err_stream);
  102. FCGI_Finish();
  103. worker_set_idle(semid);
  104. gettimeofday(&stop, NULL);
  105. stop.tv_sec = stop.tv_sec - start.tv_sec;
  106. stop.tv_usec = stop.tv_usec - start.tv_usec;
  107. if(stop.tv_usec < 0)
  108. {
  109. stop.tv_usec += 1000000;
  110. stop.tv_sec -= 1;
  111. }
  112. pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] in %ld.%06lds",
  113. wrk_id, count, stop.tv_sec, stop.tv_usec);
  114. }
  115. return 0;
  116. }
  117. static inline int work333_send_result(FCGX_Stream *out, PyObject* ret)
  118. {
  119. return 0;
  120. }
  121. int work(int wrk_id, int semid)
  122. {
  123. PyObject *entry_fun, *pystdout_flush, *pystderr_flush, *py_osmod,
  124. *environ;
  125. FCGX_Stream *in_stream, *out_stream, *err_stream;
  126. char **envp;
  127. int count, pipe_out[2], pipe_err[2], pipe_ctl[2], err, piper_status;
  128. int max_reqs;
  129. struct sigaction act;
  130. struct timeval start, stop;
  131. sigset_t emptyset;
  132. char buf[PIPE_BUF];
  133. size_t rep_sz;
  134. piper_args_t piper_args;
  135. char *piper_stack;
  136. char ident[128];
  137. snprintf(ident, 128, "Worker[%d]", wrk_id);
  138. pyfcgi_logger_set_ident(ident);
  139. max_reqs = PyFCGI_conf.max_reqs;
  140. piper_args.wrk_id = wrk_id;
  141. piper_args.act = &act;
  142. // preparing sigaction for piper
  143. if(sigemptyset(&emptyset))
  144. {
  145. err = errno;
  146. pyfcgi_log( LOG_ALERT, "sigemptyset fails in piper : %s",
  147. strerror(err));
  148. exit(err);
  149. }
  150. act.sa_handler = worker_piper_sighandler;
  151. act.sa_mask = emptyset;
  152. //act.sa_flags = SA_RESTART;
  153. act.sa_flags = 0;
  154. act.sa_restorer = NULL;
  155. if( !(piper_stack = malloc(PIPER_STACK_SZ)) )
  156. {
  157. err = errno;
  158. pyfcgi_log(LOG_ALERT, "Error while allocating piper stack : %s",
  159. strerror(err));
  160. exit(err);
  161. }
  162. pyfcgi_log(LOG_INFO, "Worker %d started", wrk_id);
  163. update_python_path(); // add cwd to python path
  164. Py_Initialize(); // "start" python
  165. update_python_fd(pipe_out, pipe_err);
  166. piper_args.pystdout = pipe_out[0];
  167. piper_args.pystderr = pipe_err[0];
  168. fetch_pyflush(&pystdout_flush, &pystderr_flush);
  169. pyfcgi_log( LOG_INFO,
  170. "Worker[%d] Python started", wrk_id);
  171. //importing os
  172. py_osmod = python_osmod();
  173. // loading module
  174. entry_fun = import_entrypoint();
  175. pyfcgi_log( LOG_INFO,
  176. "Worker[%d] Waiting request with %s.%s()", wrk_id,
  177. PyFCGI_conf.py_entrymod, PyFCGI_conf.py_entryfun);
  178. worker_set_idle(semid); //before failing on import
  179. if(!entry_fun) //but exit if import failed
  180. {
  181. pyfcgi_log(LOG_ALERT, "Unable to import entrypoint");
  182. exit(PYFCGI_FATAL);
  183. }
  184. if(pipe(pipe_ctl) == -1)
  185. {
  186. err = errno;
  187. pyfcgi_log(LOG_ALERT,
  188. "Worker[%d] Pipe fails for piper ctl : %s",
  189. wrk_id, strerror(err));
  190. exit(err);
  191. }
  192. piper_args.ctl_pipe = pipe_ctl[1];
  193. count = 0;
  194. while ((!count || count != max_reqs) && FCGX_Accept(&in_stream, &out_stream, &err_stream, &envp) >= 0)
  195. {
  196. gettimeofday(&start, NULL);
  197. worker_set_busy(semid);
  198. count++;
  199. piper_args.out = out_stream;
  200. //piper_args.req_id = count;
  201. worker_piper_sigrcv = 0;
  202. // if(!PyFCGI_conf.pep333)
  203. /**@todo avoid clone on each request... (using named pipes ?) */
  204. pid_t pid = clone(worker_piper, piper_stack + PIPER_STACK_SZ - 1,
  205. CLONE_CHILD_CLEARTID | CLONE_CHILD_SETTID | \
  206. SIGCHLD | \
  207. CLONE_FILES | CLONE_FS | CLONE_IO | CLONE_VM,
  208. &piper_args);
  209. if(pid < 0)
  210. {
  211. err = errno;
  212. pyfcgi_log(LOG_ALERT,
  213. "Worker[%d] req #%d Fork failed for piper : %s",
  214. wrk_id, count, strerror(err));
  215. exit(err);
  216. }
  217. environ = update_pyenv(py_osmod, envp);
  218. Py_DECREF(environ);
  219. //close(pipe_ctl[1]);
  220. PyObject_CallObject(entry_fun, NULL);
  221. if(PyErr_Occurred())
  222. {
  223. log_expt(LOG_ERR);
  224. }
  225. else
  226. {
  227. /*
  228. pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d funcall [OK]",
  229. wrk_id, count);
  230. */
  231. }
  232. PyObject_CallObject(pystdout_flush, NULL);
  233. PyObject_CallObject(pystderr_flush, NULL);
  234. read(pipe_ctl[0], &buf, 1); // unblock when child ready
  235. FCGX_FClose(in_stream);
  236. FCGX_FClose(err_stream);
  237. //close(pipe_ctl[0]);
  238. kill(pid, WPIPER_SIG); //indicate child python call ended
  239. waitpid(pid, &piper_status, 0);
  240. if(WIFSIGNALED(piper_status))
  241. {
  242. pyfcgi_log(LOG_ERR,
  243. "Woker[%d] req #%d piper terminated by sig %d",
  244. wrk_id, count, WTERMSIG(piper_status));
  245. exit(40);
  246. }
  247. else if(WEXITSTATUS(piper_status))
  248. {
  249. pyfcgi_log(LOG_ERR,
  250. "Woker[%d] req #%d piper exited with error status %d",
  251. wrk_id, count, WEXITSTATUS(piper_status));
  252. exit(41);
  253. }
  254. FCGI_Finish();
  255. if(ctl_get_rep_sz(pipe_ctl[0], &rep_sz))
  256. {
  257. rep_sz = 0;
  258. }
  259. //Increase sem showing the worker is idle
  260. worker_set_idle(semid);
  261. gettimeofday(&stop, NULL);
  262. stop.tv_sec = stop.tv_sec - start.tv_sec;
  263. stop.tv_usec = stop.tv_usec - start.tv_usec;
  264. if(stop.tv_usec < 0)
  265. {
  266. stop.tv_usec += 1000000;
  267. stop.tv_sec -= 1;
  268. }
  269. pyfcgi_log(LOG_DEBUG, "Worker[%d] request %d END [OK] %lu bytes in %ld.%06lds",
  270. wrk_id, count, rep_sz, stop.tv_sec, stop.tv_usec);
  271. }
  272. free(piper_stack);
  273. Py_Exit(count == max_reqs ?0:EXIT_PYERR);
  274. }
  275. /*
  276. void worker_piper(int wrk_id, int req_id, int pystdout, int pystderr,
  277. int ctl_pipe, FCGX_Stream* out)
  278. */
  279. int worker_piper(void *ptr)
  280. {
  281. piper_args_t *args;
  282. struct pollfd fds[2];
  283. int err, ret, cont, poll_ret;
  284. short revents, closed, nfds;
  285. char buf[PIPE_BUF];
  286. size_t out_sz;
  287. args = ptr;
  288. int wrk_id, pystdout, pystderr, ctl_pipe;
  289. //int req_id;
  290. FCGX_Stream *out;
  291. struct sigaction *act;
  292. wrk_id = args->wrk_id;
  293. //req_id = args->req_id;
  294. pystdout = args->pystdout;
  295. pystderr = args->pystderr;
  296. ctl_pipe = args->ctl_pipe;
  297. out = args->out;
  298. act = args->act;
  299. if(sigaction(WPIPER_SIG, act, NULL))
  300. {
  301. err = errno;
  302. pyfcgi_log( LOG_ALERT, "Unable to sigaction in piper : %s",
  303. strerror(err));
  304. exit(err);
  305. }
  306. write(ctl_pipe, " ", 1);
  307. //pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d piper", wrk_id, req_id);
  308. fds[0].fd = pystderr;
  309. fds[1].fd = pystdout;
  310. fds[0].events = fds[1].events = POLLIN;
  311. fds[0].revents = fds[1].revents = 0;
  312. nfds = 2;
  313. closed = out_sz = 0;
  314. cont = 2;
  315. while(cont)
  316. {
  317. poll_ret = poll(fds, nfds, 0);
  318. //pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d poll_ret = %d", wrk_id, req_id, poll_ret);
  319. if(poll_ret < 0)
  320. {
  321. err = errno;
  322. if(err == EINTR)
  323. {
  324. continue;
  325. }
  326. pyfcgi_log( LOG_WARNING, "Poll error : %s",
  327. strerror(err));
  328. fds[0].revents = fds[1].revents = 0;
  329. continue;
  330. }
  331. if(!poll_ret && worker_piper_sigrcv)
  332. {
  333. cont--;
  334. continue;
  335. }
  336. if(poll_ret && (revents = fds[1].revents))
  337. {
  338. /*
  339. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d STDOUT evt !",
  340. wrk_id, req_id);
  341. */
  342. poll_ret--;
  343. if(revents & POLLIN)
  344. {
  345. /*
  346. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDOUT !",
  347. wrk_id, req_id);
  348. */
  349. ret = read(pystdout, buf, PIPE_BUF);
  350. /*
  351. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d read(stdout) ret %d",
  352. wrk_id, req_id, ret);
  353. */
  354. if(ret < 0)
  355. {
  356. err = errno;
  357. if(err == EINTR)
  358. {
  359. continue;
  360. }
  361. pyfcgi_log( LOG_ERR,
  362. "Error reading python stdout : %s",
  363. strerror(err));
  364. exit(err);
  365. }
  366. //buf[ret] = '\0';
  367. ret = FCGX_PutStr(buf, ret, out);
  368. out_sz += ret;
  369. if(ret < PIPE_BUF && worker_piper_sigrcv)
  370. {
  371. FCGX_FClose(out);
  372. FCGI_Finish();
  373. write(ctl_pipe, &out_sz, sizeof(size_t));
  374. closed = 1;
  375. nfds = 1;
  376. }
  377. }
  378. //TODO handle other poll events
  379. }
  380. if(poll_ret && (revents = fds[0].revents))
  381. {
  382. /*
  383. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d STDERR evt !",
  384. wrk_id, req_id);
  385. */
  386. if(revents & POLLIN)
  387. {
  388. /*
  389. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d POLLIN STDERR !",
  390. wrk_id, req_id);
  391. */
  392. while(1)
  393. {
  394. ret = read(pystderr, buf, PIPE_BUF);
  395. if(ret < 0)
  396. {
  397. err = errno;
  398. if(err == EINTR)
  399. {
  400. continue;
  401. }
  402. pyfcgi_log( LOG_ERR,
  403. "Error reading python stdout : %s",
  404. strerror(err));
  405. exit(err);
  406. }
  407. buf[ret] = '\0';
  408. pyfcgi_log(LOG_ERR,
  409. "Worker[%d] Python error : %s",
  410. wrk_id, buf);
  411. break;
  412. }
  413. }
  414. //TODO handle other poll events
  415. }
  416. fds[0].revents = fds[1].revents = 0;
  417. }
  418. /*
  419. pyfcgi_log(LOG_DEBUG, "Worker[%d] req #%d piper exiting...",
  420. wrk_id, req_id);
  421. */
  422. if(!closed)
  423. {
  424. FCGX_FClose(out);
  425. FCGI_Finish();
  426. write(ctl_pipe, &out_sz, sizeof(size_t));
  427. }
  428. return 0;
  429. }
  430. void worker_piper_sighandler(int signum)
  431. {
  432. worker_piper_sigrcv = 1;
  433. //pyfcgi_log(LOG_DEBUG, "SIG");
  434. }
  /**@brief Read a response size from the control pipe without blocking
   *
   * Polls the pipe with a 0 timeout and, if data is available, reads
   * sizeof(size_t) bytes into rep_sz.
   *
   * @param ctl_pipe file descriptor to read from
   * @param rep_sz destination of the value read
   * @return 0 when a full size_t was read, -1 otherwise (poll error, no
   *         data available, read error or short read ; each case is logged)
   */
  int ctl_get_rep_sz(int ctl_pipe, size_t* rep_sz)
  {
      struct pollfd ctl = { .fd = ctl_pipe, .events = POLLIN, .revents = 0 };
      int ready, saved_errno;
      ssize_t got;

      ready = poll(&ctl, 1, 0);
      if(ready < 0)
      {
          saved_errno = errno;
          pyfcgi_log(LOG_ERR, "Failed to poll ctl pipe : %s",
              strerror(saved_errno));
          return -1;
      }
      if(ready == 0)
      {
          pyfcgi_log(LOG_ERR, "No data in ctl pipe for rep_sz...");
          return -1;
      }

      got = read(ctl_pipe, rep_sz, sizeof(size_t));
      if(got < 0)
      {
          pyfcgi_log(LOG_ERR, "Error reading ctl pipe : %s",
              strerror(errno));
          return -1;
      }
      if((size_t)got < sizeof(size_t))
      {
          pyfcgi_log(LOG_ERR, "Incomplete read from ctl pipe, no rep_sz...");
          return -1;
      }
      return 0;
  }
  /**@brief Flag the worker as idle by adding 1 to semaphore 0 of the set
   * @param semid semaphore set identifier given to semop()
   * @note a semop() failure is only logged
   */
  static void worker_set_idle(int semid)
  {
      struct sembuf release = { .sem_num = 0, .sem_op = 1, .sem_flg = 0 };
      int saved_errno;

      if(semop(semid, &release, 1) >= 0)
      {
          return;
      }
      saved_errno = errno;
      pyfcgi_log(LOG_ERR, "error incrementing the semaphore : %s",
          strerror(saved_errno));
  }
  /**@brief Flag the worker as busy by subtracting 1 from semaphore 0
   * @param semid semaphore set identifier given to semop()
   * @note a semop() failure is only logged
   */
  static void worker_set_busy(int semid)
  {
      int err;
      struct sembuf sop;

      sop.sem_num = 0;
      sop.sem_flg = 0;
      sop.sem_op = -1;

      if(semop(semid, &sop, 1) < 0)
      {
          err = errno;
          // this operation decrements : the message used to say
          // "incrementing", like worker_set_idle()
          pyfcgi_log(LOG_ERR, "error decrementing the semaphore : %s",
              strerror(err));
      }
  }
  495. }