You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

676 lines
17 KiB

10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
10 years ago
  1. /* -*- coding: utf-8 -*-
  2. * ----------------------------------------------------------------------
  3. * Copyright © 2012-2013, RedJack, LLC.
  4. * All rights reserved.
  5. *
  6. * Please see the COPYING file in this distribution for license
  7. * details.
  8. * ----------------------------------------------------------------------
  9. */
  10. #include <assert.h>
  11. #include <errno.h>
  12. #include <fcntl.h>
  13. #include <signal.h>
  14. #ifndef __MINGW32__
  15. #include <sys/select.h>
  16. #include <sys/wait.h>
  17. #endif
  18. #include <unistd.h>
  19. #include "libcork/core.h"
  20. #include "libcork/ds.h"
  21. #include "libcork/os/subprocess.h"
  22. #include "libcork/threads/basics.h"
  23. #include "libcork/helpers/errors.h"
  24. #include "libcork/helpers/posix.h"
  /* Debug tracing.  Build with CORK_DEBUG_SUBPROCESS defined to a non-zero
   * value to have DEBUG(...) print to stderr; by default DEBUG(...) expands to
   * nothing. */
  #ifndef CORK_DEBUG_SUBPROCESS
  #define CORK_DEBUG_SUBPROCESS  0
  #endif

  #if CORK_DEBUG_SUBPROCESS
  #include <stdio.h>
  #define DEBUG(...)  fprintf(stderr, __VA_ARGS__)
  #else
  #define DEBUG(...)  /* no debug messages */
  #endif
  34. /*-----------------------------------------------------------------------
  35. * Subprocess groups
  36. */
  /* Size, in bytes, of the per-subprocess scratch buffer and of each read()
   * issued against a child's output pipe. */
  #define BUF_SIZE  4096
  38. struct cork_subprocess_group {
  39. cork_array(struct cork_subprocess *) subprocesses;
  40. };
  41. struct cork_subprocess_group *
  42. cork_subprocess_group_new(void)
  43. {
  44. struct cork_subprocess_group *group =
  45. cork_new(struct cork_subprocess_group);
  46. cork_pointer_array_init
  47. (&group->subprocesses, (cork_free_f) cork_subprocess_free);
  48. return group;
  49. }
  50. void
  51. cork_subprocess_group_free(struct cork_subprocess_group *group)
  52. {
  53. cork_array_done(&group->subprocesses);
  54. free(group);
  55. }
  56. void
  57. cork_subprocess_group_add(struct cork_subprocess_group *group,
  58. struct cork_subprocess *sub)
  59. {
  60. cork_array_append(&group->subprocesses, sub);
  61. }
  62. /*-----------------------------------------------------------------------
  63. * Pipes (parent reads)
  64. */
  /* A pipe whose read end is consumed by the parent: the child is given the
   * write end (see cork_read_pipe_dup) and everything the parent reads is
   * forwarded to `consumer`. */
  struct cork_read_pipe {
      /* Receives the data read from the pipe; when NULL, no pipe is opened. */
      struct cork_stream_consumer  *consumer;
      /* fds[0] is the read end, fds[1] the write end; -1 means "not open". */
      int  fds[2];
      /* True until the first chunk has been handed to the consumer. */
      bool  first;
  };
  70. static void
  71. cork_read_pipe_init(struct cork_read_pipe *p, struct cork_stream_consumer *consumer)
  72. {
  73. p->consumer = consumer;
  74. p->fds[0] = -1;
  75. p->fds[1] = -1;
  76. }
  77. static int
  78. cork_read_pipe_close_read(struct cork_read_pipe *p)
  79. {
  80. if (p->fds[0] != -1) {
  81. DEBUG("Closing read pipe %d\n", p->fds[0]);
  82. rii_check_posix(close(p->fds[0]));
  83. p->fds[0] = -1;
  84. }
  85. return 0;
  86. }
  87. static int
  88. cork_read_pipe_close_write(struct cork_read_pipe *p)
  89. {
  90. if (p->fds[1] != -1) {
  91. DEBUG("Closing write pipe %d\n", p->fds[1]);
  92. rii_check_posix(close(p->fds[1]));
  93. p->fds[1] = -1;
  94. }
  95. return 0;
  96. }
  /* Closes whichever ends of the pipe are still open, read end first.  This is
   * a best-effort cleanup: the results of the individual closes are
   * deliberately not reported to the caller. */
  static void
  cork_read_pipe_close(struct cork_read_pipe *p)
  {
      cork_read_pipe_close_read(p);
      cork_read_pipe_close_write(p);
  }
  /* Finalizes a read pipe.  The only resources it holds are its descriptors,
   * so this just closes any that are still open.  The consumer is not
   * touched. */
  static void
  cork_read_pipe_done(struct cork_read_pipe *p)
  {
      cork_read_pipe_close(p);
  }
  108. static int
  109. cork_read_pipe_open(struct cork_read_pipe *p)
  110. {
  111. if (p->consumer != NULL) {
  112. int flags;
  113. /* We want the read end of the pipe to be non-blocking. */
  114. DEBUG("[read] Opening pipe\n");
  115. rii_check_posix(pipe(p->fds));
  116. DEBUG("[read] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
  117. DEBUG("[read] Setting non-blocking flag on read pipe\n");
  118. ei_check_posix(flags = fcntl(p->fds[0], F_GETFD));
  119. flags |= O_NONBLOCK;
  120. ei_check_posix(fcntl(p->fds[0], F_SETFD, flags));
  121. }
  122. p->first = true;
  123. return 0;
  124. error:
  125. cork_read_pipe_close(p);
  126. return -1;
  127. }
  128. static int
  129. cork_read_pipe_dup(struct cork_read_pipe *p, int fd)
  130. {
  131. if (p->fds[1] != -1) {
  132. rii_check_posix(dup2(p->fds[1], fd));
  133. }
  134. return 0;
  135. }
  136. static int
  137. cork_read_pipe_read(struct cork_read_pipe *p, char *buf, bool *progress)
  138. {
  139. if (p->fds[0] == -1) {
  140. return 0;
  141. }
  142. do {
  143. DEBUG("[read] Reading from pipe %d\n", p->fds[0]);
  144. ssize_t bytes_read = read(p->fds[0], buf, BUF_SIZE);
  145. if (bytes_read == -1) {
  146. if (errno == EAGAIN) {
  147. /* We've exhausted all of the data currently available. */
  148. DEBUG("[read] No more bytes without blocking\n");
  149. return 0;
  150. } else if (errno == EINTR) {
  151. /* Interrupted by a signal; return so that our wait loop can
  152. * catch that. */
  153. DEBUG("[read] Interrupted by signal\n");
  154. return 0;
  155. } else {
  156. /* An actual error */
  157. cork_system_error_set();
  158. DEBUG("[read] Error: %s\n", cork_error_message());
  159. return -1;
  160. }
  161. } else if (bytes_read == 0) {
  162. DEBUG("[read] End of stream\n");
  163. *progress = true;
  164. rii_check(cork_stream_consumer_eof(p->consumer));
  165. rii_check_posix(close(p->fds[0]));
  166. p->fds[0] = -1;
  167. return 0;
  168. } else {
  169. DEBUG("[read] Got %zd bytes\n", bytes_read);
  170. *progress = true;
  171. rii_check(cork_stream_consumer_data
  172. (p->consumer, buf, bytes_read, p->first));
  173. p->first = false;
  174. }
  175. } while (true);
  176. }
  177. static bool
  178. cork_read_pipe_is_finished(struct cork_read_pipe *p)
  179. {
  180. return p->fds[0] == -1;
  181. }
  182. /*-----------------------------------------------------------------------
  183. * Pipes (parent writes)
  184. */
  185. struct cork_write_pipe {
  186. struct cork_stream_consumer consumer;
  187. int fds[2];
  188. };
  189. static int
  190. cork_write_pipe_close_read(struct cork_write_pipe *p)
  191. {
  192. if (p->fds[0] != -1) {
  193. DEBUG("[write] Closing read pipe %d\n", p->fds[0]);
  194. rii_check_posix(close(p->fds[0]));
  195. p->fds[0] = -1;
  196. }
  197. return 0;
  198. }
  199. static int
  200. cork_write_pipe_close_write(struct cork_write_pipe *p)
  201. {
  202. if (p->fds[1] != -1) {
  203. DEBUG("[write] Closing write pipe %d\n", p->fds[1]);
  204. rii_check_posix(close(p->fds[1]));
  205. p->fds[1] = -1;
  206. }
  207. return 0;
  208. }
  209. static int
  210. cork_write_pipe__data(struct cork_stream_consumer *consumer,
  211. const void *buf, size_t size, bool is_first_chunk)
  212. {
  213. struct cork_write_pipe *p =
  214. cork_container_of(consumer, struct cork_write_pipe, consumer);
  215. rii_check_posix(write(p->fds[1], buf, size));
  216. return 0;
  217. }
  218. static int
  219. cork_write_pipe__eof(struct cork_stream_consumer *consumer)
  220. {
  221. struct cork_write_pipe *p =
  222. cork_container_of(consumer, struct cork_write_pipe, consumer);
  223. return cork_write_pipe_close_write(p);
  224. }
  /* Stream-consumer "free" callback.  Intentionally empty: the consumer is
   * embedded in its cork_write_pipe, so there is nothing to release here. */
  static void
  cork_write_pipe__free(struct cork_stream_consumer *consumer)
  {
      /* nothing to do */
  }
  229. static void
  230. cork_write_pipe_init(struct cork_write_pipe *p)
  231. {
  232. p->consumer.data = cork_write_pipe__data;
  233. p->consumer.eof = cork_write_pipe__eof;
  234. p->consumer.free = cork_write_pipe__free;
  235. p->fds[0] = -1;
  236. p->fds[1] = -1;
  237. }
  /* Closes whichever ends of the pipe are still open, read end first.  This is
   * a best-effort cleanup: the results of the individual closes are
   * deliberately not reported to the caller. */
  static void
  cork_write_pipe_close(struct cork_write_pipe *p)
  {
      cork_write_pipe_close_read(p);
      cork_write_pipe_close_write(p);
  }
  /* Finalizes a write pipe.  The only resources it holds are its descriptors,
   * so this just closes any that are still open. */
  static void
  cork_write_pipe_done(struct cork_write_pipe *p)
  {
      cork_write_pipe_close(p);
  }
  249. static int
  250. cork_write_pipe_open(struct cork_write_pipe *p)
  251. {
  252. DEBUG("[write] Opening writer pipe\n");
  253. rii_check_posix(pipe(p->fds));
  254. DEBUG("[write] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
  255. return 0;
  256. }
  257. static int
  258. cork_write_pipe_dup(struct cork_write_pipe *p, int fd)
  259. {
  260. if (p->fds[0] != -1) {
  261. rii_check_posix(dup2(p->fds[0], fd));
  262. }
  263. return 0;
  264. }
  265. /*-----------------------------------------------------------------------
  266. * Subprocesses
  267. */
  268. struct cork_subprocess {
  269. pid_t pid;
  270. struct cork_write_pipe stdin_pipe;
  271. struct cork_read_pipe stdout_pipe;
  272. struct cork_read_pipe stderr_pipe;
  273. struct cork_thread_body *body;
  274. int *exit_code;
  275. char buf[BUF_SIZE];
  276. };
  277. struct cork_subprocess *
  278. cork_subprocess_new(struct cork_thread_body *body,
  279. struct cork_stream_consumer *stdout_consumer,
  280. struct cork_stream_consumer *stderr_consumer,
  281. int *exit_code)
  282. {
  283. struct cork_subprocess *self = cork_new(struct cork_subprocess);
  284. cork_write_pipe_init(&self->stdin_pipe);
  285. cork_read_pipe_init(&self->stdout_pipe, stdout_consumer);
  286. cork_read_pipe_init(&self->stderr_pipe, stderr_consumer);
  287. self->pid = 0;
  288. self->body = body;
  289. self->exit_code = exit_code;
  290. return self;
  291. }
  292. void
  293. cork_subprocess_free(struct cork_subprocess *self)
  294. {
  295. cork_thread_body_free(self->body);
  296. cork_write_pipe_done(&self->stdin_pipe);
  297. cork_read_pipe_done(&self->stdout_pipe);
  298. cork_read_pipe_done(&self->stderr_pipe);
  299. free(self);
  300. }
  301. struct cork_stream_consumer *
  302. cork_subprocess_stdin(struct cork_subprocess *self)
  303. {
  304. return &self->stdin_pipe.consumer;
  305. }
  306. /*-----------------------------------------------------------------------
  307. * Executing another program
  308. */
  309. struct cork_exec_body {
  310. struct cork_thread_body parent;
  311. struct cork_exec *exec;
  312. };
  313. static int
  314. cork_exec__run(struct cork_thread_body *vself)
  315. {
  316. struct cork_exec_body *self =
  317. cork_container_of(vself, struct cork_exec_body, parent);
  318. return cork_exec_run(self->exec);
  319. }
  320. static void
  321. cork_exec__free(struct cork_thread_body *vself)
  322. {
  323. struct cork_exec_body *self =
  324. cork_container_of(vself, struct cork_exec_body, parent);
  325. cork_exec_free(self->exec);
  326. free(self);
  327. }
  328. static struct cork_thread_body *
  329. cork_exec_body_new(struct cork_exec *exec)
  330. {
  331. struct cork_exec_body *self = cork_new(struct cork_exec_body);
  332. self->parent.run = cork_exec__run;
  333. self->parent.free = cork_exec__free;
  334. self->exec = exec;
  335. return &self->parent;
  336. }
  /* Creates a subprocess that runs the program described by `exec`.
   *
   * `out` and `err` receive the child's standard output and standard error
   * (either may be NULL), and `exit_code`, if not NULL, receives its exit
   * code.  `exec` is wrapped in a thread body that releases it when the
   * subprocess is freed. */
  struct cork_subprocess *
  cork_subprocess_new_exec(struct cork_exec *exec,
                           struct cork_stream_consumer *out,
                           struct cork_stream_consumer *err,
                           int *exit_code)
  {
      return cork_subprocess_new(cork_exec_body_new(exec), out, err, exit_code);
  }
  346. /*-----------------------------------------------------------------------
  347. * Running subprocesses
  348. */
  349. int
  350. cork_subprocess_start(struct cork_subprocess *self)
  351. {
  352. pid_t pid;
  353. /* Create the stdout and stderr pipes. */
  354. if (cork_write_pipe_open(&self->stdin_pipe) == -1) {
  355. return -1;
  356. }
  357. if (cork_read_pipe_open(&self->stdout_pipe) == -1) {
  358. cork_write_pipe_close(&self->stdin_pipe);
  359. return -1;
  360. }
  361. if (cork_read_pipe_open(&self->stderr_pipe) == -1) {
  362. cork_write_pipe_close(&self->stdin_pipe);
  363. cork_read_pipe_close(&self->stdout_pipe);
  364. return -1;
  365. }
  366. /* Fork the child process. */
  367. DEBUG("Forking child process\n");
  368. pid = fork();
  369. if (pid == 0) {
  370. /* Child process */
  371. int rc;
  372. /* Close the parent's end of the pipes */
  373. DEBUG("[child] ");
  374. cork_write_pipe_close_write(&self->stdin_pipe);
  375. DEBUG("[child] ");
  376. cork_read_pipe_close_read(&self->stdout_pipe);
  377. DEBUG("[child] ");
  378. cork_read_pipe_close_read(&self->stderr_pipe);
  379. /* Bind the stdout and stderr pipes */
  380. if (cork_write_pipe_dup(&self->stdin_pipe, STDIN_FILENO) == -1) {
  381. _exit(EXIT_FAILURE);
  382. }
  383. if (cork_read_pipe_dup(&self->stdout_pipe, STDOUT_FILENO) == -1) {
  384. _exit(EXIT_FAILURE);
  385. }
  386. if (cork_read_pipe_dup(&self->stderr_pipe, STDERR_FILENO) == -1) {
  387. _exit(EXIT_FAILURE);
  388. }
  389. /* Run the subprocess's body */
  390. rc = cork_thread_body_run(self->body);
  391. if (CORK_LIKELY(rc == 0)) {
  392. _exit(EXIT_SUCCESS);
  393. } else {
  394. fprintf(stderr, "%s\n", cork_error_message());
  395. _exit(EXIT_FAILURE);
  396. }
  397. } else if (pid < 0) {
  398. /* Error forking */
  399. cork_system_error_set();
  400. return -1;
  401. } else {
  402. /* Parent process */
  403. DEBUG(" Child PID=%d\n", (int) pid);
  404. self->pid = pid;
  405. cork_write_pipe_close_read(&self->stdin_pipe);
  406. cork_read_pipe_close_write(&self->stdout_pipe);
  407. cork_read_pipe_close_write(&self->stderr_pipe);
  408. return 0;
  409. }
  410. }
  411. static int
  412. cork_subprocess_reap(struct cork_subprocess *self, int flags, bool *progress)
  413. {
  414. int pid;
  415. int status;
  416. rii_check_posix(pid = waitpid(self->pid, &status, flags));
  417. if (pid == self->pid) {
  418. *progress = true;
  419. self->pid = 0;
  420. if (self->exit_code != NULL) {
  421. *self->exit_code = WEXITSTATUS(status);
  422. }
  423. }
  424. return 0;
  425. }
  426. int
  427. cork_subprocess_abort(struct cork_subprocess *self)
  428. {
  429. if (self->pid > 0) {
  430. CORK_ATTR_UNUSED bool progress;
  431. DEBUG("Terminating child process %d\n", (int) self->pid);
  432. kill(self->pid, SIGTERM);
  433. return cork_subprocess_reap(self, 0, &progress);
  434. } else {
  435. return 0;
  436. }
  437. }
  438. bool
  439. cork_subprocess_is_finished(struct cork_subprocess *self)
  440. {
  441. return (self->pid == 0)
  442. && cork_read_pipe_is_finished(&self->stdout_pipe)
  443. && cork_read_pipe_is_finished(&self->stderr_pipe);
  444. }
  /* THREAD_YIELD names the platform's "give up the CPU" primitive, used by
   * cork_subprocess_yield.  Unsupported platforms fail at compile time. */
  #if defined(__APPLE__) || defined(__MINGW32__)
  #  include <pthread.h>
  #  define THREAD_YIELD  pthread_yield_np
  #elif defined(__linux__) || defined(BSD)
  #  include <sched.h>
  #  define THREAD_YIELD  sched_yield
  #else
  #  error "Unknown thread yield implementation"
  #endif
  /* Backs off a little before the next polling attempt.  The more consecutive
   * idle rounds have been recorded in `*spin_count`, the longer the back-off:
   *
   *   rounds  0..9    a single cork_pause()
   *   rounds 10..19   fifty cork_pause() calls
   *   rounds 20..21   THREAD_YIELD()
   *   rounds 22..23   usleep(0)
   *   rounds 24..49   usleep(1)
   *   rounds 50..74   usleep() growing from 1000 to 25000 microseconds
   *   rounds 75+      usleep(25000)
   *
   * `*spin_count` is incremented on every call.
   *
   * Adapted from
   * http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning */
  static void
  cork_subprocess_yield(unsigned int *spin_count)
  {
      const unsigned int  rounds = *spin_count;

      if (rounds < 10) {
          /* Spin-wait */
          cork_pause();
      } else if (rounds < 20) {
          /* A more intense spin-wait */
          int  remaining = 50;
          while (remaining > 0) {
              cork_pause();
              remaining--;
          }
      } else if (rounds < 22) {
          THREAD_YIELD();
      } else if (rounds < 24) {
          usleep(0);
      } else if (rounds < 50) {
          usleep(1);
      } else if (rounds < 75) {
          usleep((rounds - 49) * 1000);
      } else {
          usleep(25000);
      }

      *spin_count = rounds + 1;
  }
  481. static int
  482. cork_subprocess_drain_(struct cork_subprocess *self, bool *progress)
  483. {
  484. rii_check(cork_read_pipe_read(&self->stdout_pipe, self->buf, progress));
  485. rii_check(cork_read_pipe_read(&self->stderr_pipe, self->buf, progress));
  486. if (self->pid > 0) {
  487. return cork_subprocess_reap(self, WNOHANG, progress);
  488. } else {
  489. return 0;
  490. }
  491. }
  /* Runs one non-blocking polling round for the subprocess and reports
   * whether anything happened (data read, end of stream seen, or child
   * reaped).  Errors from the round are not reported through this function.
   *
   * The flag must start out false: the polling helpers only ever set it to
   * true, so leaving it uninitialized would return an indeterminate value
   * whenever nothing happened. */
  bool
  cork_subprocess_drain(struct cork_subprocess *self)
  {
      bool  progress = false;
      cork_subprocess_drain_(self, &progress);
      return progress;
  }
  /* Blocks until the subprocess is finished, forwarding its output as it
   * arrives.  Whenever a polling round makes no progress, backs off via
   * cork_subprocess_yield (with increasing delays) before trying again.
   *
   * Returns 0 once the subprocess is finished; an error from a polling round
   * ends the wait early and is propagated through rii_check. */
  int
  cork_subprocess_wait(struct cork_subprocess *self)
  {
      unsigned int  idle_rounds = 0;

      for (;;) {
          bool  made_progress = false;

          if (cork_subprocess_is_finished(self)) {
              return 0;
          }
          rii_check(cork_subprocess_drain_(self, &made_progress));
          if (!made_progress) {
              cork_subprocess_yield(&idle_rounds);
          }
      }
  }
  513. /*-----------------------------------------------------------------------
  514. * Running subprocess groups
  515. */
  516. static int
  517. cork_subprocess_group_terminate(struct cork_subprocess_group *group)
  518. {
  519. size_t i;
  520. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  521. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  522. rii_check(cork_subprocess_abort(sub));
  523. }
  524. return 0;
  525. }
  526. int
  527. cork_subprocess_group_start(struct cork_subprocess_group *group)
  528. {
  529. size_t i;
  530. DEBUG("Starting subprocess group\n");
  531. /* Start each subprocess. */
  532. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  533. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  534. ei_check(cork_subprocess_start(sub));
  535. }
  536. return 0;
  537. error:
  538. cork_subprocess_group_terminate(group);
  539. return -1;
  540. }
  /* Public entry point for aborting a whole group; returns the result of
   * cork_subprocess_group_terminate. */
  int
  cork_subprocess_group_abort(struct cork_subprocess_group *group)
  {
      int  rc;

      DEBUG("Aborting subprocess group\n");
      rc = cork_subprocess_group_terminate(group);
      return rc;
  }
  547. bool
  548. cork_subprocess_group_is_finished(struct cork_subprocess_group *group)
  549. {
  550. size_t i;
  551. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  552. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  553. bool sub_finished = cork_subprocess_is_finished(sub);
  554. if (!sub_finished) {
  555. return false;
  556. }
  557. }
  558. return true;
  559. }
  560. static int
  561. cork_subprocess_group_drain_(struct cork_subprocess_group *group,
  562. bool *progress)
  563. {
  564. size_t i;
  565. for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
  566. struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
  567. rii_check(cork_subprocess_drain_(sub, progress));
  568. }
  569. return 0;
  570. }
  /* Runs one non-blocking polling round for the whole group and reports
   * whether any member made progress.  Errors from the round are not reported
   * through this function. */
  bool
  cork_subprocess_group_drain(struct cork_subprocess_group *group)
  {
      /* Must start out false: the polling helpers only ever set it to true. */
      bool  any_progress = false;

      cork_subprocess_group_drain_(group, &any_progress);
      return any_progress;
  }
  /* Blocks until every subprocess in the group is finished, forwarding their
   * output as it arrives.  Whenever a polling round makes no progress, backs
   * off via cork_subprocess_yield (with increasing delays) before trying
   * again.
   *
   * Returns 0 once the group is finished; an error from a polling round ends
   * the wait early and is propagated through rii_check. */
  int
  cork_subprocess_group_wait(struct cork_subprocess_group *group)
  {
      unsigned int  idle_rounds = 0;

      DEBUG("Waiting for subprocess group to finish\n");
      for (;;) {
          bool  made_progress = false;

          if (cork_subprocess_group_is_finished(group)) {
              return 0;
          }
          rii_check(cork_subprocess_group_drain_(group, &made_progress));
          if (!made_progress) {
              cork_subprocess_yield(&idle_rounds);
          }
      }
  }