This commit is contained in:
manojampalam 2016-02-28 22:37:36 -08:00
parent ef3f1691e8
commit 684470e9f2
6 changed files with 80 additions and 41 deletions

View File

@ -74,8 +74,8 @@ int socket_prepare(char* ip)
return 0; return 0;
} }
#define READ_BUf_SIZE 1024 * 1024 * 2 #define READ_BUf_SIZE 1024 * 100
#define WRITE_BUF_SIZE 1024 * 1024 * 5 #define WRITE_BUF_SIZE 1024 * 100
namespace UnitTests namespace UnitTests
{ {
@ -105,7 +105,7 @@ namespace UnitTests
TEST_METHOD(socketio) TEST_METHOD(socketio)
{ {
int ret; int ret;
ret = socket_prepare("::1"); ret = socket_prepare("127.0.0.1");
Assert::AreEqual(ret, 0, L"failed to prepare sockets", LINE_INFO()); Assert::AreEqual(ret, 0, L"failed to prepare sockets", LINE_INFO());
ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen); ret = connect(connect_fd, servinfo->ai_addr, servinfo->ai_addrlen);
@ -114,15 +114,18 @@ namespace UnitTests
accept_fd = accept(listen_fd, NULL, NULL); accept_fd = accept(listen_fd, NULL, NULL);
Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO()); Assert::AreNotEqual(accept_fd, -1, L"", LINE_INFO());
//close(listen_fd);
//listen_fd = -1;
int c = connect_fd; int c = connect_fd;
int s = accept_fd; int s = accept_fd;
set_nonblock(c); set_nonblock(c);
set_nonblock(s); set_nonblock(s);
char *to_write = (char*)malloc(WRITE_BUF_SIZE); //5MB char *to_write = (char*)malloc(WRITE_BUF_SIZE);
char *read_to = (char*)malloc(READ_BUf_SIZE); //2MB char *read_to = (char*)malloc(READ_BUf_SIZE);
//write from c, read from s //write from c, read from s
fd_set read_set; fd_set read_set;
@ -144,24 +147,37 @@ namespace UnitTests
BOOL write_ready = FD_ISSET(c, &write_set); BOOL write_ready = FD_ISSET(c, &write_set);
FD_ZERO(&read_set); FD_ZERO(&read_set);
FD_ZERO(&write_set); FD_ZERO(&write_set);
FD_SET(s, &read_set);
if (bytes_written > WRITE_BUF_SIZE * 100) if (write_ready)
{
#define WR_LIMIT WRITE_BUF_SIZE*5
int bw = 0;// send(c, to_write, WRITE_BUF_SIZE, 0);
while ((bw != -1) && (bytes_written < WR_LIMIT)) {
bw = send(c, to_write, WRITE_BUF_SIZE, 0);
if (bw > 0)
bytes_written += bw;
else {
ret = errno;
Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO());
}
}
if (bytes_written >= WR_LIMIT)
{ {
ret = shutdown(c, SD_SEND), 0, L"", LINE_INFO(); ret = shutdown(c, SD_SEND), 0, L"", LINE_INFO();
Assert::AreEqual(ret, 0, L"", LINE_INFO()); Assert::AreEqual(ret, 0, L"", LINE_INFO());
} }
else if (write_ready) else
{
int bw = send(c, to_write, WRITE_BUF_SIZE, 0);
while (bw != -1) {
bytes_written += bw;
bw = send(c, to_write, WRITE_BUF_SIZE, 0);
}
ret = errno;
Assert::AreEqual(ret, EAGAIN, L"", LINE_INFO());
FD_SET(c, &write_set); FD_SET(c, &write_set);
} }
if (read_ready) if (read_ready)
{ {
int br = read(s, read_to, READ_BUf_SIZE); int br = read(s, read_to, READ_BUf_SIZE);
@ -173,8 +189,7 @@ namespace UnitTests
if (br == 0) //send from other side is done if (br == 0) //send from other side is done
break; break;
ret = errno; ret = errno;
Assert::AreEqual(ret, EAGAIN, L"", LINE_INFO()); Assert::AreEqual(errno, EAGAIN, L"", LINE_INFO());
FD_SET(s, &read_set);
} }
} }

View File

@ -14,7 +14,7 @@ void debug_initialize() {
GetLocalTime(&time); GetLocalTime(&time);
sprintf(filename + len, "_%d_%d_%d.log", time.wHour, time.wMinute, time.wSecond); sprintf(filename + len, "_%d_%d_%d.log", time.wHour, time.wMinute, time.wSecond);
//sprintf(filename, "%s", "e:\\tmp.log");
log = fopen(filename, "w"); log = fopen(filename, "w");
} }
@ -29,9 +29,9 @@ void write_log(const char *source_name, const char *function_name, int line_num,
return; return;
va_list args; va_list args;
fprintf(log,"\n%s:%s:%d: ", source_name, function_name, line_num); fprintf(log,"\n%s:%d: ", function_name, line_num);
va_start(args, fmt); va_start(args, fmt);
fprintf(log,fmt, args); vfprintf(log,fmt, args);
va_end(args); va_end(args);
fflush(log); fflush(log);
} }

View File

@ -2,6 +2,7 @@
#define debug(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__) #define debug(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__)
#define debug2(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__) #define debug2(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__)
#define debug3(cformat, ...) write_log(__FILE__, __FUNCTION__, __LINE__, cformat, __VA_ARGS__)
void debug_initialize(); void debug_initialize();
void debug_done(); void debug_done();

View File

@ -134,7 +134,7 @@ void CALLBACK WSARecvCompletionRoutine(
{ {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped)); struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
debug2("io:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred:%d", debug2("io:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred:%d",
pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.pending, dwError, cbTransferred); pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.completed, dwError, cbTransferred);
if (!dwError && !cbTransferred) if (!dwError && !cbTransferred)
dwError = ERROR_GRACEFUL_DISCONNECT; dwError = ERROR_GRACEFUL_DISCONNECT;
pio->read_details.error = dwError; pio->read_details.error = dwError;
@ -176,7 +176,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
{ {
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
//receive has completed but APC is pending to be scheduled //receive has completed but APC is pending to be scheduled
debug2("WSARecv immediate completion, io:%p", pio); debug2("WSARecv returned 0, io:%p", pio);
if (completed) if (completed)
*completed = TRUE; *completed = TRUE;
} }
@ -287,6 +287,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
return num_bytes_copied; return num_bytes_copied;
} }
//if there was an error on async call, return //if there was an error on async call, return
if (pio->read_details.error) { if (pio->read_details.error) {
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) { if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
@ -307,6 +308,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
if (completed) { if (completed) {
//Let APC be scheduled //Let APC be scheduled
debug2("Letting APC to execute");
SleepEx(1, TRUE); SleepEx(1, TRUE);
if (pio->read_details.pending) { if (pio->read_details.pending) {
//this shouldn't be happening //this shouldn't be happening
@ -319,6 +321,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//wait until io is done //wait until io is done
debug3("socket in blocking mode");
while (socketio_is_io_available(pio, TRUE) == FALSE) { while (socketio_is_io_available(pio, TRUE) == FALSE) {
if (0 != wait_for_any_event(NULL, 0, INFINITE)) if (0 != wait_for_any_event(NULL, 0, INFINITE))
return -1; return -1;
@ -373,7 +376,7 @@ void CALLBACK WSASendCompletionRoutine(
) )
{ {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped)); struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped));
debug2("io:%p, pending_state:%d, error:%d, transferred:%d, remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining); debug2("io:%p, pending_state:%d, error:%d, transferred:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining);
pio->write_details.error = dwError; pio->write_details.error = dwError;
//assert that remaining == cbTransferred //assert that remaining == cbTransferred
pio->write_details.remaining -= cbTransferred; pio->write_details.remaining -= cbTransferred;
@ -413,7 +416,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
} }
else { else {
errno = EAGAIN; errno = EAGAIN;
debug2("IO pending, io:%p", pio); debug2("IO currently pending, EAGAIN, io:%p", pio);
return -1; return -1;
} }
} }
@ -453,7 +456,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (ret == 0) if (ret == 0)
{ {
//send has completed and APC is scheduled, let it run //send has completed and APC is scheduled, let it run
debug2("WSASend immediate completion, io:%p", pio); debug2("WSASend returned 0, APC scheduled io:%p", pio);
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len; pio->write_details.remaining = wsabuf.len;
SleepEx(1, TRUE); SleepEx(1, TRUE);
@ -470,16 +473,18 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (WSAGetLastError() == WSA_IO_PENDING) if (WSAGetLastError() == WSA_IO_PENDING)
{ {
//io is initiated and pending //io is initiated and pending
debug2("IO pending, io:%p", pio); debug2("WSASend reported IO pending, io:%p", pio);
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len; pio->write_details.remaining = wsabuf.len;
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//wait until io is done //wait until io is done
debug3("waiting as socket is in blocking mode, io:%p", pio);
while (pio->write_details.pending) while (pio->write_details.pending)
SleepEx(INFINITE, TRUE); SleepEx(INFINITE, TRUE);
} }
debug3("returning %d", wsabuf.len);
return wsabuf.len; return wsabuf.len;
} }
else { //failed else { //failed
@ -513,9 +518,10 @@ int socketio_close(struct w32_io* pio) {
closesocket(ctx->accept_socket); closesocket(ctx->accept_socket);
if (ctx->lpOutputBuf) if (ctx->lpOutputBuf)
free(ctx->lpOutputBuf); free(ctx->lpOutputBuf);
free(pio->context); //TODO: Fix this. Freeing this is crashing
//free(pio->context);
} }
//TODO: cleanup other details in pio->context
} }
else if (pio->type == CONNECT_FD) { else if (pio->type == CONNECT_FD) {
if (pio->write_overlapped.hEvent) if (pio->write_overlapped.hEvent)
@ -723,9 +729,8 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele
return -1; return -1;
} }
if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, NULL, 0)) if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0))
{ {
int i = WSAGetLastError();
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
debug("ERROR: setsockopt failed:%d, io:%p", errno, pio); debug("ERROR: setsockopt failed:%d, io:%p", errno, pio);
return -1; return -1;
@ -774,7 +779,7 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) {
int socketio_on_select(struct w32_io* pio, BOOL rd) { int socketio_on_select(struct w32_io* pio, BOOL rd) {
debug2("io:%p", pio); debug2("io:%p type:%d", pio, pio->type);
if (rd && pio->read_details.pending) if (rd && pio->read_details.pending)
return 0; return 0;

View File

@ -117,12 +117,13 @@ int w32_socket(int domain, int type, int protocol) {
} }
fd_table_set(pio, min_index); fd_table_set(pio, min_index);
debug("socket:%p, io:%p, fd:%d ", pio->sock, pio, min_index); debug("socket:%d, io:%p, fd:%d ", pio->sock, pio, min_index);
return min_index; return min_index;
} }
int w32_accept(int fd, struct sockaddr* addr, int* addrlen) int w32_accept(int fd, struct sockaddr* addr, int* addrlen)
{ {
debug3("fd:%d", fd);
int min_index = fd_table_get_min_index(); int min_index = fd_table_get_min_index();
struct w32_io* pio = NULL; struct w32_io* pio = NULL;
@ -135,7 +136,7 @@ int w32_accept(int fd, struct sockaddr* addr, int* addrlen)
} }
fd_table_set(pio, min_index); fd_table_set(pio, min_index);
debug("socket:%p, io:%p, fd:%d ", pio->sock, pio, min_index); debug("socket:%d, io:%p, fd:%d ", pio->sock, pio, min_index);
return min_index; return min_index;
} }
@ -149,52 +150,62 @@ do { \
} while (0) } while (0)
int w32_setsockopt(int fd, int level, int optname, const char* optval, int optlen) { int w32_setsockopt(int fd, int level, int optname, const char* optval, int optlen) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_setsockopt(fd_table.w32_ios[fd], level, optname, optval, optlen); return socketio_setsockopt(fd_table.w32_ios[fd], level, optname, optval, optlen);
} }
int w32_getsockopt(int fd, int level, int optname, char* optval, int* optlen) { int w32_getsockopt(int fd, int level, int optname, char* optval, int* optlen) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_getsockopt(fd_table.w32_ios[fd], level, optname, optval, optlen); return socketio_getsockopt(fd_table.w32_ios[fd], level, optname, optval, optlen);
} }
int w32_getsockname(int fd, struct sockaddr* name, int* namelen) { int w32_getsockname(int fd, struct sockaddr* name, int* namelen) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_getsockname(fd_table.w32_ios[fd], name, namelen); return socketio_getsockname(fd_table.w32_ios[fd], name, namelen);
} }
int w32_getpeername(int fd, struct sockaddr* name, int* namelen) { int w32_getpeername(int fd, struct sockaddr* name, int* namelen) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_getpeername(fd_table.w32_ios[fd], name, namelen); return socketio_getpeername(fd_table.w32_ios[fd], name, namelen);
} }
int w32_listen(int fd, int backlog) { int w32_listen(int fd, int backlog) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_listen(fd_table.w32_ios[fd], backlog); return socketio_listen(fd_table.w32_ios[fd], backlog);
} }
int w32_bind(int fd, const struct sockaddr *name, int namelen) { int w32_bind(int fd, const struct sockaddr *name, int namelen) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_bind(fd_table.w32_ios[fd], name, namelen); return socketio_bind(fd_table.w32_ios[fd], name, namelen);
} }
int w32_connect(int fd, const struct sockaddr* name, int namelen) { int w32_connect(int fd, const struct sockaddr* name, int namelen) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_connect(fd_table.w32_ios[fd], name, namelen); return socketio_connect(fd_table.w32_ios[fd], name, namelen);
} }
int w32_recv(int fd, void *buf, size_t len, int flags) { int w32_recv(int fd, void *buf, size_t len, int flags) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_recv(fd_table.w32_ios[fd], buf, len, flags); return socketio_recv(fd_table.w32_ios[fd], buf, len, flags);
} }
int w32_send(int fd, const void *buf, size_t len, int flags) { int w32_send(int fd, const void *buf, size_t len, int flags) {
debug3("fd:%d", fd);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_send(fd_table.w32_ios[fd], buf, len, flags); return socketio_send(fd_table.w32_ios[fd], buf, len, flags);
} }
int w32_shutdown(int fd, int how) { int w32_shutdown(int fd, int how) {
debug3("fd:%d how:%d", fd, how);
CHECK_FD(fd); CHECK_FD(fd);
return socketio_shutdown(fd_table.w32_ios[fd], how); return socketio_shutdown(fd_table.w32_ios[fd], how);
} }
@ -246,11 +257,15 @@ int w32_open(const char *pathname, int flags, ...) {
int w32_read(int fd, void *dst, unsigned int max) { int w32_read(int fd, void *dst, unsigned int max) {
CHECK_FD(fd); CHECK_FD(fd);
if (fd_table.w32_ios[fd]->type == SOCK_FD)
return socketio_recv(fd_table.w32_ios[fd], dst, max, 0);
return fileio_read(fd_table.w32_ios[fd], dst, max); return fileio_read(fd_table.w32_ios[fd], dst, max);
} }
int w32_write(int fd, const void *buf, unsigned int max) { int w32_write(int fd, const void *buf, unsigned int max) {
CHECK_FD(fd); CHECK_FD(fd);
if (fd_table.w32_ios[fd]->type == SOCK_FD)
return socketio_send(fd_table.w32_ios[fd], buf, max, 0);
return fileio_write(fd_table.w32_ios[fd], buf, max); return fileio_write(fd_table.w32_ios[fd], buf, max);
} }

View File

@ -1,16 +1,19 @@
#pragma once
#include <Windows.h> #include <Windows.h>
#include <stdio.h> #include <stdio.h>
#include "debug.h" #include "debug.h"
//order to be maintained //order to be maintained
enum w32_io_type { enum w32_io_type {
UNKOWN_FD = 0, UNKOWN_FD = 0,
LISTEN_FD, LISTEN_FD = 1,
CONNECT_FD, CONNECT_FD = 2,
SOCK_FD, SOCK_FD = 3,
FILE_FD, FILE_FD = 4,
PIPE_FD, PIPE_FD = 5,
CONSOLE_FD CONSOLE_FD = 6
}; };
struct w32_io { struct w32_io {