From 684470e9f2b1379a41fefcb27a2c4ecba1a91e4f Mon Sep 17 00:00:00 2001 From: manojampalam Date: Sun, 28 Feb 2016 22:37:36 -0800 Subject: [PATCH] 2-28 C6 --- .../win32posix/Tests/socketiotests.cpp | 53 ++++++++++++------- .../win32posix/win32posix/debug.c | 6 +-- .../win32posix/win32posix/debug.h | 1 + .../win32posix/win32posix/socketio.c | 27 ++++++---- .../win32posix/win32posix/w32fd.c | 19 ++++++- .../win32posix/win32posix/w32fd.h | 15 +++--- 6 files changed, 80 insertions(+), 41 deletions(-) diff --git a/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp b/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp index 30a7fc5..4777317 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp +++ b/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp @@ -74,8 +74,8 @@ int socket_prepare(char* ip) return 0; } -#define READ_BUf_SIZE 1024 * 1024 * 2 -#define WRITE_BUF_SIZE 1024 * 1024 * 5 +#define READ_BUf_SIZE 1024 * 100 +#define WRITE_BUF_SIZE 1024 * 100 namespace UnitTests { @@ -105,7 +105,7 @@ namespace UnitTests TEST_METHOD(socketio) { int ret; - ret = socket_prepare("::1"); + ret = socket_prepare("127.0.0.1"); Assert::AreEqual(ret, 0, L"failed to prepare sockets", LINE_INFO()); ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); @@ -114,15 +114,18 @@ namespace UnitTests accept_fd = accept(listen_fd, NULL, NULL); Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO()); + //close(listen_fd); + //listen_fd = -1; + int c = connect_fd; int s = accept_fd; set_nonblock(c); set_nonblock(s); - char *to_write = (char*)malloc(WRITE_BUF_SIZE); //5MB + char *to_write = (char*)malloc(WRITE_BUF_SIZE); - char *read_to = (char*)malloc(READ_BUf_SIZE); //2MB + char *read_to = (char*)malloc(READ_BUf_SIZE); //write from c, read from s fd_set read_set; @@ -144,24 +147,37 @@ namespace UnitTests BOOL write_ready = FD_ISSET(c, &write_set); FD_ZERO(&read_set); FD_ZERO(&write_set); + FD_SET(s, &read_set); - if (bytes_written > WRITE_BUF_SIZE * 100) + if (write_ready) { - ret = shutdown(c, SD_SEND), 0, L"", LINE_INFO(); - Assert::AreEqual(ret, 0, L"", LINE_INFO()); - } - else if (write_ready) - { - int bw = send(c, to_write, WRITE_BUF_SIZE, 0); - while (bw != -1) { - bytes_written += bw; + +#define WR_LIMIT WRITE_BUF_SIZE*5 + + int bw = 0;// send(c, to_write, WRITE_BUF_SIZE, 0); + while ((bw != -1) && (bytes_written < WR_LIMIT)) { + bw = send(c, to_write, WRITE_BUF_SIZE, 0); + if (bw > 0) + bytes_written += bw; + else { + ret = errno; + Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO()); + } + + } + + if (bytes_written >= WR_LIMIT) + { + ret = shutdown(c, SD_SEND), 0, L"", LINE_INFO(); + Assert::AreEqual(ret, 0, L"", LINE_INFO()); } - ret = errno; - Assert::AreEqual(ret, EAGAIN, L"", LINE_INFO()); - FD_SET(c, &write_set); + else + FD_SET(c, &write_set); } + + if (read_ready) { int br = read(s, read_to, READ_BUf_SIZE); @@ -173,8 +189,7 @@ namespace UnitTests if (br == 0) //send from other side is done break; ret = errno; - Assert::AreEqual(ret, EAGAIN, L"", LINE_INFO()); - FD_SET(s, &read_set); + Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO()); } } diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c index d30d554..00ec91b 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c @@ -14,7 +14,7 @@ void debug_initialize() { GetLocalTime(&time); sprintf(filename + len, "_%d_%d_%d.log", time.wHour, time.wMinute, time.wSecond); - + //sprintf(filename, "%s", "e:\\tmp.log"); log = fopen(filename, "w"); } @@ -29,9 +29,9 @@ void write_log(const char *source_name, const char *function_name, int line_num, return; va_list args; - fprintf(log,"\n%s:%s:%d: ", source_name, function_name, line_num); + fprintf(log,"\n%s:%d: ", function_name, line_num); va_start(args, fmt); - fprintf(log,fmt, args); + vfprintf(log,fmt, args); va_end(args); fflush(log); } \ No newline at end of file diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.h b/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.h index e2b5bfe..52b9618 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.h +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.h @@ -2,6 +2,7 @@ #define debug(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__) #define debug2(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__) +#define debug3(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__) void debug_initialize(); void debug_done(); diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c index 4d8246e..20bd873 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c @@ -134,7 +134,7 @@ void CALLBACK WSARecvCompletionRoutine( { struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped)); debug2("io:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred:%d", - pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.pending, dwError, cbTransferred); + pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.completed, dwError, cbTransferred); if (!dwError && !cbTransferred) dwError = ERROR_GRACEFUL_DISCONNECT; pio->read_details.error = dwError; @@ -176,7 +176,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) { { pio->read_details.pending = TRUE; //receive has completed but APC is pending to be scheduled - debug2("WSARecv immediate completion, io:%p", pio); + debug2("WSARecv returned 0, io:%p", pio); if (completed) *completed = TRUE; } @@ -287,6 +287,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { return num_bytes_copied; } + //if there was an error on async call, return if (pio->read_details.error) { if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) { @@ -307,6 +308,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { if (completed) { //Let APC be scheduled + debug2("Letting APC to execute"); SleepEx(1, TRUE); if (pio->read_details.pending) { //this shouldn't be happening @@ -319,6 +321,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { if (w32_io_is_blocking(pio)) { //wait until io is done + debug3("socket in blocking mode"); while (socketio_is_io_available(pio, TRUE) == FALSE) { if (0 != wait_for_any_event(NULL, 0, INFINITE)) return -1; @@ -373,7 +376,7 @@ void CALLBACK WSASendCompletionRoutine( ) { struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped)); - debug2("io:%p, pending_state:%d, error:%d, transferred:%d, remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining); + debug2("io:%p, pending_state:%d, error:%d, transferred:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining); pio->write_details.error = dwError; //assert that remaining == cbTransferred pio->write_details.remaining -= cbTransferred; @@ -413,7 +416,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { } else { errno = EAGAIN; - debug2("IO pending, io:%p", pio); + debug2("IO currently pending, EAGAIN, io:%p", pio); return -1; } } @@ -453,7 +456,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { if (ret == 0) { //send has completed and APC is scheduled, let it run - debug2("WSASend immediate completion, io:%p", pio); + debug2("WSASend returned 0, APC scheduled io:%p", pio); pio->write_details.pending = TRUE; pio->write_details.remaining = wsabuf.len; SleepEx(1, TRUE); @@ -470,16 +473,18 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { if (WSAGetLastError() == WSA_IO_PENDING) { //io is initiated and pending - debug2("IO pending, io:%p", pio); + debug2("WSASend reported IO pending, io:%p", pio); pio->write_details.pending = TRUE; pio->write_details.remaining = wsabuf.len; if (w32_io_is_blocking(pio)) { //wait until io is done + debug3("waiting as socket is in blocking mode, io:%p", pio); while (pio->write_details.pending) SleepEx(INFINITE, TRUE); } + debug3("returning %d", wsabuf.len); return wsabuf.len; } else { //failed @@ -513,9 +518,10 @@ int socketio_close(struct w32_io* pio) { closesocket(ctx->accept_socket); if (ctx->lpOutputBuf) free(ctx->lpOutputBuf); - free(pio->context); + //TODO: Fix this. Freeing this is crashing + //free(pio->context); } - //TODO: cleanup other details in pio->context + } else if (pio->type == CONNECT_FD) { if (pio->write_overlapped.hEvent) @@ -723,9 +729,8 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele return -1; } - if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, NULL, 0)) + if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) { - int i = WSAGetLastError(); errno = errno_from_WSALastError(); debug("ERROR: setsockopt failed:%d, io:%p", errno, pio); return -1; @@ -774,7 +779,7 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) { int socketio_on_select(struct w32_io* pio, BOOL rd) { - debug2("io:%p", pio); + debug2("io:%p type:%d", pio, pio->type); if (rd && pio->read_details.pending) return 0; diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c index d20acf3..152df11 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c @@ -117,12 +117,13 @@ int w32_socket(int domain, int type, int protocol) { } fd_table_set(pio, min_index); - debug("socket:%p, io:%p, fd:%d ", pio->sock, pio, min_index); + debug("socket:%d, io:%p, fd:%d ", pio->sock, pio, min_index); return min_index; } int w32_accept(int fd, struct sockaddr* addr, int* addrlen) { + debug3("fd:%d", fd); int min_index = fd_table_get_min_index(); struct w32_io* pio = NULL; @@ -135,7 +136,7 @@ int w32_accept(int fd, struct sockaddr* addr, int* addrlen) } fd_table_set(pio, min_index); - debug("socket:%p, io:%p, fd:%d ", pio->sock, pio, min_index); + debug("socket:%d, io:%p, fd:%d ", pio->sock, pio, min_index); return min_index; } @@ -149,52 +150,62 @@ do { \ } while (0) int w32_setsockopt(int fd, int level, int optname, const char* optval, int optlen) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_setsockopt(fd_table.w32_ios[fd], level, optname, optval, optlen); } int w32_getsockopt(int fd, int level, int optname, char* optval, int* optlen) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_getsockopt(fd_table.w32_ios[fd], level, optname, optval, optlen); } int w32_getsockname(int fd, struct sockaddr* name, int* namelen) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_getsockname(fd_table.w32_ios[fd], name, namelen); } int w32_getpeername(int fd, struct sockaddr* name, int* namelen) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_getpeername(fd_table.w32_ios[fd], name, namelen); } int w32_listen(int fd, int backlog) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_listen(fd_table.w32_ios[fd], backlog); } int w32_bind(int fd, const struct sockaddr *name, int namelen) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_bind(fd_table.w32_ios[fd], name, namelen); } int w32_connect(int fd, const struct sockaddr* name, int namelen) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_connect(fd_table.w32_ios[fd], name, namelen); } int w32_recv(int fd, void *buf, size_t len, int flags) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_recv(fd_table.w32_ios[fd], buf, len, flags); } int w32_send(int fd, const void *buf, size_t len, int flags) { + debug3("fd:%d", fd); CHECK_FD(fd); return socketio_send(fd_table.w32_ios[fd], buf, len, flags); } int w32_shutdown(int fd, int how) { + debug3("fd:%d how:%d", fd, how); CHECK_FD(fd); return socketio_shutdown(fd_table.w32_ios[fd], how); } @@ -246,11 +257,15 @@ int w32_open(const char *pathname, int flags, ...) { int w32_read(int fd, void *dst, unsigned int max) { CHECK_FD(fd); + if (fd_table.w32_ios[fd]->type == SOCK_FD) + return socketio_recv(fd_table.w32_ios[fd], dst, max, 0); return fileio_read(fd_table.w32_ios[fd], dst, max); } int w32_write(int fd, const void *buf, unsigned int max) { CHECK_FD(fd); + if (fd_table.w32_ios[fd]->type == SOCK_FD) + return socketio_send(fd_table.w32_ios[fd], buf, max, 0); return fileio_write(fd_table.w32_ios[fd], buf, max); } diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h index 971a09b..7e8bfb2 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h @@ -1,16 +1,19 @@ +#pragma once + #include #include #include "debug.h" + //order to be maintained enum w32_io_type { UNKOWN_FD = 0, - LISTEN_FD, - CONNECT_FD, - SOCK_FD, - FILE_FD, - PIPE_FD, - CONSOLE_FD + LISTEN_FD = 1, + CONNECT_FD = 2, + SOCK_FD = 3, + FILE_FD = 4, + PIPE_FD = 5, + CONSOLE_FD = 6 }; struct w32_io {