676 lines
17 KiB

11 years ago
10 years ago
11 years ago
10 years ago
11 years ago
11 years ago
11 years ago
  1. /* -*- coding: utf-8 -*-
  2. * ----------------------------------------------------------------------
  3. * Copyright © 2012-2013, RedJack, LLC.
  4. * All rights reserved.
  5. *
  6. * Please see the COPYING file in this distribution for license
  7. * details.
  8. * ----------------------------------------------------------------------
  9. */
  10. #include <assert.h>
  11. #include <errno.h>
  12. #include <fcntl.h>
  13. #include <signal.h>
  14. #ifndef __MINGW32__
  15. #include <sys/select.h>
  16. #include <sys/wait.h>
  17. #endif
  18. #include <unistd.h>
  19. #include "libcork/core.h"
  20. #include "libcork/ds.h"
  21. #include "libcork/os/subprocess.h"
  22. #include "libcork/threads/basics.h"
  23. #include "libcork/helpers/errors.h"
  24. #include "libcork/helpers/posix.h"
  25. #if !defined(CORK_DEBUG_SUBPROCESS)
  26. #define CORK_DEBUG_SUBPROCESS 0
  27. #endif
  28. #if CORK_DEBUG_SUBPROCESS
  29. #include <stdio.h>
  30. #define DEBUG(...) fprintf(stderr, __VA_ARGS__)
  31. #else
  32. #define DEBUG(...) /* no debug messages */
  33. #endif
  34. /*-----------------------------------------------------------------------
  35. * Subprocess groups
  36. */
  37. #define BUF_SIZE 4096
  38. struct cork_subprocess_group {
  39. cork_array(struct cork_subprocess *) subprocesses;
  40. };
  41. struct cork_subprocess_group *
  42. cork_subprocess_group_new(void)
  43. {
  44. struct cork_subprocess_group *group =
  45. cork_new(struct cork_subprocess_group);
  46. cork_pointer_array_init
  47. (&group->subprocesses, (cork_free_f) cork_subprocess_free);
  48. return group;
  49. }
  50. void
  51. cork_subprocess_group_free(struct cork_subprocess_group *group)
  52. {
  53. cork_array_done(&group->subprocesses);
  54. free(group);
  55. }
  56. void
  57. cork_subprocess_group_add(struct cork_subprocess_group *group,
  58. struct cork_subprocess *sub)
  59. {
  60. cork_array_append(&group->subprocesses, sub);
  61. }
  62. /*-----------------------------------------------------------------------
  63. * Pipes (parent reads)
  64. */
  65. struct cork_read_pipe {
  66. struct cork_stream_consumer *consumer;
  67. int fds[2];
  68. bool first;
  69. };
  70. static void
  71. cork_read_pipe_init(struct cork_read_pipe *p, struct cork_stream_consumer *consumer)
  72. {
  73. p->consumer = consumer;
  74. p->fds[0] = -1;
  75. p->fds[1] = -1;
  76. }
  77. static int
  78. cork_read_pipe_close_read(struct cork_read_pipe *p)
  79. {
  80. if (p->fds[0] != -1) {
  81. DEBUG("Closing read pipe %d\n", p->fds[0]);
  82. rii_check_posix(close(p->fds[0]));
  83. p->fds[0] = -1;
  84. }
  85. return 0;
  86. }
  87. static int
  88. cork_read_pipe_close_write(struct cork_read_pipe *p)
  89. {
  90. if (p->fds[1] != -1) {
  91. DEBUG("Closing write pipe %d\n", p->fds[1]);
  92. rii_check_posix(close(p->fds[1]));
  93. p->fds[1] = -1;
  94. }
  95. return 0;
  96. }
  97. static void
  98. cork_read_pipe_close(struct cork_read_pipe *p)
  99. {
  100. cork_read_pipe_close_read(p);
  101. cork_read_pipe_close_write(p);
  102. }
  103. static void
  104. cork_read_pipe_done(struct cork_read_pipe *p)
  105. {
  106. cork_read_pipe_close(p);
  107. }
  108. static int
  109. cork_read_pipe_open(struct cork_read_pipe *p)
  110. {
  111. if (p->consumer != NULL) {
  112. int flags;
  113. /* We want the read end of the pipe to be non-blocking. */
  114. DEBUG("[read] Opening pipe\n");
  115. rii_check_posix(pipe(p->fds));
  116. DEBUG("[read] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
  117. DEBUG("[read] Setting non-blocking flag on read pipe\n");
  118. ei_check_posix(flags = fcntl(p->fds[0], F_GETFD));
  119. flags |= O_NONBLOCK;
  120. ei_check_posix(fcntl(p->fds[0], F_SETFD, flags));
  121. }
  122. p->first = true;
  123. return 0;
  124. error:
  125. cork_read_pipe_close(p);
  126. return -1;
  127. }
  128. static int
  129. cork_read_pipe_dup(struct cork_read_pipe *p, int fd)
  130. {
  131. if (p->fds[1] != -1) {
  132. rii_check_posix(dup2(p->fds[1], fd));
  133. }
  134. return 0;
  135. }
  136. static int
  137. cork_read_pipe_read(struct cork_read_pipe *p, char *buf, bool *progress)
  138. {
  139. if (p->fds[0] == -1) {
  140. return 0;
  141. }
  142. do {
  143. DEBUG("[read] Reading from pipe %d\n", p->fds[0]);
  144. ssize_t bytes_read = read(p->fds[0], buf, BUF_SIZE);
  145. if (bytes_read == -1) {
  146. if (errno == EAGAIN) {
  147. /* We've exhausted all of the data currently available. */
  148. DEBUG("[read] No more bytes without blocking\n");
  149. return 0;
  150. } else if (errno == EINTR) {
  151. /* Interrupted by a signal; return so that our wait loop can
  152. * catch that. */
  153. DEBUG("[read] Interrupted by signal\n");
  154. return 0;
  155. } else {
  156. /* An actual error */
  157. cork_system_error_set();
  158. DEBUG("[read] Error: %s\n", cork_error_message());
  159. return -1;
  160. }
  161. } else if (bytes_read == 0) {
  162. DEBUG("[read] End of stream\n");
  163. *progress = true;
  164. rii_check(cork_stream_consumer_eof(p->consumer));
  165. rii_check_posix(close(p->fds[0]));
  166. p->fds[0] = -1;
  167. return 0;
  168. } else {
  169. DEBUG("[read] Got %zd bytes\n", bytes_read);
  170. *progress = true;
  171. rii_check(cork_stream_consumer_data
  172. (p->consumer, buf, bytes_read, p->first));
  173. p->first = false;
  174. }
  175. } while (true);
  176. }
  177. static bool
  178. cork_read_pipe_is_finished(struct cork_read_pipe *p)
  179. {
  180. return p->fds[0] == -1;
  181. }
  182. /*-----------------------------------------------------------------------
  183. * Pipes (parent writes)
  184. */
  185. struct cork_write_pipe {
  186. struct cork_stream_consumer consumer;
  187. int fds[2];
  188. };
  189. static int
  190. cork_write_pipe_close_read(struct cork_write_pipe *p)
  191. {
  192. if (p->fds[0] != -1) {
  193. DEBUG("[write] Closing read pipe %d\n", p->fds[0]);
  194. rii_check_posix(close(p->fds[0]));
  195. p->fds[0] = -1;
  196. }
  197. return 0;
  198. }
  199. static int
  200. cork_write_pipe_close_write(struct cork_write_pipe *p)
  201. {
  202. if (p->fds[1] != -1) {
  203. DEBUG("[write] Closing write pipe %d\n", p->fds[1]);
  204. rii_check_posix(close(p->fds[1]));
  205. p->fds[1] = -1;
  206. }
  207. return 0;
  208. }
  209. static int
  210. cork_write_pipe__data(struct cork_stream_consumer *consumer,
  211. const void *buf, size_t size, bool is_first_chunk)
  212. {
  213. struct cork_write_pipe *p =
  214. cork_container_of(consumer, struct cork_write_pipe, consumer);
  215. rii_check_posix(write(p->fds[1], buf, size));
  216. return 0;
  217. }
  218. static int
  219. cork_write_pipe__eof(struct cork_stream_consumer *consumer)
  220. {
  221. struct cork_write_pipe *p =
  222. cork_container_of(consumer, struct cork_write_pipe, consumer);
  223. return cork_write_pipe_close_write(p);
  224. }
  225. static void
  226. cork_write_pipe__free(struct cork_stream_consumer *consumer)
  227. {
  228. }
  229. static void
  230. cork_write_pipe_init(struct cork_write_pipe *p)
  231. {
  232. p->consumer.data = cork_write_pipe__data;
  233. p->consumer.eof = cork_write_pipe__eof;
  234. p->consumer.free = cork_write_pipe__free;
  235. p->fds[0] = -1;
  236. p->fds[1] = -1;
  237. }
  238. static void
  239. cork_write_pipe_close(struct cork_write_pipe *p)
  240. {
  241. cork_write_pipe_close_read(p);
  242. cork_write_pipe_close_write(p);
  243. }
  244. static void
  245. cork_write_pipe_done(struct cork_write_pipe *p)
  246. {
  247. cork_write_pipe_close(p);
  248. }
  249. static int
  250. cork_write_pipe_open(struct cork_write_pipe *p)
  251. {
  252. DEBUG("[write] Opening writer pipe\n");
  253. rii_check_posix(pipe(p->fds));
  254. DEBUG("[write] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
  255. return 0;
  256. }
  257. static int
  258. cork_write_pipe_dup(struct cork_write_pipe *p, int fd)
  259. {
  260. if (p->fds[0] != -1) {
  261. rii_check_posix(dup2(p->fds[0], fd));
  262. }
  263. return 0;
  264. }
  265. /*-----------------------------------------------------------------------
  266. * Subprocesses
  267. */
  268. struct cork_subprocess {
  269. pid_t pid;
  270. struct cork_write_pipe stdin_pipe;
  271. struct cork_read_pipe stdout_pipe;
  272. struct cork_read_pipe stderr_pipe;
  273. struct cork_thread_body *body;
  274. int *exit_code;
  275. char buf[BUF_SIZE];
  276. };
  277. struct cork_subprocess *
  278. cork_subprocess_new(struct cork_thread_body *body,
  279. struct cork_stream_consumer *stdout_consumer,
  280. struct cork_stream_consumer *stderr_consumer,
  281. int *exit_code)
  282. {
  283. struct cork_subprocess *self = cork_new(struct cork_subprocess);
  284. cork_write_pipe_init(&self->stdin_pipe);
  285. cork_read_pipe_init(&self->stdout_pipe, stdout_consumer);
  286. cork_read_pipe_init(&self->stderr_pipe, stderr_consumer);
  287. self->pid = 0;
  288. self->body = body;
  289. self->exit_code = exit_code;
  290. return self;
  291. }
  292. void
  293. cork_subprocess_free(struct cork_subprocess *self)
  294. {
  295. cork_thread_body_free(self->body);
  296. cork_write_pipe_done(&self->stdin_pipe);
  297. cork_read_pipe_done(&self->stdout_pipe);
  298. cork_read_pipe_done(&self->stderr_pipe);
  299. free(self);
  300. }
  301. struct cork_stream_consumer *
  302. cork_subprocess_stdin(struct cork_subprocess *self)
  303. {
  304. return &self->stdin_pipe.consumer;
  305. }
  306. /*-----------------------------------------------------------------------
  307. * Executing another program
  308. */
  309. struct cork_exec_body {
  310. struct cork_thread_body parent;
  311. struct cork_exec *exec;
  312. };
  313. static int
  314. cork_exec__run(struct cork_thread_body *vself)
  315. {
  316. struct cork_exec_body *self =
  317. cork_container_of(vself, struct cork_exec_body, parent);
  318. return cork_exec_run(self->exec);
  319. }
  320. static void
  321. cork_exec__free(struct cork_thread_body *vself)
  322. {
  323. struct cork_exec_body *self =
  324. cork_container_of(vself, struct cork_exec_body, parent);
  325. cork_exec_free(self->exec);
  326. free(self);
  327. }
  328. static struct cork_thread_body *
  329. cork_exec_body_new(struct cork_exec *exec)
  330. {
  331. struct cork_exec_body *self = cork_new(struct cork_exec_body);
  332. self->parent.run = cork_exec__run;
  333. self->parent.free = cork_exec__free;
  334. self->exec = exec;
  335. return &self->parent;
  336. }
  337. struct cork_subprocess *
  338. cork_subprocess_new_exec(struct cork_exec *exec,
  339. struct cork_stream_consumer *out,
  340. struct cork_stream_consumer *err,
  341. int *exit_code)
  342. {
  343. struct cork_thread_body *body = cork_exec_body_new(exec);
  344. return cork_subprocess_new(body, out, err, exit_code);
  345. }
  346. /*-----------------------------------------------------------------------
  347. * Running subprocesses
  348. */
  349. int
  350. cork_subprocess_start(struct cork_subprocess *self)
  351. {
  352. pid_t pid;
  353. /* Create the stdout and stderr pipes. */
  354. if (cork_write_pipe_open(&self->stdin_pipe) == -1) {
  355. return -1;
  356. }
  357. if (cork_read_pipe_open(&self->stdout_pipe) == -1) {
  358. cork_write_pipe_close(&self->stdin_pipe);
  359. return -1;
  360. }
  361. if (cork_read_pipe_open(&self->stderr_pipe) == -1) {
  362. cork_write_pipe_close(&self->stdin_pipe);
  363. cork_read_pipe_close(&self->stdout_pipe);
  364. return -1;
  365. }
  366. /* Fork the child process. */
  367. DEBUG("Forking child process\n");
  368. pid = fork();
  369. if (pid == 0) {
  370. /* Child process */
  371. int rc;
  372. /* Close the parent's end of the pipes */
  373. DEBUG("[child] ");
  374. cork_write_pipe_close_write(&self->stdin_pipe);
  375. DEBUG("[child] ");
  376. cork_read_pipe_close_read(&self->stdout_pipe);
  377. DEBUG("[child] ");
  378. cork_read_pipe_close_read(&self->stderr_pipe);
  379. /* Bind the stdout and stderr pipes */
  380. if (cork_write_pipe_dup(&self->stdin_pipe, STDIN_FILENO) == -1) {
  381. _exit(EXIT_FAILURE);
  382. }
  383. if (cork_read_pipe_dup(&self->stdout_pipe, STDOUT_FILENO) == -1) {
  384. _exit(EXIT_FAILURE);
  385. }
  386. if (cork_read_pipe_dup(&self->stderr_pipe, STDERR_FILENO) == -1) {
  387. _exit(EXIT_FAILURE);
  388. }
  389. /* Run the subprocess's body */
  390. rc = cork_thread_body_run(self->body);
  391. if (CORK_LIKELY(rc == 0)) {
  392. _exit(EXIT_SUCCESS);
  393. } else {
  394. fprintf(stderr, "%s\n", cork_error_message());
  395. _exit(EXIT_FAILURE);
  396. }
  397. } else if (pid < 0) {
  398. /* Error forking */
  399. cork_system_error_set();
  400. return -1;
  401. } else {
  402. /* Parent process */
  403. DEBUG(" Child PID=%d\n", (int) pid);
  404. self->pid = pid;
  405. cork_write_pipe_close_read(&self->stdin_pipe);
  406. cork_read_pipe_close_write(&self->stdout_pipe);
  407. cork_read_pipe_close_write(&self->stderr_pipe);
  408. return 0;
  409. }
  410. }
  411. static int
  412. cork_subprocess_reap(struct cork_subprocess *self, int flags, bool *progress)
  413. {
  414. int pid;
  415. int status;
  416. rii_check_posix(pid = waitpid(self->pid, &status, flags));
  417. if (pid == self->pid) {
  418. *progress = true;
  419. self->pid = 0;
  420. if (self->exit_code != NULL) {
  421. *self->exit_code = WEXITSTATUS(status);
  422. }
  423. }
  424. return 0;
  425. }
  426. int
  427. cork_subprocess_abort(struct cork_subprocess *self)
  428. {
  429. if (self->pid > 0) {
  430. CORK_ATTR_UNUSED bool progress;
  431. DEBUG("Terminating child process %d\n", (int) self->pid);
  432. kill(self->pid, SIGTERM);
  433. return cork_subprocess_reap(self, 0, &progress);
  434. } else {
  435. return 0;
  436. }
  437. }
  438. bool
  439. cork_subprocess_is_finished(struct cork_subprocess *self)
  440. {
  441. return (self->pid == 0)
  442. && cork_read_pipe_is_finished(&self->stdout_pipe)
  443. && cork_read_pipe_is_finished(&self->stderr_pipe);
  444. }
  445. #if defined(__APPLE__) || defined(__MINGW32__) || defined(__CYGWIN__)
  446. #include <pthread.h>
  447. #define THREAD_YIELD pthread_yield_np
  448. #elif defined(__linux__) || defined(BSD) || defined(__sun)
  449. #include <sched.h>
  450. #define THREAD_YIELD sched_yield
  451. #else
  452. #error "Unknown thread yield implementation"
  453. #endif
  454. static void
  455. cork_subprocess_yield(unsigned int *spin_count)
  456. {
  457. /* Adapted from
  458. * http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning */
  459. if (*spin_count < 10) {
  460. /* Spin-wait */
  461. cork_pause();
  462. } else if (*spin_count < 20) {
  463. /* A more intense spin-wait */
  464. int i;
  465. for (i = 0; i < 50; i++) {
  466. cork_pause();
  467. }
  468. } else if (*spin_count < 22) {
  469. THREAD_YIELD();
  470. } else if (*spin_count < 24) {
  471. usleep(0);
  472. } else if (*spin_count < 50) {
  473. usleep(1);
  474. } else if (*spin_count < 75) {
  475. usleep((*spin_count - 49) * 1000);
  476. } else {
  477. usleep(25000);
  478. }
  479. (*spin_count)++;
  480. }
  481. static int
  482. cork_subprocess_drain_(struct cork_subprocess *self, bool *progress)
  483. {
  484. rii_check(cork_read_pipe_read(&self->stdout_pipe, self->buf, progress));
  485. rii_check(cork_read_pipe_read(&self->stderr_pipe, self->buf, progress));
  486. if (self->pid > 0) {
  487. return cork_subprocess_reap(self, WNOHANG, progress);
  488. } else {
  489. return 0;
  490. }
  491. }
  492. bool
  493. cork_subprocess_drain(struct cork_subprocess *self)
  494. {
  495. bool progress;
  496. cork_subprocess_drain_(self, &progress);
  497. return progress;
  498. }
  499. int
  500. cork_subprocess_wait(struct cork_subprocess *self)
  501. {
  502. unsigned int spin_count = 0;
  503. bool progress;
  504. while (!cork_subprocess_is_finished(self)) {
  505. progress = false;
  506. rii_check(cork_subprocess_drain_(self, &progress));
  507. if (!progress) {
  508. cork_subprocess_yield(&spin_count);
  509. }
  510. }
  511. return 0;
  512. }
  513. /*-----------------------------------------------------------------------
  514. * Running subprocess groups
  515. */
  516. static int
  517. cork_subprocess_group_terminate(struct cork_subprocess_group *group)
  518. {
  519. size_t i;
  520. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  521. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  522. rii_check(cork_subprocess_abort(sub));
  523. }
  524. return 0;
  525. }
  526. int
  527. cork_subprocess_group_start(struct cork_subprocess_group *group)
  528. {
  529. size_t i;
  530. DEBUG("Starting subprocess group\n");
  531. /* Start each subprocess. */
  532. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  533. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  534. ei_check(cork_subprocess_start(sub));
  535. }
  536. return 0;
  537. error:
  538. cork_subprocess_group_terminate(group);
  539. return -1;
  540. }
  541. int
  542. cork_subprocess_group_abort(struct cork_subprocess_group *group)
  543. {
  544. DEBUG("Aborting subprocess group\n");
  545. return cork_subprocess_group_terminate(group);
  546. }
  547. bool
  548. cork_subprocess_group_is_finished(struct cork_subprocess_group *group)
  549. {
  550. size_t i;
  551. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  552. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  553. bool sub_finished = cork_subprocess_is_finished(sub);
  554. if (!sub_finished) {
  555. return false;
  556. }
  557. }
  558. return true;
  559. }
  560. static int
  561. cork_subprocess_group_drain_(struct cork_subprocess_group *group,
  562. bool *progress)
  563. {
  564. size_t i;
  565. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  566. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  567. rii_check(cork_subprocess_drain_(sub, progress));
  568. }
  569. return 0;
  570. }
  571. bool
  572. cork_subprocess_group_drain(struct cork_subprocess_group *group)
  573. {
  574. bool progress = false;
  575. cork_subprocess_group_drain_(group, &progress);
  576. return progress;
  577. }
  578. int
  579. cork_subprocess_group_wait(struct cork_subprocess_group *group)
  580. {
  581. unsigned int spin_count = 0;
  582. bool progress;
  583. DEBUG("Waiting for subprocess group to finish\n");
  584. while (!cork_subprocess_group_is_finished(group)) {
  585. progress = false;
  586. rii_check(cork_subprocess_group_drain_(group, &progress));
  587. if (!progress) {
  588. cork_subprocess_yield(&spin_count);
  589. }
  590. }
  591. return 0;
  592. }