This commit is contained in:
manojampalam 2016-02-27 18:05:20 -08:00
parent d42ef60f9c
commit 3e61818f19
2 changed files with 200 additions and 99 deletions

View File

@ -14,25 +14,25 @@
static int errno_from_WSAError(int wsaerrno)
{
if (wsaerrno == WSAEWOULDBLOCK)
{
return EAGAIN;
}
if (wsaerrno == WSAEWOULDBLOCK)
{
return EAGAIN;
}
if (wsaerrno == WSAEFAULT)
{
return EFAULT;
}
if (wsaerrno == WSAEFAULT)
{
return EFAULT;
}
if (wsaerrno == WSAEINVAL)
{
return EINVAL;
}
if (wsaerrno == WSAEINVAL)
{
return EINVAL;
}
return wsaerrno;
return wsaerrno;
}
int socketio_initialize() {
int socketio_initialize() {
WSADATA wsaData = { 0 };
return WSAStartup(MAKEWORD(2, 2), &wsaData);
}
@ -60,18 +60,18 @@ int socketio_acceptEx(struct w32_io* pio) {
context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) {
errno = ENOMEM;
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
if (SOCKET_ERROR == WSAIoctl(pio->sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof (GuidAcceptEx),
&context->lpfnAcceptEx, sizeof (context->lpfnAcceptEx),
&GuidAcceptEx, sizeof(GuidAcceptEx),
&context->lpfnAcceptEx, sizeof(context->lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
free(context);
errno = errno_from_WSALastError();
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
@ -85,7 +85,7 @@ int socketio_acceptEx(struct w32_io* pio) {
if (pio->read_overlapped.hEvent == NULL) {
if ((pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) {
errno = ENOMEM;
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
}
@ -96,7 +96,7 @@ int socketio_acceptEx(struct w32_io* pio) {
context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (context->accept_socket == INVALID_SOCKET) {
errno = errno_from_WSALastError();
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
@ -116,7 +116,7 @@ int socketio_acceptEx(struct w32_io* pio) {
//if overlapped io is in progress, we are good
if (WSAGetLastError() != ERROR_IO_PENDING) {
errno = errno_from_WSALastError();
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
}
@ -133,7 +133,7 @@ void CALLBACK WSARecvCompletionRoutine(
)
{
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
debug2("pio:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred: %d",
debug2("io:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred:%d",
pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.pending, dwError, cbTransferred);
if (!dwError && !cbTransferred)
dwError = ERROR_GRACEFUL_DISCONNECT;
@ -160,7 +160,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
if (!wsabuf.buf)
{
errno = ENOMEM;
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
@ -176,7 +176,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
{
pio->read_details.pending = TRUE;
//receive has completed but APC is pending to be scheduled
debug2("WSARecv immediate completion");
debug2("WSARecv immediate completion, io:%p", pio);
if (completed)
*completed = TRUE;
}
@ -188,7 +188,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
}
else { //failed
errno = errno_from_WSALastError();
debug("ERROR:%d", errno);
debug("ERROR: io:%p %d", pio, errno);
return -1;
}
}
@ -197,31 +197,31 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
}
struct w32_io* socketio_socket(int domain, int type, int protocol) {
struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!pio) {
errno = ENOMEM;
debug("ERROR:%d", errno);
return NULL;
}
struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!pio) {
errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio);
return NULL;
}
memset(pio, 0, sizeof(struct w32_io));
pio->sock = socket(domain, type, protocol);
if (pio->sock == INVALID_SOCKET) {
errno = errno_from_WSALastError();
free(pio);
debug("ERROR:%d", errno);
return NULL;
}
memset(pio, 0, sizeof(struct w32_io));
pio->sock = socket(domain, type, protocol);
if (pio->sock == INVALID_SOCKET) {
errno = errno_from_WSALastError();
free(pio);
debug("ERROR:%d, io:%p", errno, pio);
return NULL;
}
pio->type = SOCK_FD;
return pio;
return pio;
}
#define SET_ERRNO_ON_ERROR(ret) \
do { \
if ((ret) == SOCKET_ERROR) { \
errno = errno_from_WSALastError(); \
debug("ERROR: %d", errno); \
debug("ERROR:%d, io:%p", errno, pio); \
} \
return (ret); \
} while (0)
@ -251,31 +251,27 @@ int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen)
SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen));
}
int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) {
SET_ERRNO_ON_ERROR(connect(pio->sock, name, namelen));
}
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
BOOL completed = FALSE;
debug2("pio: %p", pio);
debug2("io:%p", pio);
if ((buf == NULL) || (len == 0)){
if ((buf == NULL) || (len == 0)) {
errno = EINVAL;
debug("ERROR, buf:%p, len:%d", buf, len);
debug("ERROR, buf:%p, len:%d, io:%p", buf, len, pio);
return -1;
}
if (flags != 0) {
errno = ENOTSUP;
debug("ERROR: flags are not currently supported");
debug("ERROR: flags are not currently supported, io:%p", pio);
return -1;
}
//if io is already pending
if (pio->read_details.pending) {
errno = EAGAIN;
debug2("Read is already pending");
debug2("Read is already pending, io:%p", pio);
return -1;
}
@ -286,21 +282,21 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
memcpy(buf, pio->read_details.buf + pio->read_details.completed, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed += num_bytes_copied;
debug2("returning %d bytes from prior completed IO, remaining: %d", num_bytes_copied, pio->read_details.remaining);
debug2("returning %d bytes from prior completed IO, remaining:%d, io:%p", num_bytes_copied, pio->read_details.remaining, pio);
return num_bytes_copied;
}
//if there was an error on async call, return
if (pio->read_details.error) {
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
debug2("connection closed");
debug2("connection closed, io:%p", pio);
//connection is closed
return 0;
}
else {
errno = errno_from_WSAError(pio->read_details.error);
pio->read_details.error = 0;
debug("ERROR: %d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
}
@ -314,7 +310,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
if (pio->read_details.pending) {
//this shouldn't be happening
errno = EOTHER;
debug("ERROR: Unexpected IO state");
debug("ERROR: Unexpected IO state, io:%p", pio);
return -1;
}
}
@ -330,7 +326,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
else {
if (socketio_is_io_available(pio, TRUE) == FALSE) {
errno = EAGAIN;
debug2("IO is pending");
debug2("IO is pending, io:%p", pio);
return -1;
}
}
@ -340,13 +336,13 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
{
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
//connection is closed
debug2("connection closed");
debug2("connection closed, io:%p", pio);
return 0;
}
else {
errno = errno_from_WSAError(pio->read_details.error);
pio->read_details.error = 0;
debug("ERROR:%d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
}
@ -356,13 +352,13 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
memcpy(buf, pio->read_details.buf, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed = num_bytes_copied;
debug2("returning %d bytes from completed IO, remaining: %d", num_bytes_copied, pio->read_details.remaining);
debug2("returning %d bytes from completed IO, remaining:%d, io:%p", num_bytes_copied, pio->read_details.remaining, pio);
return num_bytes_copied;
}
else {
//this should not happen
errno = EOTHER;
debug("ERROR:Unexpected IO stated");
debug("ERROR:Unexpected IO stated, io:%p", pio);
return -1;
}
@ -376,7 +372,7 @@ void CALLBACK WSASendCompletionRoutine(
)
{
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped));
debug2("pio: %p, pending_state:%d, error:%d, transferred:%d, remaining: %d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining);
debug2("io:%p, pending_state:%d, error:%d, transferred:%d, remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining);
pio->write_details.error = dwError;
//assert that remaining == cbTransferred
pio->write_details.remaining -= cbTransferred;
@ -387,17 +383,17 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
int ret = 0;
WSABUF wsabuf;
debug2("pio: %p", pio);
debug2("io:%p", pio);
if ((buf == NULL) || (len == 0)){
if ((buf == NULL) || (len == 0)) {
errno = EINVAL;
debug("ERROR, buf:%p, len:%d", buf, len);
debug("ERROR, buf:%p, len:%d, io:%p", buf, len, pio);
return -1;
}
if (flags != 0) {
errno = ENOTSUP;
debug("ERROR: flags are not currently supported");
debug("ERROR: flags are not currently supported, io:%p", pio);
return -1;
}
@ -408,7 +404,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
{
//this covers the scenario when the fd was previously non blocking (and hence io is still pending)
//wait for previous io to complete
debug2("waiting for IO on a previous nonblocking send to complete");
debug2("waiting for IO on a previous nonblocking send to complete, io:%p", pio);
while (pio->write_details.pending) {
if (wait_for_any_event(NULL, 0, INFINITE) == -1)
return -1;
@ -416,7 +412,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
}
else {
errno = EAGAIN;
debug2("IO pending");
debug2("IO pending, io:%p", pio);
return -1;
}
}
@ -424,7 +420,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (pio->write_details.error) {
errno = errno_from_WSAError(pio->write_details.error);
debug("ERROR:%d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
@ -436,7 +432,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (!wsabuf.buf)
{
errno = ENOMEM;
debug("ERROR:%d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
@ -456,13 +452,13 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (ret == 0)
{
//send has completed and APC is scheduled, let it run
debug2("WSASend immediate completion");
debug2("WSASend immediate completion, io:%p", pio);
pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len;
SleepEx(1, TRUE);
if ((pio->write_details.pending) || (pio->write_details.remaining != 0)) {
errno = EOTHER;
debug("ERROR: Unexpected IO state");
debug("ERROR: Unexpected IO state, io:%p", pio);
return -1;
}
@ -473,7 +469,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (WSAGetLastError() == WSA_IO_PENDING)
{
//io is initiated and pending
debug2("IO pending");
debug2("IO pending, io:%p", pio);
pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len;
if (w32_io_is_blocking(pio))
@ -487,7 +483,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
}
else { //failed
errno = errno_from_WSALastError();
debug("ERROR:%d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
}
@ -500,12 +496,12 @@ int socketio_shutdown(struct w32_io* pio, int how) {
}
int socketio_close(struct w32_io* pio) {
debug2("pio: %p", pio);
debug2("io:%p", pio);
closesocket(pio->sock);
//wait for pending io to abort
SleepEx(0, TRUE);
if (pio->read_details.pending || pio->write_details.pending)
debug2("IO is still pending on closed socket. read:%d, write:%d", pio->read_details.pending, pio->write_details.pending);
debug2("IO is still pending on closed socket. read:%d, write:%d, io:%p", pio->read_details.pending, pio->write_details.pending, pio);
if (pio->type == LISTEN_FD) {
if (pio->read_overlapped.hEvent)
CloseHandle(pio->read_overlapped.hEvent);
@ -529,7 +525,7 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
int iResult = 0;
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
debug2("pio:%p", pio);
debug2("io:%p", pio);
//start io if not already started
if (pio->read_details.pending == FALSE) {
if (socketio_acceptEx(pio) != 0) {
@ -551,7 +547,7 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
//if i/o is not ready
if (FALSE == socketio_is_io_available(pio, TRUE)) {
errno = EAGAIN;
debug2("accept is pending");
debug2("accept is pending, io:%p", pio);
return NULL;
}
@ -560,7 +556,7 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock)))
{
errno = errno_from_WSALastError();
debug("ERROR: setsockopt failed: %d", errno);
debug("ERROR: setsockopt failed:%d, io:%p", errno, pio);
return NULL;
}
@ -568,7 +564,7 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
if (!accept_io)
{
errno = ENOMEM;
debug("ERROR:%d", errno);
debug("ERROR:%d, io:%p", errno, pio);
return NULL;
}
memset(accept_io, 0, sizeof(struct w32_io));
@ -582,38 +578,142 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
return accept_io;
}
int socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen) {
struct sockaddr_in tmp_addr;
DWORD tmp_bytes;
GUID connectex_guid = WSAID_CONNECTEX;
LPFN_CONNECTEX ConnectEx;
//TODO - add support for DGRAM socket, below works only for STREAM sockets
ZeroMemory(&tmp_addr, sizeof(tmp_addr));
tmp_addr.sin_family = AF_UNSPEC;
tmp_addr.sin_addr.s_addr = INADDR_ANY;
tmp_addr.sin_port = 0;
if (SOCKET_ERROR == bind(pio->sock, (SOCKADDR*)&tmp_addr, sizeof(tmp_addr)))
{
errno = errno_from_WSALastError();
debug("ERROR: bind failed :%d, io:%p", errno, pio);
return -1;
}
if (SOCKET_ERROR == WSAIoctl(pio->sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&connectex_guid, sizeof(connectex_guid),
&ConnectEx, sizeof(ConnectEx),
&tmp_bytes, NULL, NULL))
{
errno = errno_from_WSALastError();
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
if ((pio->write_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) {
errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio);
return -1;
}
if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, pio->write_details.completed, &pio->write_overlapped))
{
//set completion event
SetEvent(pio->write_overlapped.hEvent);
}
else
{
if (WSAGetLastError() != ERROR_IO_PENDING)
{
CloseHandle(pio->write_overlapped.hEvent);
pio->write_overlapped.hEvent = 0;
errno = errno_from_WSALastError();
debug("ERROR ConnectEx :%d, io:%p", errno, pio);
return -1;
}
}
pio->write_details.pending = TRUE;
pio->type == CONNECT_FD;
return 0;
}
int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) {
//SET_ERRNO_ON_ERROR(connect(pio->sock, name, namelen));
if (pio->write_details.pending == FALSE)
{
if (-1 == socketio_connectex(pio, name, namelen))
return -1;
}
if (w32_io_is_blocking(pio)) {
// block until connect io is complete
while (FALSE == socketio_is_io_available(pio, TRUE))
{
if (0 != wait_for_any_event(&pio->write_overlapped.hEvent, 1, INFINITE))
{
return -1;
}
}
}
else {
//if i/o is not ready
if (FALSE == socketio_is_io_available(pio, TRUE)) {
errno = EINPROGRESS;
debug2("connect is in progress, io:%p", pio);
return -1;
}
}
if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, NULL, 0))
{
errno = errno_from_WSALastError();
debug("ERROR: setsockopt failed:%d, io:%p", errno, pio);
return NULL;
}
//close event handle
CloseHandle(pio->write_overlapped.hEvent);
pio->write_overlapped.hEvent = 0;
pio->type = SOCK_FD;
return 0;
}
BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) {
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
if (pio->type == LISTEN_FD) {
if ((pio->type == LISTEN_FD) || (pio->type == CONNECT_FD)) {
DWORD numBytes = 0;
DWORD flags;
if (pio->read_details.pending && WSAGetOverlappedResult(pio->sock, &pio->read_overlapped, &numBytes, FALSE, &flags)) {
OVERLAPPED *overlapped = (pio->type == LISTEN_FD) ? &pio->read_overlapped : &pio->write_overlapped;
BOOL pending = (pio->type == LISTEN_FD) ? pio->read_details.pending : pio->write_details.pending;
if (pending && WSAGetOverlappedResult(pio->sock, overlapped, &numBytes, FALSE, &flags)) {
return TRUE;
}
else {
if (pio->read_details.pending && WSAGetLastError() != WSA_IO_INCOMPLETE) {
if (pending && WSAGetLastError() != WSA_IO_INCOMPLETE) {
//unexpected error;
debug("ERROR:Unxpected State. WSAError:%d", WSAGetLastError());
debug("ERROR:Unxpected State. io:%p, WSAError:%d", pio, WSAGetLastError());
}
return FALSE;
}
}
else if (rd){
else if (rd) {
if (pio->read_details.remaining || pio->read_details.error)
return TRUE;
else
return FALSE;
}
else { //write
return (pio->write_details.pending == FALSE)? TRUE : FALSE;
return (pio->write_details.pending == FALSE) ? TRUE : FALSE;
}
}
int socketio_on_select(struct w32_io* pio, BOOL rd) {
debug2("pio:%p", pio);
debug2("io:%p", pio);
if (rd && pio->read_details.pending)
return 0;

View File

@ -5,6 +5,7 @@
enum w32_io_type {
UNKOWN_FD = 0,
LISTEN_FD,
CONNECT_FD,
SOCK_FD,
FILE_FD,
PIPE_FD,