You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

664 lines
16 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. /* -*- coding: utf-8 -*-
  2. * ----------------------------------------------------------------------
  3. * Copyright © 2012-2014, RedJack, LLC.
  4. * All rights reserved.
  5. *
  6. * Please see the COPYING file in this distribution for license details.
  7. * ----------------------------------------------------------------------
  8. */
  9. #include <assert.h>
  10. #include <errno.h>
  11. #include <fcntl.h>
  12. #include <signal.h>
  13. #ifndef __MINGW32__
  14. #include <sys/select.h>
  15. #include <sys/wait.h>
  16. #endif
  17. #include <unistd.h>
  18. #include "libcork/core.h"
  19. #include "libcork/ds.h"
  20. #include "libcork/os/subprocess.h"
  21. #include "libcork/threads/basics.h"
  22. #include "libcork/helpers/errors.h"
  23. #include "libcork/helpers/posix.h"
  /* Debug tracing is disabled unless the build defines CORK_DEBUG_SUBPROCESS
   * to a non-zero value. */
  #ifndef CORK_DEBUG_SUBPROCESS
  #define CORK_DEBUG_SUBPROCESS 0
  #endif

  #if CORK_DEBUG_SUBPROCESS
  #include <stdio.h>
  /* Emit a printf-style trace message on stderr. */
  #define DEBUG(...) fprintf(stderr, __VA_ARGS__)
  #else
  /* Tracing disabled: the macro expands to nothing, so its arguments are
   * never evaluated. */
  #define DEBUG(...) /* no debug messages */
  #endif
  33. /*-----------------------------------------------------------------------
  34. * Subprocess groups
  35. */
  /* Size in bytes of the per-subprocess scratch buffer, and the maximum number
   * of bytes requested by a single read() from a child's output pipe. */
  #define BUF_SIZE 4096
  37. struct cork_subprocess_group {
  38. cork_array(struct cork_subprocess *) subprocesses;
  39. };
  40. struct cork_subprocess_group *
  41. cork_subprocess_group_new(void)
  42. {
  43. struct cork_subprocess_group *group =
  44. cork_new(struct cork_subprocess_group);
  45. cork_pointer_array_init
  46. (&group->subprocesses, (cork_free_f) cork_subprocess_free);
  47. return group;
  48. }
  49. void
  50. cork_subprocess_group_free(struct cork_subprocess_group *group)
  51. {
  52. cork_array_done(&group->subprocesses);
  53. cork_delete(struct cork_subprocess_group, group);
  54. }
  55. void
  56. cork_subprocess_group_add(struct cork_subprocess_group *group,
  57. struct cork_subprocess *sub)
  58. {
  59. cork_array_append(&group->subprocesses, sub);
  60. }
  61. /*-----------------------------------------------------------------------
  62. * Pipes (parent reads)
  63. */
  /* A pipe whose read end is consumed by the parent process (used for a child's
   * stdout and stderr). */
  struct cork_read_pipe {
      /* Receives the bytes read from the pipe.  When NULL, no pipe is opened
       * for this stream. */
      struct cork_stream_consumer  *consumer;
      /* fds[0] is the read end and fds[1] the write end; -1 marks an end that
       * is not open. */
      int  fds[2];
      /* True until the first chunk of data has been handed to the consumer. */
      bool  first;
  };
  69. static void
  70. cork_read_pipe_init(struct cork_read_pipe *p, struct cork_stream_consumer *consumer)
  71. {
  72. p->consumer = consumer;
  73. p->fds[0] = -1;
  74. p->fds[1] = -1;
  75. }
  76. static int
  77. cork_read_pipe_close_read(struct cork_read_pipe *p)
  78. {
  79. if (p->fds[0] != -1) {
  80. DEBUG("Closing read pipe %d\n", p->fds[0]);
  81. rii_check_posix(close(p->fds[0]));
  82. p->fds[0] = -1;
  83. }
  84. return 0;
  85. }
  86. static int
  87. cork_read_pipe_close_write(struct cork_read_pipe *p)
  88. {
  89. if (p->fds[1] != -1) {
  90. DEBUG("Closing write pipe %d\n", p->fds[1]);
  91. rii_check_posix(close(p->fds[1]));
  92. p->fds[1] = -1;
  93. }
  94. return 0;
  95. }
  /* Close both ends of the pipe.  This is best-effort cleanup: the result of
   * each close is deliberately ignored. */
  static void
  cork_read_pipe_close(struct cork_read_pipe *p)
  {
      (void) cork_read_pipe_close_read(p);
      (void) cork_read_pipe_close_write(p);
  }
  /* Finalize a read pipe.  The only resources it owns are its descriptors, so
   * this just closes whichever ends are still open. */
  static void
  cork_read_pipe_done(struct cork_read_pipe *p)
  {
      cork_read_pipe_close(p);
  }
  107. static int
  108. cork_read_pipe_open(struct cork_read_pipe *p)
  109. {
  110. if (p->consumer != NULL) {
  111. int flags;
  112. /* We want the read end of the pipe to be non-blocking. */
  113. DEBUG("[read] Opening pipe\n");
  114. rii_check_posix(pipe(p->fds));
  115. DEBUG("[read] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
  116. DEBUG("[read] Setting non-blocking flag on read pipe\n");
  117. ei_check_posix(flags = fcntl(p->fds[0], F_GETFD));
  118. flags |= O_NONBLOCK;
  119. ei_check_posix(fcntl(p->fds[0], F_SETFD, flags));
  120. }
  121. p->first = true;
  122. return 0;
  123. error:
  124. cork_read_pipe_close(p);
  125. return -1;
  126. }
  127. static int
  128. cork_read_pipe_dup(struct cork_read_pipe *p, int fd)
  129. {
  130. if (p->fds[1] != -1) {
  131. rii_check_posix(dup2(p->fds[1], fd));
  132. }
  133. return 0;
  134. }
  135. static int
  136. cork_read_pipe_read(struct cork_read_pipe *p, char *buf, bool *progress)
  137. {
  138. if (p->fds[0] == -1) {
  139. return 0;
  140. }
  141. do {
  142. DEBUG("[read] Reading from pipe %d\n", p->fds[0]);
  143. ssize_t bytes_read = read(p->fds[0], buf, BUF_SIZE);
  144. if (bytes_read == -1) {
  145. if (errno == EAGAIN) {
  146. /* We've exhausted all of the data currently available. */
  147. DEBUG("[read] No more bytes without blocking\n");
  148. return 0;
  149. } else if (errno == EINTR) {
  150. /* Interrupted by a signal; return so that our wait loop can
  151. * catch that. */
  152. DEBUG("[read] Interrupted by signal\n");
  153. return 0;
  154. } else {
  155. /* An actual error */
  156. cork_system_error_set();
  157. DEBUG("[read] Error: %s\n", cork_error_message());
  158. return -1;
  159. }
  160. } else if (bytes_read == 0) {
  161. DEBUG("[read] End of stream\n");
  162. *progress = true;
  163. rii_check(cork_stream_consumer_eof(p->consumer));
  164. rii_check_posix(close(p->fds[0]));
  165. p->fds[0] = -1;
  166. return 0;
  167. } else {
  168. DEBUG("[read] Got %zd bytes\n", bytes_read);
  169. *progress = true;
  170. rii_check(cork_stream_consumer_data
  171. (p->consumer, buf, bytes_read, p->first));
  172. p->first = false;
  173. }
  174. } while (true);
  175. }
  176. static bool
  177. cork_read_pipe_is_finished(struct cork_read_pipe *p)
  178. {
  179. return p->fds[0] == -1;
  180. }
  181. /*-----------------------------------------------------------------------
  182. * Pipes (parent writes)
  183. */
  184. struct cork_write_pipe {
  185. struct cork_stream_consumer consumer;
  186. int fds[2];
  187. };
  188. static int
  189. cork_write_pipe_close_read(struct cork_write_pipe *p)
  190. {
  191. if (p->fds[0] != -1) {
  192. DEBUG("[write] Closing read pipe %d\n", p->fds[0]);
  193. rii_check_posix(close(p->fds[0]));
  194. p->fds[0] = -1;
  195. }
  196. return 0;
  197. }
  198. static int
  199. cork_write_pipe_close_write(struct cork_write_pipe *p)
  200. {
  201. if (p->fds[1] != -1) {
  202. DEBUG("[write] Closing write pipe %d\n", p->fds[1]);
  203. rii_check_posix(close(p->fds[1]));
  204. p->fds[1] = -1;
  205. }
  206. return 0;
  207. }
  208. static int
  209. cork_write_pipe__data(struct cork_stream_consumer *consumer,
  210. const void *buf, size_t size, bool is_first_chunk)
  211. {
  212. struct cork_write_pipe *p =
  213. cork_container_of(consumer, struct cork_write_pipe, consumer);
  214. rii_check_posix(write(p->fds[1], buf, size));
  215. return 0;
  216. }
  217. static int
  218. cork_write_pipe__eof(struct cork_stream_consumer *consumer)
  219. {
  220. struct cork_write_pipe *p =
  221. cork_container_of(consumer, struct cork_write_pipe, consumer);
  222. return cork_write_pipe_close_write(p);
  223. }
  /* Stream-consumer `free` callback for the stdin pipe.  Intentionally empty:
   * the consumer is embedded in its cork_write_pipe, so there is nothing to
   * release separately. */
  static void
  cork_write_pipe__free(struct cork_stream_consumer *consumer)
  {
      (void) consumer;
  }
  228. static void
  229. cork_write_pipe_init(struct cork_write_pipe *p)
  230. {
  231. p->consumer.data = cork_write_pipe__data;
  232. p->consumer.eof = cork_write_pipe__eof;
  233. p->consumer.free = cork_write_pipe__free;
  234. p->fds[0] = -1;
  235. p->fds[1] = -1;
  236. }
  /* Close both ends of the stdin pipe.  This is best-effort cleanup: the result
   * of each close is deliberately ignored. */
  static void
  cork_write_pipe_close(struct cork_write_pipe *p)
  {
      (void) cork_write_pipe_close_read(p);
      (void) cork_write_pipe_close_write(p);
  }
  /* Finalize a write pipe.  The only resources it owns are its descriptors, so
   * this just closes whichever ends are still open. */
  static void
  cork_write_pipe_done(struct cork_write_pipe *p)
  {
      cork_write_pipe_close(p);
  }
  248. static int
  249. cork_write_pipe_open(struct cork_write_pipe *p)
  250. {
  251. DEBUG("[write] Opening writer pipe\n");
  252. rii_check_posix(pipe(p->fds));
  253. DEBUG("[write] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
  254. return 0;
  255. }
  256. static int
  257. cork_write_pipe_dup(struct cork_write_pipe *p, int fd)
  258. {
  259. if (p->fds[0] != -1) {
  260. rii_check_posix(dup2(p->fds[0], fd));
  261. }
  262. return 0;
  263. }
  264. /*-----------------------------------------------------------------------
  265. * Subprocesses
  266. */
  267. struct cork_subprocess {
  268. pid_t pid;
  269. struct cork_write_pipe stdin_pipe;
  270. struct cork_read_pipe stdout_pipe;
  271. struct cork_read_pipe stderr_pipe;
  272. void *user_data;
  273. cork_free_f free_user_data;
  274. cork_run_f run;
  275. int *exit_code;
  276. char buf[BUF_SIZE];
  277. };
  278. struct cork_subprocess *
  279. cork_subprocess_new(void *user_data, cork_free_f free_user_data,
  280. cork_run_f run,
  281. struct cork_stream_consumer *stdout_consumer,
  282. struct cork_stream_consumer *stderr_consumer,
  283. int *exit_code)
  284. {
  285. struct cork_subprocess *self = cork_new(struct cork_subprocess);
  286. cork_write_pipe_init(&self->stdin_pipe);
  287. cork_read_pipe_init(&self->stdout_pipe, stdout_consumer);
  288. cork_read_pipe_init(&self->stderr_pipe, stderr_consumer);
  289. self->pid = 0;
  290. self->user_data = user_data;
  291. self->free_user_data = free_user_data;
  292. self->run = run;
  293. self->exit_code = exit_code;
  294. return self;
  295. }
  296. void
  297. cork_subprocess_free(struct cork_subprocess *self)
  298. {
  299. cork_free_user_data(self);
  300. cork_write_pipe_done(&self->stdin_pipe);
  301. cork_read_pipe_done(&self->stdout_pipe);
  302. cork_read_pipe_done(&self->stderr_pipe);
  303. cork_delete(struct cork_subprocess, self);
  304. }
  305. struct cork_stream_consumer *
  306. cork_subprocess_stdin(struct cork_subprocess *self)
  307. {
  308. return &self->stdin_pipe.consumer;
  309. }
  310. /*-----------------------------------------------------------------------
  311. * Executing another program
  312. */
  /* cork_run_f adapter: interpret the opaque user data as a cork_exec and run
   * it, returning whatever cork_exec_run returns. */
  static int
  cork_exec__run(void *vself)
  {
      struct cork_exec  *program = vself;
      int  rc = cork_exec_run(program);
      return rc;
  }
  /* cork_free_f adapter: interpret the opaque user data as a cork_exec and
   * free it. */
  static void
  cork_exec__free(void *vself)
  {
      struct cork_exec  *program = vself;
      cork_exec_free(program);
  }
  325. struct cork_subprocess *
  326. cork_subprocess_new_exec(struct cork_exec *exec,
  327. struct cork_stream_consumer *out,
  328. struct cork_stream_consumer *err,
  329. int *exit_code)
  330. {
  331. return cork_subprocess_new
  332. (exec, cork_exec__free,
  333. cork_exec__run,
  334. out, err, exit_code);
  335. }
  336. /*-----------------------------------------------------------------------
  337. * Running subprocesses
  338. */
  339. int
  340. cork_subprocess_start(struct cork_subprocess *self)
  341. {
  342. pid_t pid;
  343. /* Create the stdout and stderr pipes. */
  344. if (cork_write_pipe_open(&self->stdin_pipe) == -1) {
  345. return -1;
  346. }
  347. if (cork_read_pipe_open(&self->stdout_pipe) == -1) {
  348. cork_write_pipe_close(&self->stdin_pipe);
  349. return -1;
  350. }
  351. if (cork_read_pipe_open(&self->stderr_pipe) == -1) {
  352. cork_write_pipe_close(&self->stdin_pipe);
  353. cork_read_pipe_close(&self->stdout_pipe);
  354. return -1;
  355. }
  356. /* Fork the child process. */
  357. DEBUG("Forking child process\n");
  358. pid = fork();
  359. if (pid == 0) {
  360. /* Child process */
  361. int rc;
  362. /* Close the parent's end of the pipes */
  363. DEBUG("[child] ");
  364. cork_write_pipe_close_write(&self->stdin_pipe);
  365. DEBUG("[child] ");
  366. cork_read_pipe_close_read(&self->stdout_pipe);
  367. DEBUG("[child] ");
  368. cork_read_pipe_close_read(&self->stderr_pipe);
  369. /* Bind the stdout and stderr pipes */
  370. if (cork_write_pipe_dup(&self->stdin_pipe, STDIN_FILENO) == -1) {
  371. _exit(EXIT_FAILURE);
  372. }
  373. if (cork_read_pipe_dup(&self->stdout_pipe, STDOUT_FILENO) == -1) {
  374. _exit(EXIT_FAILURE);
  375. }
  376. if (cork_read_pipe_dup(&self->stderr_pipe, STDERR_FILENO) == -1) {
  377. _exit(EXIT_FAILURE);
  378. }
  379. /* Run the subprocess */
  380. rc = self->run(self->user_data);
  381. if (CORK_LIKELY(rc == 0)) {
  382. _exit(EXIT_SUCCESS);
  383. } else {
  384. fprintf(stderr, "%s\n", cork_error_message());
  385. _exit(EXIT_FAILURE);
  386. }
  387. } else if (pid < 0) {
  388. /* Error forking */
  389. cork_system_error_set();
  390. return -1;
  391. } else {
  392. /* Parent process */
  393. DEBUG(" Child PID=%d\n", (int) pid);
  394. self->pid = pid;
  395. cork_write_pipe_close_read(&self->stdin_pipe);
  396. cork_read_pipe_close_write(&self->stdout_pipe);
  397. cork_read_pipe_close_write(&self->stderr_pipe);
  398. return 0;
  399. }
  400. }
  401. static int
  402. cork_subprocess_reap(struct cork_subprocess *self, int flags, bool *progress)
  403. {
  404. int pid;
  405. int status;
  406. rii_check_posix(pid = waitpid(self->pid, &status, flags));
  407. if (pid == self->pid) {
  408. *progress = true;
  409. self->pid = 0;
  410. if (self->exit_code != NULL) {
  411. *self->exit_code = WEXITSTATUS(status);
  412. }
  413. }
  414. return 0;
  415. }
  416. int
  417. cork_subprocess_abort(struct cork_subprocess *self)
  418. {
  419. if (self->pid > 0) {
  420. CORK_ATTR_UNUSED bool progress;
  421. DEBUG("Terminating child process %d\n", (int) self->pid);
  422. kill(self->pid, SIGTERM);
  423. return cork_subprocess_reap(self, 0, &progress);
  424. } else {
  425. return 0;
  426. }
  427. }
  428. bool
  429. cork_subprocess_is_finished(struct cork_subprocess *self)
  430. {
  431. return (self->pid == 0)
  432. && cork_read_pipe_is_finished(&self->stdout_pipe)
  433. && cork_read_pipe_is_finished(&self->stderr_pipe);
  434. }
  /* Select the platform's "give up the rest of my time slice" call.
   * THREAD_YIELD is invoked with no arguments by cork_subprocess_yield. */
  #if defined(__APPLE__) || defined(__MINGW32__) || defined(__CYGWIN__)
  /* These platforms use the non-portable pthread variant. */
  #include <pthread.h>
  #define THREAD_YIELD   pthread_yield_np
  #elif defined(__linux__) || defined(BSD) || defined(__sun)
  /* Everything else we know about uses the POSIX scheduler call. */
  #include <sched.h>
  #define THREAD_YIELD   sched_yield
  #else
  #error "Unknown thread yield implementation"
  #endif
  /* Back off for a while when a drain pass made no progress.
   *
   * `*spin_count` counts consecutive idle passes and is incremented on every
   * call.  The longer the caller has been idle, the more expensive the wait:
   *
   *    0 ..  9   a single CPU pause
   *   10 .. 19   fifty CPU pauses
   *   20 .. 21   yield the thread
   *   22 .. 23   usleep(0)
   *   24 .. 49   usleep(1)
   *   50 .. 74   usleep of 1 ms, growing by 1 ms per call up to 25 ms
   *   75 ..      usleep of 25 ms
   *
   * Adapted from
   * http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning */
  static void
  cork_subprocess_yield(unsigned int *spin_count)
  {
      const unsigned int  attempt = *spin_count;

      if (attempt >= 75) {
          usleep(25000);
      } else if (attempt >= 50) {
          usleep((attempt - 49) * 1000);
      } else if (attempt >= 24) {
          usleep(1);
      } else if (attempt >= 22) {
          usleep(0);
      } else if (attempt >= 20) {
          THREAD_YIELD();
      } else if (attempt >= 10) {
          /* A more intense spin-wait */
          int  pauses;
          for (pauses = 0; pauses < 50; pauses++) {
              cork_pause();
          }
      } else {
          /* Spin-wait */
          cork_pause();
      }

      *spin_count = attempt + 1;
  }
  471. static int
  472. cork_subprocess_drain_(struct cork_subprocess *self, bool *progress)
  473. {
  474. rii_check(cork_read_pipe_read(&self->stdout_pipe, self->buf, progress));
  475. rii_check(cork_read_pipe_read(&self->stderr_pipe, self->buf, progress));
  476. if (self->pid > 0) {
  477. return cork_subprocess_reap(self, WNOHANG, progress);
  478. } else {
  479. return 0;
  480. }
  481. }
  /* Perform one non-blocking service pass over the subprocess (read pending
   * output, poll for termination).
   *
   * Returns true if the pass made any progress and false otherwise.  Errors
   * from the underlying pass are not reported through this function. */
  bool
  cork_subprocess_drain(struct cork_subprocess *self)
  {
      /* Must start out false: the pass only ever sets the flag to true, so an
       * uninitialized flag would be returned as-is when nothing happened. */
      bool  progress = false;
      cork_subprocess_drain_(self, &progress);
      return progress;
  }
  /* Service the subprocess until it is finished: the child has been reaped and
   * both output pipes have reached end-of-stream.  Idle passes back off
   * progressively via cork_subprocess_yield.
   *
   * Returns 0 once finished; a failing service pass makes rii_check return
   * early. */
  int
  cork_subprocess_wait(struct cork_subprocess *self)
  {
      unsigned int  idle_passes = 0;

      while (!cork_subprocess_is_finished(self)) {
          bool  progress = false;
          rii_check(cork_subprocess_drain_(self, &progress));
          if (progress) {
              continue;
          }
          cork_subprocess_yield(&idle_passes);
      }

      return 0;
  }
  503. /*-----------------------------------------------------------------------
  504. * Running subprocess groups
  505. */
  506. static int
  507. cork_subprocess_group_terminate(struct cork_subprocess_group *group)
  508. {
  509. size_t i;
  510. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  511. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  512. rii_check(cork_subprocess_abort(sub));
  513. }
  514. return 0;
  515. }
  516. int
  517. cork_subprocess_group_start(struct cork_subprocess_group *group)
  518. {
  519. size_t i;
  520. DEBUG("Starting subprocess group\n");
  521. /* Start each subprocess. */
  522. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  523. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  524. ei_check(cork_subprocess_start(sub));
  525. }
  526. return 0;
  527. error:
  528. cork_subprocess_group_terminate(group);
  529. return -1;
  530. }
  /* Abort every subprocess in the group.  Returns the result of
   * cork_subprocess_group_terminate. */
  int
  cork_subprocess_group_abort(struct cork_subprocess_group *group)
  {
      int  rc;

      DEBUG("Aborting subprocess group\n");
      rc = cork_subprocess_group_terminate(group);
      return rc;
  }
  537. bool
  538. cork_subprocess_group_is_finished(struct cork_subprocess_group *group)
  539. {
  540. size_t i;
  541. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  542. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  543. bool sub_finished = cork_subprocess_is_finished(sub);
  544. if (!sub_finished) {
  545. return false;
  546. }
  547. }
  548. return true;
  549. }
  550. static int
  551. cork_subprocess_group_drain_(struct cork_subprocess_group *group,
  552. bool *progress)
  553. {
  554. size_t i;
  555. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  556. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  557. rii_check(cork_subprocess_drain_(sub, progress));
  558. }
  559. return 0;
  560. }
  /* Run one non-blocking service pass over the whole group.
   *
   * Returns true if any member made progress and false otherwise.  Errors from
   * the underlying pass are not reported through this function. */
  bool
  cork_subprocess_group_drain(struct cork_subprocess_group *group)
  {
      bool  any_progress = false;
      (void) cork_subprocess_group_drain_(group, &any_progress);
      return any_progress;
  }
  /* Service the group until every subprocess in it is finished.  Idle passes
   * back off progressively via cork_subprocess_yield.
   *
   * Returns 0 once the whole group is finished; a failing service pass makes
   * rii_check return early. */
  int
  cork_subprocess_group_wait(struct cork_subprocess_group *group)
  {
      unsigned int  idle_passes = 0;

      DEBUG("Waiting for subprocess group to finish\n");
      while (!cork_subprocess_group_is_finished(group)) {
          bool  progress = false;
          rii_check(cork_subprocess_group_drain_(group, &progress));
          if (progress) {
              continue;
          }
          cork_subprocess_yield(&idle_passes);
      }

      return 0;
  }