From cddde07e3a3514d3ee08572f8227832bb6b70c70 Mon Sep 17 00:00:00 2001 From: manojampalam Date: Tue, 1 Mar 2016 21:43:32 -0800 Subject: [PATCH] 3-1 C3 --- .../win32posix/UnitTests/socket_tests.c | 360 +++++++----------- .../win32posix/win32posix/signal.c | 6 +- .../win32posix/win32posix/socketio.c | 15 +- .../win32posix/win32posix/w32fd.c | 75 ++-- 4 files changed, 190 insertions(+), 266 deletions(-) diff --git a/contrib/win32/w32-posix-prototype/win32posix/UnitTests/socket_tests.c b/contrib/win32/w32-posix-prototype/win32posix/UnitTests/socket_tests.c index 46020ac..f29bbc2 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/UnitTests/socket_tests.c +++ b/contrib/win32/w32-posix-prototype/win32posix/UnitTests/socket_tests.c @@ -9,7 +9,7 @@ #define SMALL_RECV_BUF_SIZE 128 int listen_fd, accept_fd, connect_fd, ret; -struct addrinfo hints,*servinfo; +struct addrinfo hints, *servinfo; fd_set read_set, write_set, except_set; struct timeval time_val; struct sockaddr_storage their_addr; @@ -56,9 +56,9 @@ set_nonblock(int fd) void socket_fd_tests() { - fd_set set,*pset; + fd_set set, *pset; pset = &set; - + TEST_START("fd_set initial state"); FD_ZERO(pset); ASSERT_CHAR_EQ(0, FD_ISSET(0, pset)); @@ -73,7 +73,7 @@ void socket_fd_tests() ASSERT_CHAR_EQ(1, FD_ISSET(1, pset)); ASSERT_CHAR_EQ(0, FD_ISSET(2, pset)); TEST_DONE(); - + TEST_START("FD_CLR"); FD_CLR(0, pset); ASSERT_CHAR_EQ(0, FD_ISSET(0, pset)); @@ -109,7 +109,7 @@ void socket_fd_tests() ASSERT_INT_EQ(errno, EBADF); ASSERT_INT_EQ(recv(9, NULL, 0, 0), -1); ASSERT_INT_EQ(errno, EBADF); - ASSERT_INT_EQ(send(10, NULL, 0,0), -1); + ASSERT_INT_EQ(send(10, NULL, 0, 0), -1); ASSERT_INT_EQ(errno, EBADF); ASSERT_INT_EQ(shutdown(11, 0), -1); ASSERT_INT_EQ(errno, EBADF); @@ -121,7 +121,7 @@ void socket_fd_tests() ASSERT_INT_EQ(errno, EBADF); ASSERT_INT_EQ(isatty(12), -1); ASSERT_INT_EQ(errno, EBADF); - ASSERT_INT_EQ(fdopen(13,NULL), -1); + ASSERT_INT_EQ(fdopen(13, NULL), -1); ASSERT_INT_EQ(errno, EBADF); ASSERT_INT_EQ(close(14), -1); ASSERT_INT_EQ(errno, EBADF); @@ -393,7 +393,7 @@ void socket_select_tests() { ret = close(listen_fd); ASSERT_INT_EQ(ret, 0); TEST_DONE(); - + TEST_START("select send and recv"); s = accept_fd; r = connect_fd; @@ -410,97 +410,6 @@ void socket_select_tests() { FD_ZERO(&write_set); FD_SET(s, &write_set); FD_SET(r, &read_set); - while (-1 != select(max(r,s)+1, &read_set ,&write_set, NULL, &time_val)) { - if (FD_ISSET(s, &write_set)) { - while ((bytes_sent < num_bytes) && ((ret = send(s, send_buf + bytes_sent, num_bytes - bytes_sent, 0)) > 0)) - bytes_sent += ret; - if (bytes_sent < num_bytes) { - ASSERT_INT_EQ(ret, -1); - ASSERT_INT_EQ(errno, EAGAIN); - eagain_results++; - } - } - - if (FD_ISSET(r, &read_set)) { - while ((ret = recv(r, recv_buf + bytes_received, num_bytes - bytes_received + 1, 0)) > 0) - bytes_received += ret; - if (ret == 0) - break; - ASSERT_INT_EQ(ret, -1); - ASSERT_INT_EQ(errno, EAGAIN); - eagain_results++; - } - - if (bytes_sent < num_bytes) - FD_SET(s, &write_set); - else { - FD_CLR(s, &write_set); - ret = shutdown(s, SD_SEND); - ASSERT_INT_EQ(ret, 0); - } - FD_SET(r, &read_set); - } - - /*ensure that we hit send and recv paths that returned EAGAIN. Else it would not have touched the async paths*/ - /*if this assert is being hit, then num_bytes is too small. up it*/ - ASSERT_INT_GT(eagain_results, 0); - ASSERT_INT_EQ(bytes_sent, bytes_received); - ASSERT_INT_EQ(memcmp(send_buf, recv_buf, num_bytes), 0); - ret = close(connect_fd); - ASSERT_INT_EQ(ret, 0); - ret = close(accept_fd); - ASSERT_INT_EQ(ret, 0); - TEST_DONE(); - - freeaddrinfo(servinfo); -} - -void socket_typical_ssh_payload_tests() { - int s, r, i; - int max_bytes = 1024 * 700; //700KB - int max_packetsize = 1024 * 5; - int packets_sent = 0; - int packets_received = 0; - int send_packet_remaining = 0, recv_packet_remaining = 0; - int eagain_results = 0; - - TEST_START("connection setup"); - memset(&hints, 0, sizeof(hints)); - hints.ai_socktype = SOCK_STREAM; - ret = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo); - ASSERT_INT_EQ(ret, 0); - listen_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); - ASSERT_INT_NE(listen_fd, -1); - ret = bind(listen_fd, servinfo->ai_addr, servinfo->ai_addrlen); - ASSERT_INT_EQ(ret, 0); - ret = listen(listen_fd, BACKLOG); - ASSERT_INT_EQ(ret, 0); - connect_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); - ASSERT_INT_NE(connect_fd, -1); - ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); - ASSERT_INT_EQ(ret, 0); - accept_fd = accept(listen_fd, (struct sockaddr*)&their_addr, sizeof(their_addr)); - ASSERT_INT_NE(accept_fd, -1); - ret = close(listen_fd); - ASSERT_INT_EQ(ret, 0); - TEST_DONE(); - - TEST_START("select send and recv packets"); - r = accept_fd; - s = connect_fd; - ret = set_nonblock(s); - ASSERT_INT_EQ(ret, 0); - ret = set_nonblock(r); - ASSERT_INT_EQ(ret, 0); - send_buf = malloc(num_bytes); - recv_buf = malloc(num_bytes + 1); - ASSERT_PTR_NE(send_buf, NULL); - ASSERT_PTR_NE(recv_buf, NULL); - prep_input_buffer(send_buf, num_bytes, 17); - FD_ZERO(&read_set); - FD_ZERO(&write_set); - FD_SET(s, &write_set); - FD_SET(r, &read_set); while (-1 != select(max(r, s) + 1, &read_set, &write_set, NULL, &time_val)) { if (FD_ISSET(s, &write_set)) { while ((bytes_sent < num_bytes) && ((ret = send(s, send_buf + bytes_sent, num_bytes - bytes_sent, 0)) > 0)) @@ -546,6 +455,130 @@ void socket_typical_ssh_payload_tests() { freeaddrinfo(servinfo); } +void socket_typical_ssh_payload_tests() { + int s, r, i; + int max_bytes = 1024 * 700; //700KB + int max_packetsize = 1024 * 5, bytes_sent = 0; + int packets_sent = 0; + int packets_received = 0; + int send_packet_remaining = 0, recv_packet_remaining = 0; + int eagain_results = 0; + + TEST_START("connection setup"); + memset(&hints, 0, sizeof(hints)); + hints.ai_socktype = SOCK_STREAM; + ret = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo); + ASSERT_INT_EQ(ret, 0); + listen_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); + ASSERT_INT_NE(listen_fd, -1); + ret = bind(listen_fd, servinfo->ai_addr, servinfo->ai_addrlen); + ASSERT_INT_EQ(ret, 0); + ret = listen(listen_fd, BACKLOG); + ASSERT_INT_EQ(ret, 0); + connect_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); + ASSERT_INT_NE(connect_fd, -1); + ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); + ASSERT_INT_EQ(ret, 0); + accept_fd = accept(listen_fd, (struct sockaddr*)&their_addr, sizeof(their_addr)); + ASSERT_INT_NE(accept_fd, -1); + ret = close(listen_fd); + ASSERT_INT_EQ(ret, 0); + TEST_DONE(); + + TEST_START("select send and recv packets"); + r = accept_fd; + s = connect_fd; + ret = set_nonblock(s); + ASSERT_INT_EQ(ret, 0); + ret = set_nonblock(r); + ASSERT_INT_EQ(ret, 0); + send_buf = malloc(max_bytes); + recv_buf = malloc(max_bytes + 1); + ASSERT_PTR_NE(send_buf, NULL); + ASSERT_PTR_NE(recv_buf, NULL); + FD_ZERO(&read_set); + FD_ZERO(&write_set); + FD_SET(s, &write_set); + FD_SET(r, &read_set); + + int total = 0; + while (-1 != select(max(r, s) + 1, &read_set, &write_set, NULL, &time_val)) { + if (FD_ISSET(s, &write_set)) { + while ((send_packet_remaining) && ((ret = send(s, send_buf, send_packet_remaining, 0)) > 0)) { + send_packet_remaining -= ret; + bytes_sent += ret; + } + + if (send_packet_remaining) { + ASSERT_INT_EQ(ret, -1); + ASSERT_INT_EQ(errno, EAGAIN); + } + else if (bytes_sent < max_bytes) { + send_packet_remaining = ((double)rand() / (RAND_MAX + 1)) * (max_packetsize - 100) + 100; + ret = send(s, &send_packet_remaining, 4, 0); + if (ret == -1) { + send_packet_remaining = 0; //we'll try again when io is ready + } + else if (ret < 4) + /*unfortunate - sent half the header, we'll bail the test out*/ + ASSERT_INT_EQ(1, 0); + else { + ASSERT_INT_EQ(ret, 4); + packets_sent++; + //printf("sending packet of size %d\n", send_packet_remaining); + } + } + } + + if (FD_ISSET(r, &read_set)) { + while (recv_packet_remaining && ((ret = recv(r, recv_buf, recv_packet_remaining, 0)) > 0)) { + recv_packet_remaining -= ret; + } + + if (recv_packet_remaining) { + ASSERT_INT_EQ(ret, -1); + ASSERT_INT_EQ(errno, EAGAIN); + } + else { + ret = recv(r, &recv_packet_remaining, 4, 0); + if (ret == -1) { + ASSERT_INT_EQ(ret, -1); + ASSERT_INT_EQ(errno, EAGAIN); + } + else if (ret == 0) + break; + else if (ret < 4) + /*unfortunate.. read partial header, bail out*/ + ASSERT_INT_EQ(1, 0); + else { + ASSERT_INT_EQ(ret, 4); + packets_received++; + //printf("recevied packet of size %d\n", recv_packet_remaining); + } + } + } + + if ((bytes_sent >= max_bytes) && (send_packet_remaining == 0)) { + FD_CLR(s, &write_set); + ret = shutdown(s, SD_SEND); + ASSERT_INT_EQ(ret, 0); + } + else + FD_SET(s, &write_set); + + FD_SET(r, &read_set); + } + + ASSERT_INT_EQ(packets_sent, packets_received); + ret = close(connect_fd); + ASSERT_INT_EQ(ret, 0); + ret = close(accept_fd); + ASSERT_INT_EQ(ret, 0); + TEST_DONE(); + + freeaddrinfo(servinfo); +} + void socket_tests() { @@ -554,136 +587,7 @@ void socket_tests() socket_blocking_io_tests(); socket_nonblocking_io_tests(); socket_select_tests(); - w32posix_done(); -} - -int socket_prepare(char* ip) -{ - struct addrinfo hints; - memset(&hints, 0, sizeof(hints)); - hints.ai_socktype = SOCK_STREAM; - if (getaddrinfo(ip, PORT, &hints, &servinfo) == -1) - return -1; - - listen_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); - connect_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); - if ((listen_fd == -1) || (connect_fd == -1)) - return -1; - - if (-1 == bind(listen_fd, servinfo->ai_addr, servinfo->ai_addrlen)) - return -1; - - if (-1 == listen(listen_fd, BACKLOG)) - return -1; - - return 0; -} -#define READ_BUf_SIZE 1024 * 100 -#define WRITE_BUF_SIZE 1024 * 100 -void sample() -{ - w32posix_initialize(); - listen_fd = -1; - accept_fd = -1; - connect_fd = -1; - servinfo = NULL; - - int ret; - - ret = socket_prepare("127.0.0.1"); - //Assert::AreEqual(ret, 0); - - ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); - - accept_fd = accept(listen_fd, NULL, NULL); - //Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO()); - - //close(listen_fd); - //listen_fd = -1; - - int c = connect_fd; - int s = accept_fd; - - set_nonblock(c); - set_nonblock(s); - - char *to_write = (char*)malloc(WRITE_BUF_SIZE); - - char *read_to = (char*)malloc(READ_BUf_SIZE); - - //write from c, read from s - fd_set read_set; - fd_set write_set; - FD_ZERO(&read_set); - FD_ZERO(&write_set); - FD_SET(s, &read_set); - FD_SET(c, &write_set); - int max_fd = max(c, s) + 1; - struct timeval time; - time.tv_sec = 60 * 60; - time.tv_usec = 0; - long long bytes_written = 0; - long long bytes_read = 0; - - while (-1 != select(max_fd, &read_set, &write_set, NULL, &time)) - { - BOOL read_ready = FD_ISSET(s, &read_set); - BOOL write_ready = FD_ISSET(c, &write_set); - FD_ZERO(&read_set); - FD_ZERO(&write_set); - FD_SET(s, &read_set); - - if (write_ready) - { - -#define WR_LIMIT WRITE_BUF_SIZE*5 - - int bw = 0;// send(c, to_write, WRITE_BUF_SIZE, 0); - while ((bw != -1) && (bytes_written < WR_LIMIT)) { - - bw = send(c, to_write, WRITE_BUF_SIZE, 0); - if (bw > 0) - bytes_written += bw; - else { - ret = errno; - //Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO()); - } - - } - - if (bytes_written >= WR_LIMIT) - { - ret = shutdown(c, SD_SEND); - //Assert::AreEqual(ret, 0, L"", LINE_INFO()); - } - else - FD_SET(c, &write_set); - } - - - - if (read_ready) - { - int br = read(s, read_to, READ_BUf_SIZE); - while (br > 1) { - bytes_read += br; - br = read(s, read_to, READ_BUf_SIZE); - } - - if (br == 0) //send from other side is done - break; - ret = errno; - //Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO()); - } - - } - - //Assert::AreEqual((bytes_written == bytes_read) ? 1 : 0, TRUE, L"", LINE_INFO()); - - if (servinfo) freeaddrinfo(servinfo); - if (listen_fd != -1) close(listen_fd); - if (connect_fd != -1) close(connect_fd); - if (accept_fd != -1) close(accept_fd); + socket_typical_ssh_payload_tests(); w32posix_done(); } diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/signal.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/signal.c index b2669a7..25d8829 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/signal.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/signal.c @@ -11,6 +11,7 @@ // - any APCs caused by IO completions // - time out (errno = ETIMEOUT) // Returns 0 on IO completion and -1 on rest +// if milli_seconds is 0, this function returns 0, its called with 0 to execute any scheduled APCs int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds) { //todo - implement signal catching and handling @@ -25,6 +26,8 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds) return 0; } else if (ret == WAIT_TIMEOUT) { + if (milli_seconds == 0) + return 0; errno = ETIMEDOUT; debug("ERROR: wait timed out"); return -1; @@ -42,7 +45,8 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds) return 0; } else if (ret == 0) { - //timed out + if (milli_seconds == 0) + return 0; errno = ETIMEDOUT; debug("ERROR: wait timed out"); return -1; diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c index f16c3f8..6a50a6c 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c @@ -128,8 +128,8 @@ void CALLBACK WSARecvCompletionRoutine( ) { struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped)); - debug2("io:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred:%d", - pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.completed, dwError, cbTransferred); + debug2("io:%p, pending_state:%d, flags:%d, error:%d, received:%d", + pio, pio->read_details.pending, dwFlags, dwError, cbTransferred); if (!dwError && !cbTransferred) dwError = ERROR_GRACEFUL_DISCONNECT; pio->read_details.error = dwError; @@ -179,6 +179,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) { if (WSAGetLastError() == WSA_IO_PENDING) { //io is initiated and pending + debug2("WSARecv reported IO pending"); pio->read_details.pending = TRUE; } else { //failed @@ -382,7 +383,7 @@ void CALLBACK WSASendCompletionRoutine( ) { struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped)); - debug2("io:%p, pending_state:%d, error:%d, transferred:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining); + debug2("io:%p, pending_state:%d, error:%d, sent:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining); pio->write_details.error = dwError; //assert that remaining == cbTransferred pio->write_details.remaining -= cbTransferred; @@ -785,7 +786,13 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) { int socketio_on_select(struct w32_io* pio, BOOL rd) { - debug2("io:%p type:%d", pio, pio->type); + debug2("io:%p type:%d rd:%d", pio, pio->type, rd); + + //return if io is already available + if (socketio_is_io_available(pio, rd)) + return 0; + + //return if io is pending if (rd && pio->read_details.pending) return 0; diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c index c46bfe6..5f9b04f 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c @@ -364,35 +364,18 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co if (readfds) { for (i = 0; i < fds; i++) - if (FD_ISSET(i, readfds)) + if (FD_ISSET(i, readfds)) { CHECK_FD(i); + in_ready_fds++; + } } if (writefds) { for (i = 0; i < fds; i++) - if (FD_ISSET(i, writefds)) + if (FD_ISSET(i, writefds)) { CHECK_FD(i); - } - - - //see if any io is ready - for (i = 0; i < fds; i++) { - - if (readfds && FD_ISSET(i, readfds)) { - in_ready_fds++; - if (w32_io_is_io_available(fd_table.w32_ios[i], TRUE)) { - FD_SET(i, &read_ready_fds); - out_ready_fds++; + in_ready_fds; } - } - - if (writefds && FD_ISSET(i, writefds)) { - in_ready_fds++; - if (w32_io_is_io_available(fd_table.w32_ios[i], FALSE)) { - FD_SET(i, &write_ready_fds); - out_ready_fds++; - } - } } //if none of input fds are set return error @@ -402,18 +385,8 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co return -1; } - //if io on some fds is already ready, return - if (out_ready_fds) - { - if (readfds) - *readfds = read_ready_fds; - if (writefds) - *writefds = write_ready_fds; - debug2("IO ready:%d, no wait", out_ready_fds); - return out_ready_fds; - } - //start async io on selected fds + //start async io on selected fds if needed and pick up any events that select needs to listen on for (int i = 0; i < fds; i++) { if (readfds && FD_ISSET(i, readfds)) { @@ -427,9 +400,45 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co if (writefds && FD_ISSET(i, writefds)) { if (w32_io_on_select(fd_table.w32_ios[i], FALSE) == -1) return -1; + if (fd_table.w32_ios[i]->type == CONNECT_FD) { + events[num_events++] = fd_table.w32_ios[i]->write_overlapped.hEvent; + } } } + //excute any scheduled APCs + if (0 != wait_for_any_event(NULL, 0, 0)) { + return -1; + } + + //see if any io is ready + for (i = 0; i < fds; i++) { + + if (readfds && FD_ISSET(i, readfds)) { + if (w32_io_is_io_available(fd_table.w32_ios[i], TRUE)) { + FD_SET(i, &read_ready_fds); + out_ready_fds++; + } + } + + if (writefds && FD_ISSET(i, writefds)) { + if (w32_io_is_io_available(fd_table.w32_ios[i], FALSE)) { + FD_SET(i, &write_ready_fds); + out_ready_fds++; + } + } + } + + //if io on some fds is already ready, return + if (out_ready_fds) + { + if (readfds) + *readfds = read_ready_fds; + if (writefds) + *writefds = write_ready_fds; + debug2("IO ready:%d, no wait", out_ready_fds); + return out_ready_fds; + } do { ticks_now = GetTickCount64();