From b95dd49b46ca0374d0fcc4a2c10e8deb76655bfe Mon Sep 17 00:00:00 2001 From: clowwindy Date: Wed, 23 May 2012 15:30:41 +0800 Subject: [PATCH] a pipe, close is not implemented yet --- Makefile | 2 +- main.c | 284 +++++++++++++++++++++++++++++++++++++++++++++---------- main.h | 59 ++++++------ 3 files changed, 264 insertions(+), 81 deletions(-) diff --git a/Makefile b/Makefile index 5d3d244f..257ebeea 100644 --- a/Makefile +++ b/Makefile @@ -1,4 +1,4 @@ -main: *.o +main: main.o gcc -g -O2 main.o -lev -L/usr/local/lib -o main main.o: *.c *.h Makefile gcc -g -O2 -c -o main.o main.c diff --git a/main.c b/main.c index fb504dbc..c6ada511 100644 --- a/main.c +++ b/main.c @@ -28,11 +28,6 @@ // with the name ev_TYPE ev_io stdin_watcher; -struct server_ctx { - ev_io io; - int fd; - struct sockaddr sock; -}; struct client_ctx { ev_io io; @@ -87,93 +82,286 @@ int create_and_bind(char *port) { return listen_sock; } -static void -write_cb (EV_P_ ev_io *w, int revents) +void send_cb (EV_P_ ev_io *w, int revents) { struct client_ctx *client = (struct client_ctx *)w; ev_io_stop(EV_A_ &client->io); close(client->fd); free(client); } -static void -read_cb (EV_P_ ev_io *w, int revents) +void recv_cb (EV_P_ ev_io *w, int revents) { struct client_ctx *client = (struct client_ctx *)w; char buf[4096]; int n = recv(client->fd, buf, 4096, 0); if (n == 0) { ev_io_stop(EV_A_ &client->io); - close(client->fd); + close(client->fd); free(client); return; } else if (n < 0) { perror("recv"); return; } - //write(1, buf, n); - send(client->fd, REPLY, sizeof(REPLY), 0); + send(client->fd, REPLY, sizeof(REPLY), MSG_NOSIGNAL); ev_io_stop(EV_A_ &client->io); - ev_io_init(&client->io, write_cb, client->fd, EV_WRITE); + ev_io_init(&client->io, send_cb, client->fd, EV_WRITE); ev_io_start(EV_A_ &client->io); } -static struct client_ctx* client_new(int fd) { - struct client_ctx* client; +struct client_ctx* client_new(int fd) { + struct client_ctx* client; + + client = malloc(sizeof(struct client_ctx)); + client->fd = fd; + //client->server = server; + setnonblocking(client->fd); + ev_io_init(&client->io, recv_cb, client->fd, EV_READ); + return client; +} - client = malloc(sizeof(struct client_ctx)); - client->fd = fd; - //client->server = server; - setnonblocking(client->fd); - ev_io_init(&client->io, read_cb, client->fd, EV_READ); - return client; +static void server_recv_cb (EV_P_ ev_io *w, int revents) { + struct server_ctx *server_recv_ctx = (struct server_ctx *)w; + struct server *server = server_recv_ctx->server; + struct remote *remote = server->remote; + while (1) { + ssize_t r = recv(server->fd, server->buf, BUF_SIZE, 0); + if (r == 0) { + // TODO connection closed + return; + } else if(r == -1) { + perror("recv"); + if (errno == EAGAIN) { + // no data + // continue to wait for recv + break; + } + } + int w = send(remote->fd, server->buf, r, MSG_NOSIGNAL); + if (w == 0) { + // TODO connection closed + return; + } else if(w == -1) { + perror("send"); + if (errno == EAGAIN) { + // no data, wait for send + ev_io_stop(EV_A_ &server_recv_ctx->io); + ev_io_start(EV_A_ &remote->send_ctx->io); + break; + } + } else if(w < r) { + char *pt; + for (pt = server->buf; pt < pt + w; pt++) { + *pt = *(pt + w); + } + server->buf_len = w; + ev_io_stop(EV_A_ &server_recv_ctx->io); + ev_io_start(EV_A_ &remote->send_ctx->io); + break; + } + } } -// all watcher callbacks have a similar signature -// this callback is called when data is readable on stdin - static void -server_cb (EV_P_ ev_io *w, int revents) +static void server_send_cb (EV_P_ ev_io *w, int revents) { + struct server_ctx *server_send_ctx = (struct server_ctx *)w; + struct server *server = server_send_ctx->server; + struct remote *remote = server->remote; + if (remote->buf_len == 0) { + // TODO close and free + } else { + // has data to send + ssize_t r = send(server->fd, remote->buf, + remote->buf_len, 0); + if (r < 0) { + perror("send"); + // TODO close and free + return; + } + if (r < remote->buf_len) { + // partly sent, move memory, wait for the next time to send + char *pt; + for (pt = remote->buf; pt < pt + r; pt++) { + *pt = *(pt + r); + } + remote->buf_len = r; + return; + } else { + // all sent out, wait for reading + ev_io_stop(EV_A_ &server_send_ctx->io); + ev_io_start(EV_A_ &remote->recv_ctx->io); + } + } + +} +static void remote_recv_cb (EV_P_ ev_io *w, int revents) { + struct remote_ctx *remote_recv_ctx = (struct remote_ctx *)w; + struct remote *remote = remote_recv_ctx->remote; + struct server *server = remote->server; + while (1) { + ssize_t r = recv(remote->fd, remote->buf, BUF_SIZE, 0); + if (r == 0) { + // TODO connection closed + return; + } else if(r == -1) { + perror("recv"); + if (errno == EAGAIN) { + // no data + // continue to wait for recv + break; + } + } + int w = send(server->fd, remote->buf, r, MSG_NOSIGNAL); + if (w == 0) { + // TODO connection closed + return; + } else if(w == -1) { + perror("send"); + if (errno == EAGAIN) { + // no data, wait for send + ev_io_stop(EV_A_ &remote_recv_ctx->io); + ev_io_start(EV_A_ &server->send_ctx->io); + break; + } + } else if(w < r) { + char *pt; + for (pt = remote->buf; pt < pt + w; pt++) { + *pt = *(pt + w); + } + remote->buf_len = w; + ev_io_stop(EV_A_ &remote_recv_ctx->io); + ev_io_start(EV_A_ &server->send_ctx->io); + break; + } + } +} +static void remote_send_cb (EV_P_ ev_io *w, int revents) { + struct remote_ctx *remote_send_ctx = (struct remote_ctx *)w; + struct remote *remote = remote_send_ctx->remote; + struct server *server = remote->server; + if (!remote_send_ctx->connected) { + socklen_t len; + struct sockaddr_storage addr; + char ipstr[INET6_ADDRSTRLEN]; + int port; + len = sizeof addr; + int r = getpeername(remote->fd, (struct sockaddr*)&addr, &len); + if (r == 0) { + remote_send_ctx->connected = 1; + ev_io_stop(EV_A_ &remote_send_ctx->io); + ev_io_start(EV_A_ &server->recv_ctx->io); + ev_io_start(EV_A_ &remote->recv_ctx->io); + } else { + perror("getpeername"); + // not connected + // TODO + return; + } + } else { + if (server->buf_len == 0) { + // TODO close and free + } else { + // has data to send + ssize_t r = send(remote->fd, server->buf, + server->buf_len, 0); + if (r < 0) { + perror("send"); + // TODO close and free + return; + } + if (r < server->buf_len) { + // partly sent, move memory, wait for the next time to send + char *pt; + for (pt = server->buf; pt < pt + r; pt++) { + *pt = *(pt + r); + } + server->buf_len = r; + return; + } else { + // all sent out, wait for reading + ev_io_stop(EV_A_ &remote_send_ctx->io); + ev_io_start(EV_A_ &server->recv_ctx->io); + } + } + } +} + +struct remote* new_remote(int fd) { + struct remote *remote; + remote = malloc(sizeof(struct remote)); + remote->fd = fd; + remote->recv_ctx = malloc(sizeof(struct remote_ctx)); + remote->send_ctx = malloc(sizeof(struct remote_ctx)); + ev_io_init(&remote->recv_ctx->io, remote_recv_cb, fd, EV_READ); + ev_io_init(&remote->send_ctx->io, remote_send_cb, fd, EV_WRITE); + remote->recv_ctx->remote = remote; + remote->recv_ctx->connected = 0; + remote->send_ctx->remote = remote; + remote->send_ctx->connected = 0; + return remote; +} +struct server* new_server(int fd) { + struct server *server; + server = malloc(sizeof(struct server)); + server->fd = fd; + server->recv_ctx = malloc(sizeof(struct server_ctx)); + server->send_ctx = malloc(sizeof(struct server_ctx)); + ev_io_init(&server->recv_ctx->io, server_recv_cb, fd, EV_READ); + ev_io_init(&server->send_ctx->io, server_send_cb, fd, EV_WRITE); + server->recv_ctx->server = server; + server->recv_ctx->connected = 0; + server->send_ctx->server = server; + server->send_ctx->connected = 0; + return server; +} + +static void accept_cb (EV_P_ ev_io *w, int revents) { -// puts ("clients connected"); - struct server_ctx *server = (struct server_ctx *)w; - int connectfd; + struct listen_ctx *listener = (struct listen_ctx *)w; + int serverfd; while (1) { - connectfd = accept(server->fd, NULL, NULL); - if (connectfd == -1) { + serverfd = accept(listener->fd, NULL, NULL); + if (serverfd == -1) { perror("accept"); break; } - struct client_ctx *client = client_new(connectfd); - ev_io_start(EV_A_ &client->io); + struct server *server = new_server(serverfd); + struct addrinfo hints, *res; + int sockfd; + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + getaddrinfo("www.sina.com.cn", "80", &hints, &res); + sockfd = socket(res->ai_family, res->ai_socktype, res->ai_protocol); + if (sockfd < 0) { + perror("socket"); + exit(1); // TODO close, free and return + } + setnonblocking(sockfd); + struct remote *remote = new_remote(sockfd); + server->remote = remote; + remote->server = server; + connect(sockfd, res->ai_addr, res->ai_addrlen); + // listen to remote connected event + ev_io_start(EV_A_ &remote->send_ctx->io); break; } - } - int -main (void) + +int main (void) { - int listenfd, connectfd; + int listenfd; listenfd = create_and_bind("1090"); if (listen(listenfd, SOMAXCONN) == -1) { perror("listen() error."); return 1; } setnonblocking(listenfd); - - struct server_ctx listen_ctx; + struct listen_ctx listen_ctx; listen_ctx.fd = listenfd; - - // use the default event loop unless you have special needs struct ev_loop *loop = EV_DEFAULT; - - // initialise an io watcher, then start it - // this one will watch for stdin to become readable - ev_io_init (&listen_ctx.io, server_cb, listenfd, EV_READ); + ev_io_init (&listen_ctx.io, accept_cb, listenfd, EV_READ); ev_io_start (loop, &listen_ctx.io); - - // now wait for events to arrive ev_run (loop, 0); - - // break was called, so exit return 0; } diff --git a/main.h b/main.h index a0d36c3c..baff1439 100644 --- a/main.h +++ b/main.h @@ -4,48 +4,43 @@ #define BUF_SIZE 4096 +struct listen_ctx { + ev_io io; + int fd; + struct sockaddr sock; +}; + struct server { - int server_fd; - char server_buf[BUF_SIZE]; - int server_buf_len; - struct server_read_ctx *server_read_ctx; - struct server_write_ctx *server_write_ctx; + int fd; + char buf[BUF_SIZE]; // server recv into, remote send from + int buf_len; + struct server_ctx *recv_ctx; + struct server_ctx *send_ctx; struct remote *remote; }; -struct server_read_ctx { - ev_io server_read_io; - struct server *server; -}; -struct server_write_ctx { - ev_io server_write_io; +struct server_ctx { + ev_io io; + int connected; struct server *server; }; struct remote { - int remote_fd; - char remote_buf[BUF_SIZE]; - int remote_buf_len; - struct remote_read_ctx *remote_read_ctx; - struct remote_write_ctx *remote_write_ctx; + int fd; + char buf[BUF_SIZE]; // remote recv into, server send from + int buf_len; + struct remote_ctx *recv_ctx; + struct remote_ctx *send_ctx; struct server *server; }; -struct remote_read_ctx { - ev_io remote_read_io; - struct remote *remote; -}; -struct remote_write_ctx { - ev_io remote_write_io; +struct remote_ctx { + ev_io io; + int connected; struct remote *remote; }; -static void -accept_cb (EV_P_ ev_io *w, int revents); -static void -server_read_cb (EV_P_ ev_io *w, int revents); -static void -server_write_cb (EV_P_ ev_io *w, int revents); -static void -remote_read_cb (EV_P_ ev_io *w, int revents); -static void -remote_write_cb (EV_P_ ev_io *w, int revents); +static void accept_cb (EV_P_ ev_io *w, int revents); +static void server_recv_cb (EV_P_ ev_io *w, int revents); +static void server_send_cb (EV_P_ ev_io *w, int revents); +static void remote_recv_cb (EV_P_ ev_io *w, int revents); +static void remote_send_cb (EV_P_ ev_io *w, int revents);