This commit is contained in:
manojampalam 2016-02-28 00:10:51 -08:00
parent 3e61818f19
commit 4b6bb2ac18
4 changed files with 255 additions and 136 deletions

View File

@ -538,9 +538,9 @@ int pipelinetest()
int __cdecl main(void)
{
//return regular();
return regular();
//return async();
writemode = TRUE;
//return throughput();
return pipetest();
//return pipetest();
}

View File

@ -5,7 +5,70 @@ extern "C" {
using namespace Microsoft::VisualStudio::CppUnitTestFramework;
#define DEFAULT_PORT "27015"
#define PORT "34912" // the port users will be connecting to
#define BACKLOG 10 // how many pending connections queue will hold
// get sockaddr, IPv4 or IPv6:
void *get_in_addr(struct sockaddr *sa)
{
if (sa->sa_family == AF_INET) {
return &(((struct sockaddr_in*)sa)->sin_addr);
}
return &(((struct sockaddr_in6*)sa)->sin6_addr);
}
int
unset_nonblock(int fd)
{
int val;
val = fcntl(fd, F_GETFL, 0);
if (val < 0) {
return (-1);
}
if (!(val & O_NONBLOCK)) {
return (0);
}
val &= ~O_NONBLOCK;
if (fcntl(fd, F_SETFL, val) == -1) {
return (-1);
}
return (0);
}
int
set_nonblock(int fd)
{
int val;
val = fcntl(fd, F_GETFL, 0);
if (val < 0) {
return (-1);
}
if (val & O_NONBLOCK) {
return (0);
}
val |= O_NONBLOCK;
if (fcntl(fd, F_SETFL, val) == -1) {
return (-1);
}
return (0);
}
int listen_fd = -1;
int accept_fd = -1;
int connect_fd = -1;
DWORD WINAPI MyThreadFunction(LPVOID lpParam)
{
accept_fd = accept(listen_fd, NULL, NULL);
return 0;
}
namespace UnitTests
{
@ -14,63 +77,139 @@ namespace UnitTests
public:
struct addrinfo *result = NULL;
struct addrinfo *servinfo = NULL, *p;
struct addrinfo hints;
int ListenSocket = -1;
TEST_METHOD_INITIALIZE(TestMethodInitialize)
{
int iResult;
w32posix_initialize();
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
listen_fd = -1;
accept_fd = -1;
connect_fd = -1;
struct sockaddr_storage their_addr; // connector's address information
socklen_t sin_size;
int yes = 1;
char s[INET6_ADDRSTRLEN];
int rv;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
hints.ai_flags = AI_PASSIVE; // use my IP
iResult = getaddrinfo(NULL, DEFAULT_PORT, &hints, &result);
if (iResult != 0) {
printf("getaddrinfo failed with error: %d\n", iResult);
if ((rv = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
return;
}
ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (ListenSocket == -1) {
printf("socket failed with error: %ld\n", errno);
return;
// loop through all the results and bind to the first we can
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((listen_fd = socket(p->ai_family, p->ai_socktype,
p->ai_protocol)) == -1) {
perror("server: socket");
continue;
}
if (setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, (char*)&yes,
sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
if (bind(listen_fd, p->ai_addr, p->ai_addrlen) == -1) {
int i = errno;
close(listen_fd);
perror("server: bind");
continue;
}
break;
}
// Setup the TCP listening socket
iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
if (iResult == -1) {
printf("bind failed with error: %d\n", errno);
return ;
freeaddrinfo(servinfo); // all done with this structure
servinfo = NULL;
if (p == NULL) {
fprintf(stderr, "server: failed to bind\n");
exit(1);
}
iResult = listen(ListenSocket, SOMAXCONN);
if (iResult == -1) {
printf("listen failed with error: %d\n", errno);
return;
}
freeaddrinfo(result);
if (listen(listen_fd, BACKLOG) == -1) {
perror("listen");
exit(1);
}
}
TEST_METHOD_CLEANUP(TestMethodCleanup)
{
if (result)
freeaddrinfo(result);
if (ListenSocket != -1)
close(ListenSocket);
if (servinfo)
freeaddrinfo(servinfo);
if (listen_fd != -1)
close(listen_fd);
if (connect_fd != -1)
close(connect_fd);
if (accept_fd != -1)
close(accept_fd);
w32posix_done();
}
TEST_METHOD(TestMethod1)
{
// TODO: Your test code here
int rv;
struct sockaddr_storage their_addr;
socklen_t sin_size;
int ret;
servinfo = NULL;
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
rv = getaddrinfo("::1", PORT, &hints, &servinfo);
Assert::AreEqual(rv, 0, L"getaddreinfo failed", LINE_INFO());
p = servinfo;
connect_fd = socket(p->ai_family, p->ai_socktype, p->ai_protocol);
Assert::AreNotEqual(connect_fd, -1, L"connect_fd", LINE_INFO());
//set_nonblock(listen_fd);
//set_nonblock(connect_fd);
//fd_set read_set;
//fd_set write_set;
//FD_ZERO(&read_set);
//FD_ZERO(&write_set);
//FD_SET(listen_fd, &read_set);
//FD_SET(connect_fd, &write_set);
HANDLE thread = CreateThread(NULL, 0, MyThreadFunction, &connect_fd, 0, NULL);
//sin_size = sizeof(their_addr);
//accept_fd = accept(listen_fd, (struct sockaddr *)&their_addr, &sin_size);
//Assert::AreEqual(accept_fd, -1, L"", LINE_INFO());
//Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO());
ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen);
Assert::AreEqual(ret, 0, L"", LINE_INFO());
WaitForSingleObject(thread, INFINITE);
CloseHandle(thread);
int i = 9;
/* accept_fd = accept(listen_fd, (struct sockaddr *)&their_addr, &sin_size);
Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO());
*/
/* ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen);
Assert::AreEqual(ret, 0, L"", LINE_INFO());*/
}
TEST_METHOD(TestMethod)
{
fd_set* set = (fd_set*)malloc(sizeof(fd_set));
FD_ZERO(set);
@ -87,82 +226,6 @@ namespace UnitTests
Assert::AreEqual(0, FD_ISSET(0, set), L"", LINE_INFO());
Assert::AreEqual(0, FD_ISSET(1, set), L"", LINE_INFO());
Assert::AreEqual(0, FD_ISSET(2, set), L"", LINE_INFO());
w32posix_initialize();
int sockfd, new_fd; // listen on sock_fd, new connection on new_fd
struct addrinfo hints, *servinfo, *p;
struct sockaddr_storage their_addr; // connector's address information
socklen_t sin_size;
int yes = 1;
int rv;
#define PORT "3490" // the port users will be connecting to
#define BACKLOG 10 // how many pending connections queue will hold
memset(&hints, 0, sizeof hints);
hints.ai_family = AF_UNSPEC;
hints.ai_socktype = SOCK_STREAM;
hints.ai_flags = AI_PASSIVE; // use my IP
if ((rv = getaddrinfo(NULL, PORT, &hints, &servinfo)) != 0) {
fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(rv));
}
// loop through all the results and bind to the first we can
for (p = servinfo; p != NULL; p = p->ai_next) {
if ((sockfd = socket(p->ai_family, p->ai_socktype,
p->ai_protocol)) == -1) {
perror("server: socket");
continue;
}
if (setsockopt(sockfd, SOL_SOCKET, SO_REUSEADDR, (char*)&yes,
sizeof(int)) == -1) {
perror("setsockopt");
exit(1);
}
if (bind(sockfd, p->ai_addr, p->ai_addrlen) == -1) {
close(sockfd);
perror("server: bind");
continue;
}
break;
}
freeaddrinfo(servinfo); // all done with this structure
if (p == NULL) {
fprintf(stderr, "server: failed to bind\n");
exit(1);
}
if (listen(sockfd, BACKLOG) == -1) {
perror("listen");
exit(1);
}
fcntl(sockfd, F_SETFL, O_NONBLOCK);
fd_set read_set, write_set, except_set;
ZeroMemory(&read_set, sizeof(fd_set));
ZeroMemory(&write_set, sizeof(fd_set));
ZeroMemory(&except_set, sizeof(fd_set));
FD_SET(sockfd, &read_set);
struct timeval timeout;
timeout.tv_sec = 300;
timeout.tv_usec = 0;
int ret = select(sockfd, &read_set, &write_set, &except_set, &timeout);
new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
}
};
}

View File

@ -19,11 +19,15 @@ void debug_initialize() {
}
void debug_done() {
fclose(log);
if (log)
fclose(log);
}
void write_log(const char *source_name, const char *function_name, int line_num, const char *fmt, ...) {
if (!log)
return;
va_list args;
fprintf(log,"\n%s:%s:%d: ", source_name, function_name, line_num);
va_start(args, fmt);

View File

@ -217,13 +217,14 @@ struct w32_io* socketio_socket(int domain, int type, int protocol) {
return pio;
}
#define SET_ERRNO_ON_ERROR(ret) \
#define SET_ERRNO_ON_ERROR(expr) \
do { \
if ((ret) == SOCKET_ERROR) { \
int ret = (expr); \
if (ret == SOCKET_ERROR) { \
errno = errno_from_WSALastError(); \
debug("ERROR:%d, io:%p", errno, pio); \
debug("ERROR:%d", errno); \
} \
return (ret); \
return ret; \
} while (0)
int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) {
@ -248,7 +249,7 @@ int socketio_listen(struct w32_io* pio, int backlog) {
}
int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen) {
SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen));
SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen));
}
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
@ -523,7 +524,7 @@ int socketio_close(struct w32_io* pio) {
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
struct w32_io *accept_io = NULL;
int iResult = 0;
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
struct acceptEx_context* context;
debug2("io:%p", pio);
//start io if not already started
@ -553,11 +554,22 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
}
context = (struct acceptEx_context*)pio->context;
pio->read_details.pending = FALSE;
ResetEvent(pio->read_overlapped.hEvent);
if (pio->read_details.error)
{
errno = errno_from_WSAError(pio->read_details.error);
debug("ERROR: async io completed with error: %d, io:%p", errno, pio);
goto on_error;
}
if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock)))
{
errno = errno_from_WSALastError();
debug("ERROR: setsockopt failed:%d, io:%p", errno, pio);
return NULL;
goto on_error;
}
accept_io = (struct w32_io*)malloc(sizeof(struct w32_io));
@ -565,33 +577,58 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
{
errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio);
return NULL;
goto on_error;
}
memset(accept_io, 0, sizeof(struct w32_io));
accept_io->sock = context->accept_socket;
accept_io->type = SOCK_FD;
context->accept_socket = INVALID_SOCKET;
pio->read_details.pending = FALSE;
ResetEvent(pio->read_overlapped.hEvent);
debug2("accept io:%p", accept_io);
//TODO : fill in addr
return accept_io;
on_error:
if (context->accept_socket != INVALID_SOCKET) {
closesocket(context->accept_socket);
context->accept_socket = INVALID_SOCKET;
}
return NULL;
}
int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen) {
struct sockaddr_in tmp_addr;
struct sockaddr_in tmp_addr4;
struct sockaddr_in6 tmp_addr6;
SOCKADDR* tmp_addr;
size_t tmp_addr_len;
DWORD tmp_bytes;
GUID connectex_guid = WSAID_CONNECTEX;
LPFN_CONNECTEX ConnectEx;
//TODO - add support for DGRAM socket, below works only for STREAM sockets
ZeroMemory(&tmp_addr, sizeof(tmp_addr));
tmp_addr.sin_family = AF_UNSPEC;
tmp_addr.sin_addr.s_addr = INADDR_ANY;
tmp_addr.sin_port = 0;
if (SOCKET_ERROR == bind(pio->sock, (SOCKADDR*)&tmp_addr, sizeof(tmp_addr)))
if (name->sa_family == AF_INET6) {
ZeroMemory(&tmp_addr6, sizeof(tmp_addr6));
tmp_addr6.sin6_family = AF_INET6;
tmp_addr6.sin6_port = 0;
tmp_addr = (SOCKADDR*)&tmp_addr6;
tmp_addr_len = sizeof(tmp_addr6);
}
else if (name->sa_family == AF_INET) {
ZeroMemory(&tmp_addr4, sizeof(tmp_addr4));
tmp_addr4.sin_family = AF_INET;
tmp_addr4.sin_port = 0;
tmp_addr = (SOCKADDR*)&tmp_addr4;
tmp_addr_len = sizeof(tmp_addr4);
}
else {
errno = ENOTSUP;
debug("ERROR: unsuppored address family:%d, io:%p", name->sa_family, pio);
return -1;
}
if (SOCKET_ERROR == bind(pio->sock, tmp_addr, tmp_addr_len))
{
errno = errno_from_WSALastError();
debug("ERROR: bind failed :%d, io:%p", errno, pio);
@ -614,7 +651,7 @@ int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int name
return -1;
}
if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, pio->write_details.completed, &pio->write_overlapped))
if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, NULL, &pio->write_overlapped))
{
//set completion event
SetEvent(pio->write_overlapped.hEvent);
@ -632,7 +669,7 @@ int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int name
}
pio->write_details.pending = TRUE;
pio->type == CONNECT_FD;
pio->type = CONNECT_FD;
return 0;
}
@ -665,6 +702,16 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele
}
//close event handle
CloseHandle(pio->write_overlapped.hEvent);
pio->write_overlapped.hEvent = 0;
if (pio->write_details.error) {
errno = errno_from_WSAError(pio->write_details.error);
debug("ERROR: async io completed with error: %d, io:%p", errno, pio);
return -1;
}
if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, NULL, 0))
{
errno = errno_from_WSALastError();
@ -672,9 +719,7 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele
return NULL;
}
//close event handle
CloseHandle(pio->write_overlapped.hEvent);
pio->write_overlapped.hEvent = 0;
pio->type = SOCK_FD;
return 0;
}
@ -693,8 +738,11 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) {
}
else {
if (pending && WSAGetLastError() != WSA_IO_INCOMPLETE) {
//unexpected error;
debug("ERROR:Unxpected State. io:%p, WSAError:%d", pio, WSAGetLastError());
if (pio->type == LISTEN_FD)
pio->read_details.error = WSAGetLastError();
else
pio->write_details.error = WSAGetLastError();
return TRUE;
}
return FALSE;
}
@ -725,6 +773,10 @@ int socketio_on_select(struct w32_io* pio, BOOL rd) {
return -1;
return 0;
}
else if (pio->type == CONNECT_FD) {
//nothing to do for connect
return 0;
}
else if (rd) {
if (socketio_WSARecv(pio, NULL) != 0)
return -1;