/* -*- coding: utf-8 -*- * ---------------------------------------------------------------------- * Copyright © 2012-2013, RedJack, LLC. * All rights reserved. * * Please see the COPYING file in this distribution for license * details. * ---------------------------------------------------------------------- */ #include #include #include #include #include #include #include #include "libcork/core.h" #include "libcork/ds.h" #include "libcork/os/subprocess.h" #include "libcork/threads/basics.h" #include "libcork/helpers/errors.h" #include "libcork/helpers/posix.h" #if !defined(CORK_DEBUG_SUBPROCESS) #define CORK_DEBUG_SUBPROCESS 0 #endif #if CORK_DEBUG_SUBPROCESS #include #define DEBUG(...) fprintf(stderr, __VA_ARGS__) #else #define DEBUG(...) /* no debug messages */ #endif /*----------------------------------------------------------------------- * Subprocess groups */ #define BUF_SIZE 4096 struct cork_subprocess_group { cork_array(struct cork_subprocess *) subprocesses; }; struct cork_subprocess_group * cork_subprocess_group_new(void) { struct cork_subprocess_group *group = cork_new(struct cork_subprocess_group); cork_pointer_array_init (&group->subprocesses, (cork_free_f) cork_subprocess_free); return group; } void cork_subprocess_group_free(struct cork_subprocess_group *group) { cork_array_done(&group->subprocesses); free(group); } void cork_subprocess_group_add(struct cork_subprocess_group *group, struct cork_subprocess *sub) { cork_array_append(&group->subprocesses, sub); } /*----------------------------------------------------------------------- * Pipes (parent reads) */ struct cork_read_pipe { struct cork_stream_consumer *consumer; int fds[2]; bool first; }; static void cork_read_pipe_init(struct cork_read_pipe *p, struct cork_stream_consumer *consumer) { p->consumer = consumer; p->fds[0] = -1; p->fds[1] = -1; } static int cork_read_pipe_close_read(struct cork_read_pipe *p) { if (p->fds[0] != -1) { DEBUG("Closing read pipe %d\n", p->fds[0]); rii_check_posix(close(p->fds[0])); p->fds[0] = -1; } return 0; } static int cork_read_pipe_close_write(struct cork_read_pipe *p) { if (p->fds[1] != -1) { DEBUG("Closing write pipe %d\n", p->fds[1]); rii_check_posix(close(p->fds[1])); p->fds[1] = -1; } return 0; } static void cork_read_pipe_close(struct cork_read_pipe *p) { cork_read_pipe_close_read(p); cork_read_pipe_close_write(p); } static void cork_read_pipe_done(struct cork_read_pipe *p) { cork_read_pipe_close(p); } static int cork_read_pipe_open(struct cork_read_pipe *p) { if (p->consumer != NULL) { int flags; /* We want the read end of the pipe to be non-blocking. */ DEBUG("[read] Opening pipe\n"); rii_check_posix(pipe(p->fds)); DEBUG("[read] Got read=%d write=%d\n", p->fds[0], p->fds[1]); DEBUG("[read] Setting non-blocking flag on read pipe\n"); ei_check_posix(flags = fcntl(p->fds[0], F_GETFD)); flags |= O_NONBLOCK; ei_check_posix(fcntl(p->fds[0], F_SETFD, flags)); } p->first = true; return 0; error: cork_read_pipe_close(p); return -1; } static int cork_read_pipe_dup(struct cork_read_pipe *p, int fd) { if (p->fds[1] != -1) { rii_check_posix(dup2(p->fds[1], fd)); } return 0; } static int cork_read_pipe_read(struct cork_read_pipe *p, char *buf, bool *progress) { if (p->fds[0] == -1) { return 0; } do { DEBUG("[read] Reading from pipe %d\n", p->fds[0]); ssize_t bytes_read = read(p->fds[0], buf, BUF_SIZE); if (bytes_read == -1) { if (errno == EAGAIN) { /* We've exhausted all of the data currently available. */ DEBUG("[read] No more bytes without blocking\n"); return 0; } else if (errno == EINTR) { /* Interrupted by a signal; return so that our wait loop can * catch that. */ DEBUG("[read] Interrupted by signal\n"); return 0; } else { /* An actual error */ cork_system_error_set(); DEBUG("[read] Error: %s\n", cork_error_message()); return -1; } } else if (bytes_read == 0) { DEBUG("[read] End of stream\n"); *progress = true; rii_check(cork_stream_consumer_eof(p->consumer)); rii_check_posix(close(p->fds[0])); p->fds[0] = -1; return 0; } else { DEBUG("[read] Got %zd bytes\n", bytes_read); *progress = true; rii_check(cork_stream_consumer_data (p->consumer, buf, bytes_read, p->first)); p->first = false; } } while (true); } static bool cork_read_pipe_is_finished(struct cork_read_pipe *p) { return p->fds[0] == -1; } /*----------------------------------------------------------------------- * Pipes (parent writes) */ struct cork_write_pipe { struct cork_stream_consumer consumer; int fds[2]; }; static int cork_write_pipe_close_read(struct cork_write_pipe *p) { if (p->fds[0] != -1) { DEBUG("[write] Closing read pipe %d\n", p->fds[0]); rii_check_posix(close(p->fds[0])); p->fds[0] = -1; } return 0; } static int cork_write_pipe_close_write(struct cork_write_pipe *p) { if (p->fds[1] != -1) { DEBUG("[write] Closing write pipe %d\n", p->fds[1]); rii_check_posix(close(p->fds[1])); p->fds[1] = -1; } return 0; } static int cork_write_pipe__data(struct cork_stream_consumer *consumer, const void *buf, size_t size, bool is_first_chunk) { struct cork_write_pipe *p = cork_container_of(consumer, struct cork_write_pipe, consumer); rii_check_posix(write(p->fds[1], buf, size)); return 0; } static int cork_write_pipe__eof(struct cork_stream_consumer *consumer) { struct cork_write_pipe *p = cork_container_of(consumer, struct cork_write_pipe, consumer); return cork_write_pipe_close_write(p); } static void cork_write_pipe__free(struct cork_stream_consumer *consumer) { } static void cork_write_pipe_init(struct cork_write_pipe *p) { p->consumer.data = cork_write_pipe__data; p->consumer.eof = cork_write_pipe__eof; p->consumer.free = cork_write_pipe__free; p->fds[0] = -1; p->fds[1] = -1; } static void cork_write_pipe_close(struct cork_write_pipe *p) { cork_write_pipe_close_read(p); cork_write_pipe_close_write(p); } static void cork_write_pipe_done(struct cork_write_pipe *p) { cork_write_pipe_close(p); } static int cork_write_pipe_open(struct cork_write_pipe *p) { DEBUG("[write] Opening writer pipe\n"); rii_check_posix(pipe(p->fds)); DEBUG("[write] Got read=%d write=%d\n", p->fds[0], p->fds[1]); return 0; } static int cork_write_pipe_dup(struct cork_write_pipe *p, int fd) { if (p->fds[0] != -1) { rii_check_posix(dup2(p->fds[0], fd)); } return 0; } /*----------------------------------------------------------------------- * Subprocesses */ struct cork_subprocess { pid_t pid; struct cork_write_pipe stdin_pipe; struct cork_read_pipe stdout_pipe; struct cork_read_pipe stderr_pipe; struct cork_thread_body *body; int *exit_code; char buf[BUF_SIZE]; }; struct cork_subprocess * cork_subprocess_new(struct cork_thread_body *body, struct cork_stream_consumer *stdout_consumer, struct cork_stream_consumer *stderr_consumer, int *exit_code) { struct cork_subprocess *self = cork_new(struct cork_subprocess); cork_write_pipe_init(&self->stdin_pipe); cork_read_pipe_init(&self->stdout_pipe, stdout_consumer); cork_read_pipe_init(&self->stderr_pipe, stderr_consumer); self->pid = 0; self->body = body; self->exit_code = exit_code; return self; } void cork_subprocess_free(struct cork_subprocess *self) { cork_thread_body_free(self->body); cork_write_pipe_done(&self->stdin_pipe); cork_read_pipe_done(&self->stdout_pipe); cork_read_pipe_done(&self->stderr_pipe); free(self); } struct cork_stream_consumer * cork_subprocess_stdin(struct cork_subprocess *self) { return &self->stdin_pipe.consumer; } /*----------------------------------------------------------------------- * Executing another program */ struct cork_exec_body { struct cork_thread_body parent; struct cork_exec *exec; }; static int cork_exec__run(struct cork_thread_body *vself) { struct cork_exec_body *self = cork_container_of(vself, struct cork_exec_body, parent); return cork_exec_run(self->exec); } static void cork_exec__free(struct cork_thread_body *vself) { struct cork_exec_body *self = cork_container_of(vself, struct cork_exec_body, parent); cork_exec_free(self->exec); free(self); } static struct cork_thread_body * cork_exec_body_new(struct cork_exec *exec) { struct cork_exec_body *self = cork_new(struct cork_exec_body); self->parent.run = cork_exec__run; self->parent.free = cork_exec__free; self->exec = exec; return &self->parent; } struct cork_subprocess * cork_subprocess_new_exec(struct cork_exec *exec, struct cork_stream_consumer *out, struct cork_stream_consumer *err, int *exit_code) { struct cork_thread_body *body = cork_exec_body_new(exec); return cork_subprocess_new(body, out, err, exit_code); } /*----------------------------------------------------------------------- * Running subprocesses */ int cork_subprocess_start(struct cork_subprocess *self) { pid_t pid; /* Create the stdout and stderr pipes. */ if (cork_write_pipe_open(&self->stdin_pipe) == -1) { return -1; } if (cork_read_pipe_open(&self->stdout_pipe) == -1) { cork_write_pipe_close(&self->stdin_pipe); return -1; } if (cork_read_pipe_open(&self->stderr_pipe) == -1) { cork_write_pipe_close(&self->stdin_pipe); cork_read_pipe_close(&self->stdout_pipe); return -1; } /* Fork the child process. */ DEBUG("Forking child process\n"); pid = fork(); if (pid == 0) { /* Child process */ int rc; /* Close the parent's end of the pipes */ DEBUG("[child] "); cork_write_pipe_close_write(&self->stdin_pipe); DEBUG("[child] "); cork_read_pipe_close_read(&self->stdout_pipe); DEBUG("[child] "); cork_read_pipe_close_read(&self->stderr_pipe); /* Bind the stdout and stderr pipes */ if (cork_write_pipe_dup(&self->stdin_pipe, STDIN_FILENO) == -1) { _exit(EXIT_FAILURE); } if (cork_read_pipe_dup(&self->stdout_pipe, STDOUT_FILENO) == -1) { _exit(EXIT_FAILURE); } if (cork_read_pipe_dup(&self->stderr_pipe, STDERR_FILENO) == -1) { _exit(EXIT_FAILURE); } /* Run the subprocess's body */ rc = cork_thread_body_run(self->body); if (CORK_LIKELY(rc == 0)) { _exit(EXIT_SUCCESS); } else { fprintf(stderr, "%s\n", cork_error_message()); _exit(EXIT_FAILURE); } } else if (pid < 0) { /* Error forking */ cork_system_error_set(); return -1; } else { /* Parent process */ DEBUG(" Child PID=%d\n", (int) pid); self->pid = pid; cork_write_pipe_close_read(&self->stdin_pipe); cork_read_pipe_close_write(&self->stdout_pipe); cork_read_pipe_close_write(&self->stderr_pipe); return 0; } } static int cork_subprocess_reap(struct cork_subprocess *self, int flags, bool *progress) { int pid; int status; rii_check_posix(pid = waitpid(self->pid, &status, flags)); if (pid == self->pid) { *progress = true; self->pid = 0; if (self->exit_code != NULL) { *self->exit_code = WEXITSTATUS(status); } } return 0; } int cork_subprocess_abort(struct cork_subprocess *self) { if (self->pid > 0) { CORK_ATTR_UNUSED bool progress; DEBUG("Terminating child process %d\n", (int) self->pid); kill(self->pid, SIGTERM); return cork_subprocess_reap(self, 0, &progress); } else { return 0; } } bool cork_subprocess_is_finished(struct cork_subprocess *self) { return (self->pid == 0) && cork_read_pipe_is_finished(&self->stdout_pipe) && cork_read_pipe_is_finished(&self->stderr_pipe); } #if defined(__APPLE__) #include #define THREAD_YIELD pthread_yield_np #elif defined(__linux__) || defined(BSD) #include #define THREAD_YIELD sched_yield #else #error "Unknown thread yield implementation" #endif static void cork_subprocess_yield(unsigned int *spin_count) { /* Adapted from * http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning */ if (*spin_count < 10) { /* Spin-wait */ cork_pause(); } else if (*spin_count < 20) { /* A more intense spin-wait */ int i; for (i = 0; i < 50; i++) { cork_pause(); } } else if (*spin_count < 22) { THREAD_YIELD(); } else if (*spin_count < 24) { usleep(0); } else if (*spin_count < 50) { usleep(1); } else if (*spin_count < 75) { usleep((*spin_count - 49) * 1000); } else { usleep(25000); } (*spin_count)++; } static int cork_subprocess_drain_(struct cork_subprocess *self, bool *progress) { rii_check(cork_read_pipe_read(&self->stdout_pipe, self->buf, progress)); rii_check(cork_read_pipe_read(&self->stderr_pipe, self->buf, progress)); if (self->pid > 0) { return cork_subprocess_reap(self, WNOHANG, progress); } else { return 0; } } bool cork_subprocess_drain(struct cork_subprocess *self) { bool progress; cork_subprocess_drain_(self, &progress); return progress; } int cork_subprocess_wait(struct cork_subprocess *self) { unsigned int spin_count = 0; bool progress; while (!cork_subprocess_is_finished(self)) { progress = false; rii_check(cork_subprocess_drain_(self, &progress)); if (!progress) { cork_subprocess_yield(&spin_count); } } return 0; } /*----------------------------------------------------------------------- * Running subprocess groups */ static int cork_subprocess_group_terminate(struct cork_subprocess_group *group) { size_t i; for (i = 0; i < cork_array_size(&group->subprocesses); i++) { struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i); rii_check(cork_subprocess_abort(sub)); } return 0; } int cork_subprocess_group_start(struct cork_subprocess_group *group) { size_t i; DEBUG("Starting subprocess group\n"); /* Start each subprocess. */ for (i = 0; i < cork_array_size(&group->subprocesses); i++) { struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i); ei_check(cork_subprocess_start(sub)); } return 0; error: cork_subprocess_group_terminate(group); return -1; } int cork_subprocess_group_abort(struct cork_subprocess_group *group) { DEBUG("Aborting subprocess group\n"); return cork_subprocess_group_terminate(group); } bool cork_subprocess_group_is_finished(struct cork_subprocess_group *group) { size_t i; for (i = 0; i < cork_array_size(&group->subprocesses); i++) { struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i); bool sub_finished = cork_subprocess_is_finished(sub); if (!sub_finished) { return false; } } return true; } static int cork_subprocess_group_drain_(struct cork_subprocess_group *group, bool *progress) { size_t i; for (i = 0; i < cork_array_size(&group->subprocesses); i++) { struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i); rii_check(cork_subprocess_drain_(sub, progress)); } return 0; } bool cork_subprocess_group_drain(struct cork_subprocess_group *group) { bool progress = false; cork_subprocess_group_drain_(group, &progress); return progress; } int cork_subprocess_group_wait(struct cork_subprocess_group *group) { unsigned int spin_count = 0; bool progress; DEBUG("Waiting for subprocess group to finish\n"); while (!cork_subprocess_group_is_finished(group)) { progress = false; rii_check(cork_subprocess_group_drain_(group, &progress)); if (!progress) { cork_subprocess_yield(&spin_count); } } return 0; }