From 4b6bb2ac18671009563e0c6a9dc22874327bf261 Mon Sep 17 00:00:00 2001 From: manojampalam Date: Sun, 28 Feb 2016 00:10:51 -0800 Subject: [PATCH] 2-28 C1 --- .../win32posix/SampleServer/SampleServer.cpp | 4 +- .../win32posix/Tests/socketiotests.cpp | 279 +++++++++++------- .../win32posix/win32posix/debug.c | 6 +- .../win32posix/win32posix/socketio.c | 102 +++++-- 4 files changed, 255 insertions(+), 136 deletions(-) diff --git a/contrib/win32/w32-posix-prototype/win32posix/SampleServer/SampleServer.cpp b/contrib/win32/w32-posix-prototype/win32posix/SampleServer/SampleServer.cpp index 5ea7968..084e61b 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/SampleServer/SampleServer.cpp +++ b/contrib/win32/w32-posix-prototype/win32posix/SampleServer/SampleServer.cpp @@ -538,9 +538,9 @@ int pipelinetest() int __cdecl main(void) { - //return regular(); + return regular(); //return async(); writemode = TRUE; //return throughput(); - return pipetest(); + //return pipetest(); } diff --git a/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp b/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp index 011c373..19fbd33 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp +++ b/contrib/win32/w32-posix-prototype/win32posix/Tests/socketiotests.cpp @@ -5,7 +5,70 @@ extern "C" { using namespace Microsoft::VisualStudio::CppUnitTestFramework; -#define DEFAULT_PORT "27015" +#define PORT "34912" // the port users will be connecting to + +#define BACKLOG 10 // how many pending connections queue will hold + +// get sockaddr, IPv4 or IPv6: +void *get_in_addr(struct sockaddr *sa) +{ + if (sa->sa_family == AF_INET) { + return &(((struct sockaddr_in*)sa)->sin_addr); + } + + return &(((struct sockaddr_in6*)sa)->sin6_addr); +} + +int +unset_nonblock(int fd) +{ + int val; + + val = fcntl(fd, F_GETFL, 0); + if (val < 0) { + return (-1); + } + if (!(val & O_NONBLOCK)) { + return (0); + } + val &= ~O_NONBLOCK; + if (fcntl(fd, F_SETFL, val) == -1) { + return (-1); + } + return (0); +} + +int +set_nonblock(int fd) +{ + int val; + + val = fcntl(fd, F_GETFL, 0); + if (val < 0) { + return (-1); + } + if (val & O_NONBLOCK) { + return (0); + } + val |= O_NONBLOCK; + if (fcntl(fd, F_SETFL, val) == -1) { + return (-1); + } + return (0); + +} + +int listen_fd = -1; +int accept_fd = -1; +int connect_fd = -1; + +DWORD WINAPI MyThreadFunction(LPVOID lpParam) +{ + accept_fd = accept(listen_fd, NULL, NULL); + return 0; +} + + namespace UnitTests { @@ -14,63 +77,139 @@ namespace UnitTests public: - struct addrinfo *result = NULL; + struct addrinfo *servinfo = NULL, *p; struct addrinfo hints; - int ListenSocket = -1; TEST_METHOD_INITIALIZE(TestMethodInitialize) { - int iResult; - + w32posix_initialize(); - ZeroMemory(&hints, sizeof(hints)); - hints.ai_family = AF_INET; + listen_fd = -1; + accept_fd = -1; + connect_fd = -1; + struct sockaddr_storage their_addr; // connector's address information + socklen_t sin_size; + int yes = 1; + char s[INET6_ADDRSTRLEN]; + int rv; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - hints.ai_flags = AI_PASSIVE; + hints.ai_flags = AI_PASSIVE; // use my IP - iResult = getaddrinfo(NULL, DEFAULT_PORT, &hints, &result); - if (iResult != 0) { - printf("getaddrinfo failed with error: %d\n", iResult); + if ((rv = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { + fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); return; } - ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); - if (ListenSocket == -1) { - printf("socket failed with error: %ld\n", errno); - return; + // loop through all the results and bind to the first we can + for (p = servinfo; p != NULL; p = p->ai_next) { + if ((listen_fd = socket(p->ai_family, p->ai_socktype, + p->ai_protocol)) == -1) { + perror("server: socket"); + continue; + } + + if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, + sizeof(int)) == -1) { + perror("setsockopt"); + exit(1); + } + + if (bind(listen_fd, p->ai_addr, p->ai_addrlen) == -1) { + int i = errno; + close(listen_fd); + perror("server: bind"); + continue; + } + + break; } - // Setup the TCP listening socket - iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen); - if (iResult == -1) { - printf("bind failed with error: %d\n", errno); - return ; + freeaddrinfo(servinfo); // all done with this structure + servinfo = NULL; + + if (p == NULL) { + fprintf(stderr, "server: failed to bind\n"); + exit(1); } - iResult = listen(ListenSocket, SOMAXCONN); - if (iResult == -1) { - printf("listen failed with error: %d\n", errno); - return; - } - freeaddrinfo(result); + if (listen(listen_fd, BACKLOG) == -1) { + perror("listen"); + exit(1); + } } TEST_METHOD_CLEANUP(TestMethodCleanup) { - if (result) - freeaddrinfo(result); - if (ListenSocket != -1) - close(ListenSocket); + if (servinfo) + freeaddrinfo(servinfo); + if (listen_fd != -1) + close(listen_fd); + if (connect_fd != -1) + close(connect_fd); + if (accept_fd != -1) + close(accept_fd); w32posix_done(); } TEST_METHOD(TestMethod1) { - // TODO: Your test code here + int rv; + struct sockaddr_storage their_addr; + socklen_t sin_size; + int ret; + servinfo = NULL; + + memset(&hints, 0, sizeof hints); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + + rv = getaddrinfo("::1", PORT, &hints, &servinfo); + Assert::AreEqual(rv, 0, L"getaddreinfo failed", LINE_INFO()); + + p = servinfo; + connect_fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol); + Assert::AreNotEqual(connect_fd, -1, L"connect_fd", LINE_INFO()); + + //set_nonblock(listen_fd); + //set_nonblock(connect_fd); + + //fd_set read_set; + //fd_set write_set; + //FD_ZERO(&read_set); + //FD_ZERO(&write_set); + //FD_SET(listen_fd, &read_set); + //FD_SET(connect_fd, &write_set); + + HANDLE thread = CreateThread(NULL, 0, MyThreadFunction, &connect_fd, 0, NULL); + + //sin_size = sizeof(their_addr); + //accept_fd = accept(listen_fd, (struct sockaddr *)&their_addr, &sin_size); + //Assert::AreEqual(accept_fd, -1, L"", LINE_INFO()); + //Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO()); + + ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); + Assert::AreEqual(ret, 0, L"", LINE_INFO()); + + WaitForSingleObject(thread, INFINITE); + CloseHandle(thread); + + int i = 9; + /* accept_fd = accept(listen_fd, (struct sockaddr *)&their_addr, &sin_size); + Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO()); +*/ + /* ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); + Assert::AreEqual(ret, 0, L"", LINE_INFO());*/ + + } + + TEST_METHOD(TestMethod) + { fd_set* set = (fd_set*)malloc(sizeof(fd_set)); FD_ZERO(set); @@ -87,82 +226,6 @@ namespace UnitTests Assert::AreEqual(0, FD_ISSET(0, set), L"", LINE_INFO()); Assert::AreEqual(0, FD_ISSET(1, set), L"", LINE_INFO()); Assert::AreEqual(0, FD_ISSET(2, set), L"", LINE_INFO()); - - w32posix_initialize(); - int sockfd, new_fd; // listen on sock_fd, new connection on new_fd - struct addrinfo hints, *servinfo, *p; - struct sockaddr_storage their_addr; // connector's address information - socklen_t sin_size; - int yes = 1; - int rv; -#define PORT "3490" // the port users will be connecting to - -#define BACKLOG 10 // how many pending connections queue will hold - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_PASSIVE; // use my IP - - if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv)); - } - - // loop through all the results and bind to the first we can - for (p = servinfo; p != NULL; p = p->ai_next) { - if ((sockfd = socket(p->ai_family, p->ai_socktype, - p->ai_protocol)) == -1) { - perror("server: socket"); - continue; - } - - if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&yes, - sizeof(int)) == -1) { - perror("setsockopt"); - exit(1); - } - - if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) { - close(sockfd); - perror("server: bind"); - continue; - } - - break; - } - - freeaddrinfo(servinfo); // all done with this structure - - if (p == NULL) { - fprintf(stderr, "server: failed to bind\n"); - exit(1); - } - - if (listen(sockfd, BACKLOG) == -1) { - perror("listen"); - exit(1); - } - - fcntl(sockfd, F_SETFL, O_NONBLOCK); - - fd_set read_set, write_set, except_set; - - ZeroMemory(&read_set, sizeof(fd_set)); - ZeroMemory(&write_set, sizeof(fd_set)); - ZeroMemory(&except_set, sizeof(fd_set)); - - FD_SET(sockfd, &read_set); - struct timeval timeout; - timeout.tv_sec = 300; - timeout.tv_usec = 0; - int ret = select(sockfd, &read_set, &write_set, &except_set, &timeout); - - new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); - } - - - - }; } \ No newline at end of file diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c index b74a541..d30d554 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/debug.c @@ -19,11 +19,15 @@ void debug_initialize() { } void debug_done() { - fclose(log); + if (log) + fclose(log); } void write_log(const char *source_name, const char *function_name, int line_num, const char *fmt, ...) { + if (!log) + return; + va_list args; fprintf(log,"\n%s:%s:%d: ", source_name, function_name, line_num); va_start(args, fmt); diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c index 0918bc4..a46e564 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c @@ -217,13 +217,14 @@ struct w32_io* socketio_socket(int domain, int type, int protocol) { return pio; } -#define SET_ERRNO_ON_ERROR(ret) \ +#define SET_ERRNO_ON_ERROR(expr) \ do { \ - if ((ret) == SOCKET_ERROR) { \ + int ret = (expr); \ + if (ret == SOCKET_ERROR) { \ errno = errno_from_WSALastError(); \ - debug("ERROR:%d, io:%p", errno, pio); \ + debug("ERROR:%d", errno); \ } \ - return (ret); \ + return ret; \ } while (0) int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) { @@ -248,7 +249,7 @@ int socketio_listen(struct w32_io* pio, int backlog) { } int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen) { - SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen)); + SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen)); } int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { @@ -523,7 +524,7 @@ int socketio_close(struct w32_io* pio) { struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) { struct w32_io *accept_io = NULL; int iResult = 0; - struct acceptEx_context* context = (struct acceptEx_context*)pio->context; + struct acceptEx_context* context; debug2("io:%p", pio); //start io if not already started @@ -553,11 +554,22 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a } + context = (struct acceptEx_context*)pio->context; + pio->read_details.pending = FALSE; + ResetEvent(pio->read_overlapped.hEvent); + + if (pio->read_details.error) + { + errno = errno_from_WSAError(pio->read_details.error); + debug("ERROR: async io completed with error: %d, io:%p", errno, pio); + goto on_error; + } + if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock))) { errno = errno_from_WSALastError(); debug("ERROR: setsockopt failed:%d, io:%p", errno, pio); - return NULL; + goto on_error; } accept_io = (struct w32_io*)malloc(sizeof(struct w32_io)); @@ -565,33 +577,58 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a { errno = ENOMEM; debug("ERROR:%d, io:%p", errno, pio); - return NULL; + goto on_error; } memset(accept_io, 0, sizeof(struct w32_io)); accept_io->sock = context->accept_socket; accept_io->type = SOCK_FD; context->accept_socket = INVALID_SOCKET; - pio->read_details.pending = FALSE; - ResetEvent(pio->read_overlapped.hEvent); debug2("accept io:%p", accept_io); + + //TODO : fill in addr return accept_io; + +on_error: + if (context->accept_socket != INVALID_SOCKET) { + closesocket(context->accept_socket); + context->accept_socket = INVALID_SOCKET; + } + + return NULL; } int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen) { - struct sockaddr_in tmp_addr; + struct sockaddr_in tmp_addr4; + struct sockaddr_in6 tmp_addr6; + SOCKADDR* tmp_addr; + size_t tmp_addr_len; DWORD tmp_bytes; GUID connectex_guid = WSAID_CONNECTEX; LPFN_CONNECTEX ConnectEx; - - //TODO - add support for DGRAM socket, below works only for STREAM sockets - ZeroMemory(&tmp_addr, sizeof(tmp_addr)); - tmp_addr.sin_family = AF_UNSPEC; - tmp_addr.sin_addr.s_addr = INADDR_ANY; - tmp_addr.sin_port = 0; - if (SOCKET_ERROR == bind(pio->sock, (SOCKADDR*)&tmp_addr, sizeof(tmp_addr))) + if (name->sa_family == AF_INET6) { + ZeroMemory(&tmp_addr6, sizeof(tmp_addr6)); + tmp_addr6.sin6_family = AF_INET6; + tmp_addr6.sin6_port = 0; + tmp_addr = (SOCKADDR*)&tmp_addr6; + tmp_addr_len = sizeof(tmp_addr6); + } + else if (name->sa_family == AF_INET) { + ZeroMemory(&tmp_addr4, sizeof(tmp_addr4)); + tmp_addr4.sin_family = AF_INET; + tmp_addr4.sin_port = 0; + tmp_addr = (SOCKADDR*)&tmp_addr4; + tmp_addr_len = sizeof(tmp_addr4); + } + else { + errno = ENOTSUP; + debug("ERROR: unsuppored address family:%d, io:%p", name->sa_family, pio); + return -1; + } + + if (SOCKET_ERROR == bind(pio->sock, tmp_addr, tmp_addr_len)) { errno = errno_from_WSALastError(); debug("ERROR: bind failed :%d, io:%p", errno, pio); @@ -614,7 +651,7 @@ int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int name return -1; } - if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, pio->write_details.completed, &pio->write_overlapped)) + if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, NULL, &pio->write_overlapped)) { //set completion event SetEvent(pio->write_overlapped.hEvent); @@ -632,7 +669,7 @@ int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int name } pio->write_details.pending = TRUE; - pio->type == CONNECT_FD; + pio->type = CONNECT_FD; return 0; } @@ -665,6 +702,16 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele } + //close event handle + CloseHandle(pio->write_overlapped.hEvent); + pio->write_overlapped.hEvent = 0; + + if (pio->write_details.error) { + errno = errno_from_WSAError(pio->write_details.error); + debug("ERROR: async io completed with error: %d, io:%p", errno, pio); + return -1; + } + if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, NULL, 0)) { errno = errno_from_WSALastError(); @@ -672,9 +719,7 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele return NULL; } - //close event handle - CloseHandle(pio->write_overlapped.hEvent); - pio->write_overlapped.hEvent = 0; + pio->type = SOCK_FD; return 0; } @@ -693,8 +738,11 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) { } else { if (pending && WSAGetLastError() != WSA_IO_INCOMPLETE) { - //unexpected error; - debug("ERROR:Unxpected State. io:%p, WSAError:%d", pio, WSAGetLastError()); + if (pio->type == LISTEN_FD) + pio->read_details.error = WSAGetLastError(); + else + pio->write_details.error = WSAGetLastError(); + return TRUE; } return FALSE; } @@ -725,6 +773,10 @@ int socketio_on_select(struct w32_io* pio, BOOL rd) { return -1; return 0; } + else if (pio->type == CONNECT_FD) { + //nothing to do for connect + return 0; + } else if (rd) { if (socketio_WSARecv(pio, NULL) != 0) return -1;