You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 

676 lines
17 KiB

/* -*- coding: utf-8 -*-
* ----------------------------------------------------------------------
* Copyright © 2012-2013, RedJack, LLC.
* All rights reserved.
*
* Please see the COPYING file in this distribution for license
* details.
* ----------------------------------------------------------------------
*/
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <signal.h>
#ifndef __MINGW32__
#include <sys/select.h>
#include <sys/wait.h>
#endif
#include <unistd.h>
#include "libcork/core.h"
#include "libcork/ds.h"
#include "libcork/os/subprocess.h"
#include "libcork/threads/basics.h"
#include "libcork/helpers/errors.h"
#include "libcork/helpers/posix.h"
#if !defined(CORK_DEBUG_SUBPROCESS)
#define CORK_DEBUG_SUBPROCESS 0
#endif
#if CORK_DEBUG_SUBPROCESS
#include <stdio.h>
#define DEBUG(...) fprintf(stderr, __VA_ARGS__)
#else
#define DEBUG(...) /* no debug messages */
#endif
/*-----------------------------------------------------------------------
* Subprocess groups
*/
/* Number of bytes requested by each read() from a child's output pipe; also
 * the size of the per-subprocess scratch buffer. */
#define BUF_SIZE 4096
/* A set of subprocesses that are started, drained, waited on, and aborted
 * together. */
struct cork_subprocess_group {
/* The member subprocesses, appended by cork_subprocess_group_add. */
cork_array(struct cork_subprocess *) subprocesses;
};
/* Allocates a new, empty subprocess group.  The member array is set up as a
 * pointer array with cork_subprocess_free registered as its element
 * finalizer. */
struct cork_subprocess_group *
cork_subprocess_group_new(void)
{
    struct cork_subprocess_group *self;

    self = cork_new(struct cork_subprocess_group);
    cork_pointer_array_init
        (&self->subprocesses, (cork_free_f) cork_subprocess_free);
    return self;
}
/* Destroys a subprocess group: finalizes the member array, then releases the
 * group structure itself. */
void
cork_subprocess_group_free(struct cork_subprocess_group *group)
{
    /* The array must be torn down while the group that holds it is still
     * allocated. */
    cork_array_done(&group->subprocesses);

    free(group);
}
/* Appends `sub` to the end of the group's list of subprocesses. */
void
cork_subprocess_group_add(struct cork_subprocess_group *group,
                          struct cork_subprocess *sub)
{
    cork_array_append(&group->subprocesses, sub);
}
/*-----------------------------------------------------------------------
* Pipes (parent reads)
*/
/* A pipe that the parent reads from (used for a child's stdout or stderr). */
struct cork_read_pipe {
/* Receives every chunk read from the pipe and the final EOF notification.
 * When NULL, cork_read_pipe_open does not create a pipe at all. */
struct cork_stream_consumer *consumer;
/* Descriptors filled in by pipe(): fds[0] is the read end, fds[1] the write
 * end.  A value of -1 marks an end that is closed or was never opened. */
int fds[2];
/* Passed to the consumer as its "first chunk" flag: set to true when the pipe
 * is opened and cleared after the first chunk has been delivered. */
bool first;
};
/* Prepares a read pipe for later use: both descriptors are marked as closed
 * (-1) and the consumer that will receive the data is remembered.  No pipe is
 * created here. */
static void
cork_read_pipe_init(struct cork_read_pipe *p, struct cork_stream_consumer *consumer)
{
    p->fds[1] = -1;
    p->fds[0] = -1;
    p->consumer = consumer;
}
/* Closes the read end of the pipe if it is open, and marks it as closed.
 * Returns 0 when there was nothing to do or the close succeeded; the
 * rii_check_posix macro handles a failing close(). */
static int
cork_read_pipe_close_read(struct cork_read_pipe *p)
{
    if (p->fds[0] == -1) {
        /* Already closed (or never opened). */
        return 0;
    }

    DEBUG("Closing read pipe %d\n", p->fds[0]);
    rii_check_posix(close(p->fds[0]));
    p->fds[0] = -1;
    return 0;
}
/* Closes the write end of the pipe if it is open, and marks it as closed.
 * Returns 0 when there was nothing to do or the close succeeded; the
 * rii_check_posix macro handles a failing close(). */
static int
cork_read_pipe_close_write(struct cork_read_pipe *p)
{
    if (p->fds[1] == -1) {
        /* Already closed (or never opened). */
        return 0;
    }

    DEBUG("Closing write pipe %d\n", p->fds[1]);
    rii_check_posix(close(p->fds[1]));
    p->fds[1] = -1;
    return 0;
}
/* Closes both ends of the pipe.  This is a best-effort operation: the results
 * of the individual closes are deliberately discarded. */
static void
cork_read_pipe_close(struct cork_read_pipe *p)
{
    (void) cork_read_pipe_close_read(p);
    (void) cork_read_pipe_close_write(p);
}
/* Finalizes a read pipe.  The only resources it holds are its descriptors, so
 * this just closes whichever ends are still open. */
static void
cork_read_pipe_done(struct cork_read_pipe *p)
{
    cork_read_pipe_close(p);
}
/* Creates the underlying pipe for `p` and makes its read end non-blocking.
 *
 * If the pipe has no consumer, no pipe is created and both descriptors stay
 * at -1.  In either case the "first chunk" flag is reset to true.
 *
 * Returns 0 on success.  If pipe() fails, the rii_check_posix macro returns
 * from this function.  If configuring the read end fails, both ends of the
 * pipe are closed again and -1 is returned. */
static int
cork_read_pipe_open(struct cork_read_pipe *p)
{
    if (p->consumer != NULL) {
        int flags;
        /* We want the read end of the pipe to be non-blocking. */
        DEBUG("[read] Opening pipe\n");
        rii_check_posix(pipe(p->fds));
        DEBUG("[read] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
        DEBUG("[read] Setting non-blocking flag on read pipe\n");
        /* O_NONBLOCK is a file *status* flag, so it has to be read and
         * written with F_GETFL/F_SETFL.  F_GETFD/F_SETFD operate on the file
         * *descriptor* flags (FD_CLOEXEC) and would leave the pipe in
         * blocking mode. */
        ei_check_posix(flags = fcntl(p->fds[0], F_GETFL));
        flags |= O_NONBLOCK;
        ei_check_posix(fcntl(p->fds[0], F_SETFL, flags));
    }
    p->first = true;
    return 0;
error:
    cork_read_pipe_close(p);
    return -1;
}
/* Duplicates the pipe's write end onto descriptor `fd` (via dup2).  Does
 * nothing when the write end is not open.  Returns 0 on success; a failing
 * dup2() is handled by the rii_check_posix macro. */
static int
cork_read_pipe_dup(struct cork_read_pipe *p, int fd)
{
    if (p->fds[1] == -1) {
        return 0;
    }

    rii_check_posix(dup2(p->fds[1], fd));
    return 0;
}
/* Reads as much as possible from the pipe's read end, handing each chunk to
 * the pipe's consumer.
 *
 * `buf` is scratch space that must hold at least BUF_SIZE bytes.  `*progress`
 * is set to true whenever data or end-of-stream is seen; it is never set to
 * false here.
 *
 * Returns 0 when the read end is already closed, when read() reports EAGAIN
 * or EINTR, or once end-of-stream has been processed (the consumer is sent
 * EOF and the read end is closed).  Returns -1 on any other read() error,
 * after recording it with cork_system_error_set().  Failures from the
 * consumer or from close() leave the function through the rii_check macros. */
static int
cork_read_pipe_read(struct cork_read_pipe *p, char *buf, bool *progress)
{
    if (p->fds[0] == -1) {
        return 0;
    }

    for (;;) {
        ssize_t count;

        DEBUG("[read] Reading from pipe %d\n", p->fds[0]);
        count = read(p->fds[0], buf, BUF_SIZE);

        if (count == 0) {
            DEBUG("[read] End of stream\n");
            *progress = true;
            rii_check(cork_stream_consumer_eof(p->consumer));
            rii_check_posix(close(p->fds[0]));
            p->fds[0] = -1;
            return 0;
        }

        if (count != -1) {
            DEBUG("[read] Got %zd bytes\n", count);
            *progress = true;
            rii_check(cork_stream_consumer_data
                      (p->consumer, buf, count, p->first));
            p->first = false;
            continue;
        }

        /* read() failed; decide based on errno. */
        switch (errno) {
            case EAGAIN:
                /* We've exhausted all of the data currently available. */
                DEBUG("[read] No more bytes without blocking\n");
                return 0;

            case EINTR:
                /* Interrupted by a signal; return so that our wait loop can
                 * catch that. */
                DEBUG("[read] Interrupted by signal\n");
                return 0;

            default:
                /* An actual error */
                cork_system_error_set();
                DEBUG("[read] Error: %s\n", cork_error_message());
                return -1;
        }
    }
}
/* A read pipe is finished once its read end is closed (or was never opened),
 * i.e. when its read descriptor is -1. */
static bool
cork_read_pipe_is_finished(struct cork_read_pipe *p)
{
    const int read_fd = p->fds[0];
    return read_fd == -1;
}
/*-----------------------------------------------------------------------
* Pipes (parent writes)
*/
/* A pipe that the parent writes to (used for a child's stdin). */
struct cork_write_pipe {
/* Embedded stream consumer; its callbacks write incoming data to the pipe's
 * write end and close that end on EOF. */
struct cork_stream_consumer consumer;
/* Descriptors filled in by pipe(): fds[0] is the read end, fds[1] the write
 * end.  A value of -1 marks an end that is closed or was never opened. */
int fds[2];
};
/* Closes the read end of the pipe if it is open, and marks it as closed.
 * Returns 0 when there was nothing to do or the close succeeded; the
 * rii_check_posix macro handles a failing close(). */
static int
cork_write_pipe_close_read(struct cork_write_pipe *p)
{
    if (p->fds[0] == -1) {
        /* Already closed (or never opened). */
        return 0;
    }

    DEBUG("[write] Closing read pipe %d\n", p->fds[0]);
    rii_check_posix(close(p->fds[0]));
    p->fds[0] = -1;
    return 0;
}
/* Closes the write end of the pipe if it is open, and marks it as closed.
 * Returns 0 when there was nothing to do or the close succeeded; the
 * rii_check_posix macro handles a failing close(). */
static int
cork_write_pipe_close_write(struct cork_write_pipe *p)
{
    if (p->fds[1] == -1) {
        /* Already closed (or never opened). */
        return 0;
    }

    DEBUG("[write] Closing write pipe %d\n", p->fds[1]);
    rii_check_posix(close(p->fds[1]));
    p->fds[1] = -1;
    return 0;
}
/* Stream-consumer "data" callback: writes the `size` bytes at `buf` to the
 * pipe's write end.
 *
 * write() is allowed to accept fewer bytes than requested, so the call is
 * repeated until the whole chunk has been written.  `is_first_chunk` is not
 * used.
 *
 * Returns 0 once everything has been written; a failing write() is handled
 * by the rii_check_posix macro. */
static int
cork_write_pipe__data(struct cork_stream_consumer *consumer,
                      const void *buf, size_t size, bool is_first_chunk)
{
    struct cork_write_pipe *p =
        cork_container_of(consumer, struct cork_write_pipe, consumer);
    const char *cursor = buf;
    size_t remaining = size;

    while (remaining > 0) {
        ssize_t written;
        rii_check_posix(written = write(p->fds[1], cursor, remaining));
        /* A short write is not an error; advance and try again. */
        cursor += written;
        remaining -= (size_t) written;
    }
    return 0;
}
/* Stream-consumer "eof" callback: closes the pipe's write end.  Returns
 * whatever cork_write_pipe_close_write returns. */
static int
cork_write_pipe__eof(struct cork_stream_consumer *consumer)
{
    struct cork_write_pipe *self;

    self = cork_container_of(consumer, struct cork_write_pipe, consumer);
    return cork_write_pipe_close_write(self);
}
/* Stream-consumer "free" callback.  The consumer is a member of its
 * cork_write_pipe rather than a separate allocation, so there is nothing to
 * release here. */
static void
cork_write_pipe__free(struct cork_stream_consumer *consumer)
{
    (void) consumer;
}
/* Prepares a write pipe for later use: both descriptors are marked as closed
 * (-1) and the embedded consumer's callbacks are installed.  No pipe is
 * created here. */
static void
cork_write_pipe_init(struct cork_write_pipe *p)
{
    /* Neither end is open yet. */
    p->fds[1] = -1;
    p->fds[0] = -1;

    /* Callback table of the embedded consumer. */
    p->consumer.free = cork_write_pipe__free;
    p->consumer.eof = cork_write_pipe__eof;
    p->consumer.data = cork_write_pipe__data;
}
/* Closes both ends of the pipe.  This is a best-effort operation: the results
 * of the individual closes are deliberately discarded. */
static void
cork_write_pipe_close(struct cork_write_pipe *p)
{
    (void) cork_write_pipe_close_read(p);
    (void) cork_write_pipe_close_write(p);
}
/* Finalizes a write pipe.  The only resources it holds are its descriptors,
 * so this just closes whichever ends are still open. */
static void
cork_write_pipe_done(struct cork_write_pipe *p)
{
    cork_write_pipe_close(p);
}
/* Creates the underlying pipe for `p`, storing the new descriptors in
 * p->fds.  Returns 0 on success; a failing pipe() is handled by the
 * rii_check_posix macro. */
static int
cork_write_pipe_open(struct cork_write_pipe *p)
{
    int *ends = p->fds;

    DEBUG("[write] Opening writer pipe\n");
    rii_check_posix(pipe(ends));
    DEBUG("[write] Got read=%d write=%d\n", p->fds[0], p->fds[1]);
    return 0;
}
/* Duplicates the pipe's read end onto descriptor `fd` (via dup2).  Does
 * nothing when the read end is not open.  Returns 0 on success; a failing
 * dup2() is handled by the rii_check_posix macro. */
static int
cork_write_pipe_dup(struct cork_write_pipe *p, int fd)
{
    if (p->fds[0] == -1) {
        return 0;
    }

    rii_check_posix(dup2(p->fds[0], fd));
    return 0;
}
/*-----------------------------------------------------------------------
* Subprocesses
*/
/* One child process together with the pipes that connect it to the parent. */
struct cork_subprocess {
/* PID of the forked child.  0 before the child is started and again after it
 * has been reaped. */
pid_t pid;
/* Pipe the parent writes to; its read end becomes the child's stdin. */
struct cork_write_pipe stdin_pipe;
/* Pipes the parent reads from; their write ends become the child's stdout
 * and stderr. */
struct cork_read_pipe stdout_pipe;
struct cork_read_pipe stderr_pipe;
/* The code that the child process runs after the fork.  Freed together with
 * the subprocess. */
struct cork_thread_body *body;
/* Optional location (may be NULL) that receives WEXITSTATUS of the child's
 * wait status once the child has been reaped. */
int *exit_code;
/* Scratch buffer used when reading from the stdout and stderr pipes. */
char buf[BUF_SIZE];
};
/* Creates a subprocess description; nothing is forked yet.
 *
 * `body` is what the child will run, and is freed by cork_subprocess_free.
 * `stdout_consumer` and `stderr_consumer` are stored as the consumers of the
 * child's stdout and stderr pipes.  `exit_code`, when not NULL, receives the
 * child's exit status once it has been reaped. */
struct cork_subprocess *
cork_subprocess_new(struct cork_thread_body *body,
                    struct cork_stream_consumer *stdout_consumer,
                    struct cork_stream_consumer *stderr_consumer,
                    int *exit_code)
{
    struct cork_subprocess *sub = cork_new(struct cork_subprocess);

    /* Plain fields first... */
    sub->exit_code = exit_code;
    sub->body = body;
    sub->pid = 0;

    /* ...then the three pipes, all initially closed. */
    cork_write_pipe_init(&sub->stdin_pipe);
    cork_read_pipe_init(&sub->stdout_pipe, stdout_consumer);
    cork_read_pipe_init(&sub->stderr_pipe, stderr_consumer);

    return sub;
}
/* Destroys a subprocess description: frees its body, closes any pipe ends
 * that are still open, and releases the structure.  The child process itself
 * is not signalled or waited for here. */
void
cork_subprocess_free(struct cork_subprocess *self)
{
    /* The body is owned by the subprocess. */
    cork_thread_body_free(self->body);

    /* Release whatever descriptors remain open. */
    cork_write_pipe_done(&self->stdin_pipe);
    cork_read_pipe_done(&self->stdout_pipe);
    cork_read_pipe_done(&self->stderr_pipe);

    free(self);
}
struct cork_stream_consumer *
cork_subprocess_stdin(struct cork_subprocess *self)
{
return &self->stdin_pipe.consumer;
}
/*-----------------------------------------------------------------------
* Executing another program
*/
/* A thread body whose work consists of running a cork_exec. */
struct cork_exec_body {
/* Embedded base object; the callbacks recover the cork_exec_body from a
 * pointer to this member via cork_container_of. */
struct cork_thread_body parent;
/* The program description to run.  Freed together with the body. */
struct cork_exec *exec;
};
/* Thread-body "run" callback: runs the wrapped cork_exec and returns the
 * result of cork_exec_run. */
static int
cork_exec__run(struct cork_thread_body *vself)
{
    struct cork_exec_body *body;

    body = cork_container_of(vself, struct cork_exec_body, parent);
    return cork_exec_run(body->exec);
}
/* Thread-body "free" callback: frees the wrapped cork_exec, then the body
 * structure that contained it. */
static void
cork_exec__free(struct cork_thread_body *vself)
{
    struct cork_exec_body *body;

    body = cork_container_of(vself, struct cork_exec_body, parent);
    cork_exec_free(body->exec);
    free(body);
}
/* Wraps `exec` in a newly allocated thread body.  The returned pointer refers
 * to the embedded cork_thread_body; freeing it through its `free` callback
 * also frees `exec`. */
static struct cork_thread_body *
cork_exec_body_new(struct cork_exec *exec)
{
    struct cork_exec_body *body = cork_new(struct cork_exec_body);

    body->exec = exec;
    body->parent.free = cork_exec__free;
    body->parent.run = cork_exec__run;
    return &body->parent;
}
/* Creates a subprocess that runs the program described by `exec`.  `out`,
 * `err`, and `exit_code` are passed straight through to
 * cork_subprocess_new. */
struct cork_subprocess *
cork_subprocess_new_exec(struct cork_exec *exec,
                         struct cork_stream_consumer *out,
                         struct cork_stream_consumer *err,
                         int *exit_code)
{
    return cork_subprocess_new(cork_exec_body_new(exec), out, err, exit_code);
}
/*-----------------------------------------------------------------------
* Running subprocesses
*/
/* Opens the subprocess's pipes and forks the child.
 *
 * In the child, the parent's ends of the pipes are closed, the remaining ends
 * are bound to stdin/stdout/stderr, and the subprocess body is run; the child
 * then exits with EXIT_SUCCESS or EXIT_FAILURE and never returns from this
 * function.
 *
 * In the parent, the child's PID is recorded and the child's ends of the
 * pipes are closed.
 *
 * Returns 0 in the parent on success.  Returns -1 if a pipe cannot be opened
 * or the fork fails; in both cases every pipe opened by this call is closed
 * again, so the subprocess is left with no open descriptors. */
int
cork_subprocess_start(struct cork_subprocess *self)
{
    pid_t pid;
    /* Create the stdin, stdout, and stderr pipes. */
    if (cork_write_pipe_open(&self->stdin_pipe) == -1) {
        return -1;
    }
    if (cork_read_pipe_open(&self->stdout_pipe) == -1) {
        cork_write_pipe_close(&self->stdin_pipe);
        return -1;
    }
    if (cork_read_pipe_open(&self->stderr_pipe) == -1) {
        cork_write_pipe_close(&self->stdin_pipe);
        cork_read_pipe_close(&self->stdout_pipe);
        return -1;
    }
    /* Fork the child process. */
    DEBUG("Forking child process\n");
    pid = fork();
    if (pid == 0) {
        /* Child process */
        int rc;
        /* Close the parent's end of the pipes */
        DEBUG("[child] ");
        cork_write_pipe_close_write(&self->stdin_pipe);
        DEBUG("[child] ");
        cork_read_pipe_close_read(&self->stdout_pipe);
        DEBUG("[child] ");
        cork_read_pipe_close_read(&self->stderr_pipe);
        /* Bind the stdin, stdout, and stderr pipes */
        if (cork_write_pipe_dup(&self->stdin_pipe, STDIN_FILENO) == -1) {
            _exit(EXIT_FAILURE);
        }
        if (cork_read_pipe_dup(&self->stdout_pipe, STDOUT_FILENO) == -1) {
            _exit(EXIT_FAILURE);
        }
        if (cork_read_pipe_dup(&self->stderr_pipe, STDERR_FILENO) == -1) {
            _exit(EXIT_FAILURE);
        }
        /* Run the subprocess's body */
        rc = cork_thread_body_run(self->body);
        if (CORK_LIKELY(rc == 0)) {
            _exit(EXIT_SUCCESS);
        } else {
            fprintf(stderr, "%s\n", cork_error_message());
            _exit(EXIT_FAILURE);
        }
    } else if (pid < 0) {
        /* Error forking.  Record the error first, then release the pipes we
         * just opened: no child exists to use them, and leaving the read ends
         * open would make the subprocess look unfinished forever. */
        cork_system_error_set();
        cork_write_pipe_close(&self->stdin_pipe);
        cork_read_pipe_close(&self->stdout_pipe);
        cork_read_pipe_close(&self->stderr_pipe);
        return -1;
    } else {
        /* Parent process */
        DEBUG(" Child PID=%d\n", (int) pid);
        self->pid = pid;
        /* Close the child's end of the pipes */
        cork_write_pipe_close_read(&self->stdin_pipe);
        cork_read_pipe_close_write(&self->stdout_pipe);
        cork_read_pipe_close_write(&self->stderr_pipe);
        return 0;
    }
}
/* Calls waitpid() on the child with the given `flags` (0 to block, WNOHANG to
 * poll).
 *
 * If the child was reaped, `*progress` is set to true, the stored PID is
 * reset to 0, and, when an exit-code location was supplied, it is filled with
 * WEXITSTATUS of the wait status.  Otherwise nothing is changed.
 *
 * Returns 0 in both cases; a failing waitpid() is handled by the
 * rii_check_posix macro.
 *
 * NOTE(review): WEXITSTATUS is applied without checking WIFEXITED, so the
 * value stored for a child killed by a signal may not be meaningful. */
static int
cork_subprocess_reap(struct cork_subprocess *self, int flags, bool *progress)
{
    int status;
    int reaped;

    rii_check_posix(reaped = waitpid(self->pid, &status, flags));
    if (reaped != self->pid) {
        /* Child has not changed state yet. */
        return 0;
    }

    *progress = true;
    self->pid = 0;
    if (self->exit_code != NULL) {
        *self->exit_code = WEXITSTATUS(status);
    }
    return 0;
}
/* Terminates a running child: sends it SIGTERM and then blocks in waitpid()
 * until it has been reaped.  Does nothing (and returns 0) when no child is
 * recorded as running.  The result of kill() is not checked; the return value
 * is that of the reap step. */
int
cork_subprocess_abort(struct cork_subprocess *self)
{
    if (self->pid <= 0) {
        return 0;
    }

    CORK_ATTR_UNUSED bool progress;
    DEBUG("Terminating child process %d\n", (int) self->pid);
    kill(self->pid, SIGTERM);
    return cork_subprocess_reap(self, 0, &progress);
}
/* A subprocess is finished when its child has been reaped (or was never
 * started) and both output pipes have reached their end. */
bool
cork_subprocess_is_finished(struct cork_subprocess *self)
{
    if (self->pid != 0) {
        return false;
    }
    if (!cork_read_pipe_is_finished(&self->stdout_pipe)) {
        return false;
    }
    return cork_read_pipe_is_finished(&self->stderr_pipe);
}
#if defined(__APPLE__) || defined(__MINGW32__)
#include <pthread.h>
#define THREAD_YIELD pthread_yield_np
#elif defined(__linux__) || defined(BSD)
#include <sched.h>
#define THREAD_YIELD sched_yield
#else
#error "Unknown thread yield implementation"
#endif
/* Backs off for a while when a wait loop made no progress.  The longer the
 * caller has been spinning (as counted by `*spin_count`), the heavier the
 * back-off: CPU pauses first, then thread yields, then increasingly long
 * sleeps capped at 25 ms.  `*spin_count` is incremented on every call.
 *
 * Adapted from
 * http://www.1024cores.net/home/lock-free-algorithms/tricks/spinning */
static void
cork_subprocess_yield(unsigned int *spin_count)
{
    const unsigned int spins = *spin_count;

    if (spins < 10) {
        /* Spin-wait */
        cork_pause();
    } else if (spins < 20) {
        /* A more intense spin-wait */
        int remaining = 50;
        while (remaining-- > 0) {
            cork_pause();
        }
    } else if (spins < 22) {
        THREAD_YIELD();
    } else if (spins < 24) {
        usleep(0);
    } else if (spins < 50) {
        usleep(1);
    } else if (spins < 75) {
        /* 1 ms at spin 50, growing by 1 ms per spin. */
        usleep((spins - 49) * 1000);
    } else {
        usleep(25000);
    }

    *spin_count = spins + 1;
}
/* Performs one round of work for a subprocess: reads whatever is available
 * on stdout, then on stderr, and finally polls (WNOHANG) for the child's exit
 * if it is still recorded as running.  `*progress` is set to true by the
 * callees when anything happened; it is never cleared here.  Returns 0 on
 * success; read failures propagate through rii_check. */
static int
cork_subprocess_drain_(struct cork_subprocess *self, bool *progress)
{
    rii_check(cork_read_pipe_read(&self->stdout_pipe, self->buf, progress));
    rii_check(cork_read_pipe_read(&self->stderr_pipe, self->buf, progress));

    if (self->pid <= 0) {
        /* No child left to reap. */
        return 0;
    }
    return cork_subprocess_reap(self, WNOHANG, progress);
}
/* Performs one non-blocking round of work for a subprocess (reading its
 * output pipes and polling for its exit).
 *
 * Returns true if any progress was made, false otherwise.  Errors from the
 * underlying drain step are not reported through the return value. */
bool
cork_subprocess_drain(struct cork_subprocess *self)
{
    /* The drain step only ever sets this flag to true, so it must start out
     * false; otherwise an idle round would return an indeterminate value. */
    bool progress = false;
    cork_subprocess_drain_(self, &progress);
    return progress;
}
/* Blocks until the subprocess is finished, repeatedly draining it and
 * backing off whenever a round makes no progress.  Returns 0 once finished;
 * drain failures propagate through rii_check. */
int
cork_subprocess_wait(struct cork_subprocess *self)
{
    unsigned int spin_count = 0;

    for (;;) {
        bool progress = false;

        if (cork_subprocess_is_finished(self)) {
            return 0;
        }

        rii_check(cork_subprocess_drain_(self, &progress));
        if (!progress) {
            cork_subprocess_yield(&spin_count);
        }
    }
}
/*-----------------------------------------------------------------------
* Running subprocess groups
*/
/* Aborts every subprocess in the group.
 *
 * This is best-effort: a failure to abort one subprocess does not stop the
 * remaining ones from being aborted, so no child is left running just
 * because an earlier one could not be reaped.
 *
 * Returns 0 if every abort succeeded, or -1 if at least one failed. */
static int
cork_subprocess_group_terminate(struct cork_subprocess_group *group)
{
    size_t i;
    int result = 0;
    for (i = 0; i < cork_array_size(&group->subprocesses); i++) {
        struct cork_subprocess *sub = cork_array_at(&group->subprocesses, i);
        if (cork_subprocess_abort(sub) != 0) {
            /* Remember the failure, but keep going. */
            result = -1;
        }
    }
    return result;
}
/* Starts every subprocess in the group, in order.  If any of them fails to
 * start, the whole group is terminated and -1 is returned; otherwise returns
 * 0. */
int
cork_subprocess_group_start(struct cork_subprocess_group *group)
{
    size_t index = 0;

    DEBUG("Starting subprocess group\n");

    /* Start each subprocess. */
    while (index < cork_array_size(&group->subprocesses)) {
        struct cork_subprocess *member =
            cork_array_at(&group->subprocesses, index);
        ei_check(cork_subprocess_start(member));
        index++;
    }
    return 0;

error:
    cork_subprocess_group_terminate(group);
    return -1;
}
/* Aborts the subprocesses of the group.  Returns the result of the
 * group-terminate step. */
int
cork_subprocess_group_abort(struct cork_subprocess_group *group)
{
    int rc;

    DEBUG("Aborting subprocess group\n");
    rc = cork_subprocess_group_terminate(group);
    return rc;
}
/* A group is finished when every one of its subprocesses is finished.  An
 * empty group is therefore finished. */
bool
cork_subprocess_group_is_finished(struct cork_subprocess_group *group)
{
    size_t index = 0;

    while (index < cork_array_size(&group->subprocesses)) {
        struct cork_subprocess *member =
            cork_array_at(&group->subprocesses, index);
        if (!cork_subprocess_is_finished(member)) {
            return false;
        }
        index++;
    }
    return true;
}
/* Performs one round of work for each subprocess in the group, in order.
 * `*progress` is set to true by the per-subprocess step when anything
 * happened; it is never cleared here.  Stops at the first failing subprocess
 * (via rii_check); returns 0 when all rounds succeeded. */
static int
cork_subprocess_group_drain_(struct cork_subprocess_group *group,
                             bool *progress)
{
    size_t index = 0;

    while (index < cork_array_size(&group->subprocesses)) {
        struct cork_subprocess *member =
            cork_array_at(&group->subprocesses, index);
        rii_check(cork_subprocess_drain_(member, progress));
        index++;
    }
    return 0;
}
/* Performs one non-blocking round of work for every subprocess in the group.
 * Returns true if any of them made progress.  Errors from the underlying
 * drain step are not reported through the return value. */
bool
cork_subprocess_group_drain(struct cork_subprocess_group *group)
{
    bool made_progress = false;

    (void) cork_subprocess_group_drain_(group, &made_progress);
    return made_progress;
}
/* Blocks until every subprocess in the group is finished, repeatedly
 * draining the group and backing off whenever a round makes no progress.
 * Returns 0 once finished; drain failures propagate through rii_check. */
int
cork_subprocess_group_wait(struct cork_subprocess_group *group)
{
    unsigned int spin_count = 0;

    DEBUG("Waiting for subprocess group to finish\n");
    for (;;) {
        bool progress = false;

        if (cork_subprocess_group_is_finished(group)) {
            return 0;
        }

        rii_check(cork_subprocess_group_drain_(group, &progress));
        if (!progress) {
            cork_subprocess_yield(&spin_count);
        }
    }
}