This commit is contained in:
manojampalam 2016-03-01 21:43:32 -08:00
parent 8b98efe6e5
commit cddde07e3a
4 changed files with 190 additions and 266 deletions

View File

@ -9,7 +9,7 @@
#define SMALL_RECV_BUF_SIZE 128
int listen_fd, accept_fd, connect_fd, ret;
struct addrinfo hints,*servinfo;
struct addrinfo hints, *servinfo;
fd_set read_set, write_set, except_set;
struct timeval time_val;
struct sockaddr_storage their_addr;
@ -56,9 +56,9 @@ set_nonblock(int fd)
void socket_fd_tests()
{
fd_set set,*pset;
fd_set set, *pset;
pset = &set;
TEST_START("fd_set initial state");
FD_ZERO(pset);
ASSERT_CHAR_EQ(0, FD_ISSET(0, pset));
@ -73,7 +73,7 @@ void socket_fd_tests()
ASSERT_CHAR_EQ(1, FD_ISSET(1, pset));
ASSERT_CHAR_EQ(0, FD_ISSET(2, pset));
TEST_DONE();
TEST_START("FD_CLR");
FD_CLR(0, pset);
ASSERT_CHAR_EQ(0, FD_ISSET(0, pset));
@ -109,7 +109,7 @@ void socket_fd_tests()
ASSERT_INT_EQ(errno, EBADF);
ASSERT_INT_EQ(recv(9, NULL, 0, 0), -1);
ASSERT_INT_EQ(errno, EBADF);
ASSERT_INT_EQ(send(10, NULL, 0,0), -1);
ASSERT_INT_EQ(send(10, NULL, 0, 0), -1);
ASSERT_INT_EQ(errno, EBADF);
ASSERT_INT_EQ(shutdown(11, 0), -1);
ASSERT_INT_EQ(errno, EBADF);
@ -121,7 +121,7 @@ void socket_fd_tests()
ASSERT_INT_EQ(errno, EBADF);
ASSERT_INT_EQ(isatty(12), -1);
ASSERT_INT_EQ(errno, EBADF);
ASSERT_INT_EQ(fdopen(13,NULL), -1);
ASSERT_INT_EQ(fdopen(13, NULL), -1);
ASSERT_INT_EQ(errno, EBADF);
ASSERT_INT_EQ(close(14), -1);
ASSERT_INT_EQ(errno, EBADF);
@ -393,7 +393,7 @@ void socket_select_tests() {
ret = close(listen_fd);
ASSERT_INT_EQ(ret, 0);
TEST_DONE();
TEST_START("select send and recv");
s = accept_fd;
r = connect_fd;
@ -410,97 +410,6 @@ void socket_select_tests() {
FD_ZERO(&write_set);
FD_SET(s, &write_set);
FD_SET(r, &read_set);
while (-1 != select(max(r,s)+1, &read_set ,&write_set, NULL, &time_val)) {
if (FD_ISSET(s, &write_set)) {
while ((bytes_sent < num_bytes) && ((ret = send(s, send_buf + bytes_sent, num_bytes - bytes_sent, 0)) > 0))
bytes_sent += ret;
if (bytes_sent < num_bytes) {
ASSERT_INT_EQ(ret, -1);
ASSERT_INT_EQ(errno, EAGAIN);
eagain_results++;
}
}
if (FD_ISSET(r, &read_set)) {
while ((ret = recv(r, recv_buf + bytes_received, num_bytes - bytes_received + 1, 0)) > 0)
bytes_received += ret;
if (ret == 0)
break;
ASSERT_INT_EQ(ret, -1);
ASSERT_INT_EQ(errno, EAGAIN);
eagain_results++;
}
if (bytes_sent < num_bytes)
FD_SET(s, &write_set);
else {
FD_CLR(s, &write_set);
ret = shutdown(s, SD_SEND);
ASSERT_INT_EQ(ret, 0);
}
FD_SET(r, &read_set);
}
/*ensure that we hit send and recv paths that returned EAGAIN. Else it would not have touched the async paths*/
/*if this assert is being hit, then num_bytes is too small. up it*/
ASSERT_INT_GT(eagain_results, 0);
ASSERT_INT_EQ(bytes_sent, bytes_received);
ASSERT_INT_EQ(memcmp(send_buf, recv_buf, num_bytes), 0);
ret = close(connect_fd);
ASSERT_INT_EQ(ret, 0);
ret = close(accept_fd);
ASSERT_INT_EQ(ret, 0);
TEST_DONE();
freeaddrinfo(servinfo);
}
void socket_typical_ssh_payload_tests() {
int s, r, i;
int max_bytes = 1024 * 700; //700KB
int max_packetsize = 1024 * 5;
int packets_sent = 0;
int packets_received = 0;
int send_packet_remaining = 0, recv_packet_remaining = 0;
int eagain_results = 0;
TEST_START("connection setup");
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
ret = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo);
ASSERT_INT_EQ(ret, 0);
listen_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
ASSERT_INT_NE(listen_fd, -1);
ret = bind(listen_fd, servinfo->ai_addr, servinfo->ai_addrlen);
ASSERT_INT_EQ(ret, 0);
ret = listen(listen_fd, BACKLOG);
ASSERT_INT_EQ(ret, 0);
connect_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
ASSERT_INT_NE(connect_fd, -1);
ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen);
ASSERT_INT_EQ(ret, 0);
accept_fd = accept(listen_fd, (struct sockaddr*)&their_addr, sizeof(their_addr));
ASSERT_INT_NE(accept_fd, -1);
ret = close(listen_fd);
ASSERT_INT_EQ(ret, 0);
TEST_DONE();
TEST_START("select send and recv packets");
r = accept_fd;
s = connect_fd;
ret = set_nonblock(s);
ASSERT_INT_EQ(ret, 0);
ret = set_nonblock(r);
ASSERT_INT_EQ(ret, 0);
send_buf = malloc(num_bytes);
recv_buf = malloc(num_bytes + 1);
ASSERT_PTR_NE(send_buf, NULL);
ASSERT_PTR_NE(recv_buf, NULL);
prep_input_buffer(send_buf, num_bytes, 17);
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_SET(s, &write_set);
FD_SET(r, &read_set);
while (-1 != select(max(r, s) + 1, &read_set, &write_set, NULL, &time_val)) {
if (FD_ISSET(s, &write_set)) {
while ((bytes_sent < num_bytes) && ((ret = send(s, send_buf + bytes_sent, num_bytes - bytes_sent, 0)) > 0))
@ -546,6 +455,130 @@ void socket_typical_ssh_payload_tests() {
freeaddrinfo(servinfo);
}
void socket_typical_ssh_payload_tests() {
int s, r, i;
int max_bytes = 1024 * 700; //700KB
int max_packetsize = 1024 * 5, bytes_sent = 0;
int packets_sent = 0;
int packets_received = 0;
int send_packet_remaining = 0, recv_packet_remaining = 0;
int eagain_results = 0;
TEST_START("connection setup");
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
ret = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo);
ASSERT_INT_EQ(ret, 0);
listen_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
ASSERT_INT_NE(listen_fd, -1);
ret = bind(listen_fd, servinfo->ai_addr, servinfo->ai_addrlen);
ASSERT_INT_EQ(ret, 0);
ret = listen(listen_fd, BACKLOG);
ASSERT_INT_EQ(ret, 0);
connect_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
ASSERT_INT_NE(connect_fd, -1);
ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen);
ASSERT_INT_EQ(ret, 0);
accept_fd = accept(listen_fd, (struct sockaddr*)&their_addr, sizeof(their_addr));
ASSERT_INT_NE(accept_fd, -1);
ret = close(listen_fd);
ASSERT_INT_EQ(ret, 0);
TEST_DONE();
TEST_START("select send and recv packets");
r = accept_fd;
s = connect_fd;
ret = set_nonblock(s);
ASSERT_INT_EQ(ret, 0);
ret = set_nonblock(r);
ASSERT_INT_EQ(ret, 0);
send_buf = malloc(max_bytes);
recv_buf = malloc(max_bytes + 1);
ASSERT_PTR_NE(send_buf, NULL);
ASSERT_PTR_NE(recv_buf, NULL);
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_SET(s, &write_set);
FD_SET(r, &read_set);
int total = 0;
while (-1 != select(max(r, s) + 1, &read_set, &write_set, NULL, &time_val)) {
if (FD_ISSET(s, &write_set)) {
while ((send_packet_remaining) && ((ret = send(s, send_buf, send_packet_remaining, 0)) > 0)) {
send_packet_remaining -= ret;
bytes_sent += ret;
}
if (send_packet_remaining) {
ASSERT_INT_EQ(ret, -1);
ASSERT_INT_EQ(errno, EAGAIN);
}
else if (bytes_sent < max_bytes) {
send_packet_remaining = ((double)rand() / (RAND_MAX + 1)) * (max_packetsize - 100) + 100;
ret = send(s, &send_packet_remaining, 4, 0);
if (ret == -1) {
send_packet_remaining = 0; //we'll try again when io is ready
}
else if (ret < 4)
/*unfortunate - sent half the header, we'll bail the test out*/
ASSERT_INT_EQ(1, 0);
else {
ASSERT_INT_EQ(ret, 4);
packets_sent++;
//printf("sending packet of size %d\n", send_packet_remaining);
}
}
}
if (FD_ISSET(r, &read_set)) {
while (recv_packet_remaining && ((ret = recv(r, recv_buf, recv_packet_remaining, 0)) > 0)) {
recv_packet_remaining -= ret;
}
if (recv_packet_remaining) {
ASSERT_INT_EQ(ret, -1);
ASSERT_INT_EQ(errno, EAGAIN);
}
else {
ret = recv(r, &recv_packet_remaining, 4, 0);
if (ret == -1) {
ASSERT_INT_EQ(ret, -1);
ASSERT_INT_EQ(errno, EAGAIN);
}
else if (ret == 0)
break;
else if (ret < 4)
/*unfortunate.. read partial header, bail out*/
ASSERT_INT_EQ(1, 0);
else {
ASSERT_INT_EQ(ret, 4);
packets_received++;
//printf("recevied packet of size %d\n", recv_packet_remaining);
}
}
}
if ((bytes_sent >= max_bytes) && (send_packet_remaining == 0)) {
FD_CLR(s, &write_set);
ret = shutdown(s, SD_SEND);
ASSERT_INT_EQ(ret, 0);
}
else
FD_SET(s, &write_set);
FD_SET(r, &read_set);
}
ASSERT_INT_EQ(packets_sent, packets_received);
ret = close(connect_fd);
ASSERT_INT_EQ(ret, 0);
ret = close(accept_fd);
ASSERT_INT_EQ(ret, 0);
TEST_DONE();
freeaddrinfo(servinfo);
}
void socket_tests()
{
@ -554,136 +587,7 @@ void socket_tests()
socket_blocking_io_tests();
socket_nonblocking_io_tests();
socket_select_tests();
w32posix_done();
}
int socket_prepare(char* ip)
{
struct addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_socktype = SOCK_STREAM;
if (getaddrinfo(ip, PORT, &hints, &servinfo) == -1)
return -1;
listen_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
connect_fd = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol);
if ((listen_fd == -1) || (connect_fd == -1))
return -1;
if (-1 == bind(listen_fd, servinfo->ai_addr, servinfo->ai_addrlen))
return -1;
if (-1 == listen(listen_fd, BACKLOG))
return -1;
return 0;
}
#define READ_BUf_SIZE 1024 * 100
#define WRITE_BUF_SIZE 1024 * 100
void sample()
{
w32posix_initialize();
listen_fd = -1;
accept_fd = -1;
connect_fd = -1;
servinfo = NULL;
int ret;
ret = socket_prepare("127.0.0.1");
//Assert::AreEqual(ret, 0);
ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen);
accept_fd = accept(listen_fd, NULL, NULL);
//Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO());
//close(listen_fd);
//listen_fd = -1;
int c = connect_fd;
int s = accept_fd;
set_nonblock(c);
set_nonblock(s);
char *to_write = (char*)malloc(WRITE_BUF_SIZE);
char *read_to = (char*)malloc(READ_BUf_SIZE);
//write from c, read from s
fd_set read_set;
fd_set write_set;
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_SET(s, &read_set);
FD_SET(c, &write_set);
int max_fd = max(c, s) + 1;
struct timeval time;
time.tv_sec = 60 * 60;
time.tv_usec = 0;
long long bytes_written = 0;
long long bytes_read = 0;
while (-1 != select(max_fd, &read_set, &write_set, NULL, &time))
{
BOOL read_ready = FD_ISSET(s, &read_set);
BOOL write_ready = FD_ISSET(c, &write_set);
FD_ZERO(&read_set);
FD_ZERO(&write_set);
FD_SET(s, &read_set);
if (write_ready)
{
#define WR_LIMIT WRITE_BUF_SIZE*5
int bw = 0;// send(c, to_write, WRITE_BUF_SIZE, 0);
while ((bw != -1) && (bytes_written < WR_LIMIT)) {
bw = send(c, to_write, WRITE_BUF_SIZE, 0);
if (bw > 0)
bytes_written += bw;
else {
ret = errno;
//Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO());
}
}
if (bytes_written >= WR_LIMIT)
{
ret = shutdown(c, SD_SEND);
//Assert::AreEqual(ret, 0, L"", LINE_INFO());
}
else
FD_SET(c, &write_set);
}
if (read_ready)
{
int br = read(s, read_to, READ_BUf_SIZE);
while (br > 1) {
bytes_read += br;
br = read(s, read_to, READ_BUf_SIZE);
}
if (br == 0) //send from other side is done
break;
ret = errno;
//Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO());
}
}
//Assert::AreEqual((bytes_written == bytes_read) ? 1 : 0, TRUE, L"", LINE_INFO());
if (servinfo) freeaddrinfo(servinfo);
if (listen_fd != -1) close(listen_fd);
if (connect_fd != -1) close(connect_fd);
if (accept_fd != -1) close(accept_fd);
socket_typical_ssh_payload_tests();
w32posix_done();
}

View File

@ -11,6 +11,7 @@
// - any APCs caused by IO completions
// - time out (errno = ETIMEOUT)
// Returns 0 on IO completion and -1 on rest
// if milli_seconds is 0, this function returns 0, its called with 0 to execute any scheduled APCs
int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
{
//todo - implement signal catching and handling
@ -25,6 +26,8 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
return 0;
}
else if (ret == WAIT_TIMEOUT) {
if (milli_seconds == 0)
return 0;
errno = ETIMEDOUT;
debug("ERROR: wait timed out");
return -1;
@ -42,7 +45,8 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
return 0;
}
else if (ret == 0) {
//timed out
if (milli_seconds == 0)
return 0;
errno = ETIMEDOUT;
debug("ERROR: wait timed out");
return -1;

View File

@ -128,8 +128,8 @@ void CALLBACK WSARecvCompletionRoutine(
)
{
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
debug2("io:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred:%d",
pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.completed, dwError, cbTransferred);
debug2("io:%p, pending_state:%d, flags:%d, error:%d, received:%d",
pio, pio->read_details.pending, dwFlags, dwError, cbTransferred);
if (!dwError && !cbTransferred)
dwError = ERROR_GRACEFUL_DISCONNECT;
pio->read_details.error = dwError;
@ -179,6 +179,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
if (WSAGetLastError() == WSA_IO_PENDING)
{
//io is initiated and pending
debug2("WSARecv reported IO pending");
pio->read_details.pending = TRUE;
}
else { //failed
@ -382,7 +383,7 @@ void CALLBACK WSASendCompletionRoutine(
)
{
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped));
debug2("io:%p, pending_state:%d, error:%d, transferred:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining);
debug2("io:%p, pending_state:%d, error:%d, sent:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining);
pio->write_details.error = dwError;
//assert that remaining == cbTransferred
pio->write_details.remaining -= cbTransferred;
@ -785,7 +786,13 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) {
int socketio_on_select(struct w32_io* pio, BOOL rd) {
debug2("io:%p type:%d", pio, pio->type);
debug2("io:%p type:%d rd:%d", pio, pio->type, rd);
//return if io is already available
if (socketio_is_io_available(pio, rd))
return 0;
//return if io is pending
if (rd && pio->read_details.pending)
return 0;

View File

@ -364,35 +364,18 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
if (readfds) {
for (i = 0; i < fds; i++)
if (FD_ISSET(i, readfds))
if (FD_ISSET(i, readfds)) {
CHECK_FD(i);
in_ready_fds++;
}
}
if (writefds) {
for (i = 0; i < fds; i++)
if (FD_ISSET(i, writefds))
if (FD_ISSET(i, writefds)) {
CHECK_FD(i);
}
//see if any io is ready
for (i = 0; i < fds; i++) {
if (readfds && FD_ISSET(i, readfds)) {
in_ready_fds++;
if (w32_io_is_io_available(fd_table.w32_ios[i], TRUE)) {
FD_SET(i, &read_ready_fds);
out_ready_fds++;
in_ready_fds;
}
}
if (writefds && FD_ISSET(i, writefds)) {
in_ready_fds++;
if (w32_io_is_io_available(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds);
out_ready_fds++;
}
}
}
//if none of input fds are set return error
@ -402,18 +385,8 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
return -1;
}
//if io on some fds is already ready, return
if (out_ready_fds)
{
if (readfds)
*readfds = read_ready_fds;
if (writefds)
*writefds = write_ready_fds;
debug2("IO ready:%d, no wait", out_ready_fds);
return out_ready_fds;
}
//start async io on selected fds
//start async io on selected fds if needed and pick up any events that select needs to listen on
for (int i = 0; i < fds; i++) {
if (readfds && FD_ISSET(i, readfds)) {
@ -427,9 +400,45 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
if (writefds && FD_ISSET(i, writefds)) {
if (w32_io_on_select(fd_table.w32_ios[i], FALSE) == -1)
return -1;
if (fd_table.w32_ios[i]->type == CONNECT_FD) {
events[num_events++] = fd_table.w32_ios[i]->write_overlapped.hEvent;
}
}
}
//excute any scheduled APCs
if (0 != wait_for_any_event(NULL, 0, 0)) {
return -1;
}
//see if any io is ready
for (i = 0; i < fds; i++) {
if (readfds && FD_ISSET(i, readfds)) {
if (w32_io_is_io_available(fd_table.w32_ios[i], TRUE)) {
FD_SET(i, &read_ready_fds);
out_ready_fds++;
}
}
if (writefds && FD_ISSET(i, writefds)) {
if (w32_io_is_io_available(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds);
out_ready_fds++;
}
}
}
//if io on some fds is already ready, return
if (out_ready_fds)
{
if (readfds)
*readfds = read_ready_fds;
if (writefds)
*writefds = write_ready_fds;
debug2("IO ready:%d, no wait", out_ready_fds);
return out_ready_fds;
}
do {
ticks_now = GetTickCount64();