This commit is contained in:
Manoj Ampalam 2016-01-30 00:16:46 -08:00
parent 8433e8ee77
commit a9d62206f4
3 changed files with 109 additions and 34 deletions

View File

@ -6,9 +6,11 @@
//signal queue //signal queue
//wakes on //wakes on
// - any signals (errno = EINTR ) // - any signals (errno = EINTR ) - TODO
// - any of the supplied events set // - any of the supplied events set
// - any APCs caused by IO completions // - any APCs caused by IO completions
// - time out (errno = ETIMEOUT)
// Returns 0 on IO completion and -1 on rest
int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds) int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
{ {
//todo - implement signal catching and handling //todo - implement signal catching and handling
@ -24,10 +26,12 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
} }
else if (ret == WAIT_TIMEOUT) { else if (ret == WAIT_TIMEOUT) {
errno = ETIMEDOUT; errno = ETIMEDOUT;
debug("ERROR: wait timed out");
return -1; return -1;
} }
else { //some other error else { //some other error
errno = EOTHER; errno = EOTHER;
debug("ERROR: unxpected wait end with error: %d", ret);
return -1; return -1;
} }
} }
@ -40,10 +44,12 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
else if (ret == 0) { else if (ret == 0) {
//timed out //timed out
errno = ETIMEDOUT; errno = ETIMEDOUT;
debug("ERROR: wait timed out");
return -1; return -1;
} }
else { //some other error else { //some other error
errno = EOTHER; errno = EOTHER;
debug("ERROR: unxpected SleepEx error: %d", ret);
return -1; return -1;
} }
} }

View File

@ -9,9 +9,10 @@
#define INTERNAL_RECV_BUFFER_SIZE 70*1024 //70KB #define INTERNAL_RECV_BUFFER_SIZE 70*1024 //70KB
static int errno_from_WSAError() #define errno_from_WSALastError() errno_from_WSAError(WSAGetLastError())
static int errno_from_WSAError(int wsaerrno)
{ {
int wsaerrno = WSAGetLastError();
if (wsaerrno == WSAEWOULDBLOCK) if (wsaerrno == WSAEWOULDBLOCK)
{ {
@ -31,14 +32,6 @@ static int errno_from_WSAError()
return wsaerrno; return wsaerrno;
} }
static int set_errno_on_error(int ret)
{
if (ret == SOCKET_ERROR) {
errno = errno_from_WSAError();
}
return ret;
}
int socketio_initialize() { int socketio_initialize() {
WSADATA wsaData = { 0 }; WSADATA wsaData = { 0 };
return WSAStartup(MAKEWORD(2, 2), &wsaData); return WSAStartup(MAKEWORD(2, 2), &wsaData);
@ -67,6 +60,7 @@ int socketio_acceptEx(struct w32_io* pio) {
context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context)); context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) { if (context == NULL) {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR: %d", errno);
return -1; return -1;
} }
@ -76,7 +70,8 @@ int socketio_acceptEx(struct w32_io* pio) {
&dwBytes, NULL, NULL)) &dwBytes, NULL, NULL))
{ {
free(context); free(context);
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
debug("ERROR: %d", errno);
return -1; return -1;
} }
@ -90,6 +85,7 @@ int socketio_acceptEx(struct w32_io* pio) {
if (pio->read_overlapped.hEvent == NULL) { if (pio->read_overlapped.hEvent == NULL) {
if ((pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) { if ((pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR: %d", errno);
return -1; return -1;
} }
} }
@ -99,7 +95,8 @@ int socketio_acceptEx(struct w32_io* pio) {
//todo - get socket parameters from listening socket //todo - get socket parameters from listening socket
context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (context->accept_socket == INVALID_SOCKET) { if (context->accept_socket == INVALID_SOCKET) {
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
debug("ERROR: %d", errno);
return -1; return -1;
} }
@ -118,7 +115,8 @@ int socketio_acceptEx(struct w32_io* pio) {
else { else {
//if overlapped io is in progress, we are good //if overlapped io is in progress, we are good
if (WSAGetLastError() != ERROR_IO_PENDING) { if (WSAGetLastError() != ERROR_IO_PENDING) {
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
debug("ERROR: %d", errno);
return -1; return -1;
} }
} }
@ -135,6 +133,8 @@ void CALLBACK WSARecvCompletionRoutine(
) )
{ {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped)); struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
debug("pio:%p, pending_state:%d, remaining:%d, completed:%d, error:%d, transferred: %d",
pio, pio->read_details.pending, pio->read_details.remaining, pio->read_details.pending, dwError, cbTransferred);
if (!dwError && !cbTransferred) if (!dwError && !cbTransferred)
dwError = ERROR_GRACEFUL_DISCONNECT; dwError = ERROR_GRACEFUL_DISCONNECT;
pio->read_details.error = dwError; pio->read_details.error = dwError;
@ -160,6 +160,7 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
if (!wsabuf.buf) if (!wsabuf.buf)
{ {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR: %d", errno);
return -1; return -1;
} }
@ -170,12 +171,12 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
wsabuf.buf = pio->read_details.buf; wsabuf.buf = pio->read_details.buf;
//TODO - implement flags if any needed for OpenSSH
ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped, &WSARecvCompletionRoutine); ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped, &WSARecvCompletionRoutine);
if (ret == 0) if (ret == 0)
{ {
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
//receive has completed but APC is pending to be scheduled //receive has completed but APC is pending to be scheduled
debug("WSARecv immediate completion");
if (completed) if (completed)
*completed = TRUE; *completed = TRUE;
} }
@ -186,7 +187,8 @@ int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
} }
else { //failed else { //failed
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
debug("ERROR:%d", errno);
return -1; return -1;
} }
} }
@ -198,14 +200,16 @@ struct w32_io* socketio_socket(int domain, int type, int protocol) {
struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io)); struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!pio) { if (!pio) {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR:%d", errno);
return NULL; return NULL;
} }
memset(pio, 0, sizeof(struct w32_io)); memset(pio, 0, sizeof(struct w32_io));
pio->sock = socket(domain, type, protocol); pio->sock = socket(domain, type, protocol);
if (pio->sock == INVALID_SOCKET) { if (pio->sock == INVALID_SOCKET) {
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
free(pio); free(pio);
debug("ERROR:%d", errno);
return NULL; return NULL;
} }
@ -213,46 +217,65 @@ struct w32_io* socketio_socket(int domain, int type, int protocol) {
return pio; return pio;
} }
#define SET_ERRNO_ON_ERROR(ret) \
do { \
if ((ret) == SOCKET_ERROR) { \
errno = errno_from_WSALastError(); \
debug("ERROR: %d", errno); \
} \
return (ret); \
} while (0)
int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) { int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) {
return set_errno_on_error(setsockopt(pio->sock, level, optname, optval, optlen)); SET_ERRNO_ON_ERROR(setsockopt(pio->sock, level, optname, optval, optlen));
} }
int socketio_getsockopt(struct w32_io* pio, int level, int optname, char* optval, int* optlen) { int socketio_getsockopt(struct w32_io* pio, int level, int optname, char* optval, int* optlen) {
return set_errno_on_error(getsockopt(pio->sock, level, optname, optval, optlen)); SET_ERRNO_ON_ERROR(getsockopt(pio->sock, level, optname, optval, optlen));
} }
int socketio_getsockname(struct w32_io* pio, struct sockaddr* name, int* namelen) { int socketio_getsockname(struct w32_io* pio, struct sockaddr* name, int* namelen) {
return set_errno_on_error(getsockname(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(getsockname(pio->sock, name, namelen));
} }
int socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen) { int socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen) {
return set_errno_on_error(getpeername(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(getpeername(pio->sock, name, namelen));
} }
int socketio_listen(struct w32_io* pio, int backlog) { int socketio_listen(struct w32_io* pio, int backlog) {
pio->type = LISTEN_FD; pio->type = LISTEN_FD;
return set_errno_on_error(listen(pio->sock, backlog)); SET_ERRNO_ON_ERROR(listen(pio->sock, backlog));
} }
int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen) { int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen) {
return set_errno_on_error(bind(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen));
} }
int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) { int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) {
return set_errno_on_error(connect(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(connect(pio->sock, name, namelen));
} }
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
BOOL completed = FALSE; BOOL completed = FALSE;
debug("pio: %p", pio);
if ((buf == NULL) || (len == 0)){ if ((buf == NULL) || (len == 0)){
errno = EINVAL; errno = EINVAL;
debug("ERROR, buf:%p, len:%d", buf, len);
return -1;
}
if (flags != 0) {
errno = ENOTSUP;
debug("ERROR: flags are not currently supported");
return -1; return -1;
} }
//if io is already pending //if io is already pending
if (pio->read_details.pending) { if (pio->read_details.pending) {
errno = EAGAIN; errno = EAGAIN;
debug("Read is already pending");
return -1; return -1;
} }
@ -263,19 +286,21 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
memcpy(buf, pio->read_details.buf + pio->read_details.completed, num_bytes_copied); memcpy(buf, pio->read_details.buf + pio->read_details.completed, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied; pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed += num_bytes_copied; pio->read_details.completed += num_bytes_copied;
debug("returning %d bytes from prior completed IO, remaining: %d", num_bytes_copied, pio->read_details.remaining);
return num_bytes_copied; return num_bytes_copied;
} }
//if there was an error on async call, return //if there was an error on async call, return
if (pio->read_details.error) { if (pio->read_details.error) {
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) { if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
debug("connection closed");
//connection is closed //connection is closed
return 0; return 0;
} }
else { else {
//todo - get qualified error errno = errno_from_WSAError(pio->read_details.error);
errno = EOTHER;
pio->read_details.error = 0; pio->read_details.error = 0;
debug("ERROR: %d", errno);
return -1; return -1;
} }
} }
@ -289,6 +314,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
if (pio->read_details.pending) { if (pio->read_details.pending) {
//this shouldn't be happening //this shouldn't be happening
errno = EOTHER; errno = EOTHER;
debug("ERROR: Unexpected IO state");
return -1; return -1;
} }
} }
@ -304,6 +330,7 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
else { else {
if (socketio_is_io_available(pio, TRUE) == FALSE) { if (socketio_is_io_available(pio, TRUE) == FALSE) {
errno = EAGAIN; errno = EAGAIN;
debug("IO is pending");
return -1; return -1;
} }
} }
@ -313,12 +340,13 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
{ {
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) { if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
//connection is closed //connection is closed
debug("connection closed");
return 0; return 0;
} }
else { else {
//todo - get qualified error errno = errno_from_WSAError(pio->read_details.error);
errno = EOTHER;
pio->read_details.error = 0; pio->read_details.error = 0;
debug("ERROR:%d", errno);
return -1; return -1;
} }
} }
@ -328,11 +356,13 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
memcpy(buf, pio->read_details.buf, num_bytes_copied); memcpy(buf, pio->read_details.buf, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied; pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed = num_bytes_copied; pio->read_details.completed = num_bytes_copied;
debug("returning %d bytes from completed IO, remaining: %d", num_bytes_copied, pio->read_details.remaining);
return num_bytes_copied; return num_bytes_copied;
} }
else { else {
//this should not happen //this should not happen
errno = EOTHER; errno = EOTHER;
debug("ERROR:Unexpected IO stated");
return -1; return -1;
} }
@ -346,6 +376,7 @@ void CALLBACK WSASendCompletionRoutine(
) )
{ {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped)); struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped));
debug("pio: %p, pending_state:%d, error:%d, transferred:%d, remaining: %d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining);
pio->write_details.error = dwError; pio->write_details.error = dwError;
//assert that remaining == cbTransferred //assert that remaining == cbTransferred
pio->write_details.remaining -= cbTransferred; pio->write_details.remaining -= cbTransferred;
@ -355,7 +386,21 @@ void CALLBACK WSASendCompletionRoutine(
int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
int ret = 0; int ret = 0;
WSABUF wsabuf; WSABUF wsabuf;
debug("pio: %p", pio);
if ((buf == NULL) || (len == 0)){
errno = EINVAL;
debug("ERROR, buf:%p, len:%d", buf, len);
return -1;
}
if (flags != 0) {
errno = ENOTSUP;
debug("ERROR: flags are not currently supported");
return -1;
}
//if io is already pending //if io is already pending
if (pio->write_details.pending) if (pio->write_details.pending)
{ {
@ -363,6 +408,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
{ {
//this covers the scenario when the fd was previously non blocking (and hence io is still pending) //this covers the scenario when the fd was previously non blocking (and hence io is still pending)
//wait for previous io to complete //wait for previous io to complete
debug("waiting for IO on a previous nonblocking send to complete");
while (pio->write_details.pending) { while (pio->write_details.pending) {
if (wait_for_any_event(NULL, 0, INFINITE) == -1) if (wait_for_any_event(NULL, 0, INFINITE) == -1)
return -1; return -1;
@ -370,6 +416,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
} }
else { else {
errno = EAGAIN; errno = EAGAIN;
debug("IO pending");
return -1; return -1;
} }
} }
@ -377,6 +424,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (pio->write_details.error) { if (pio->write_details.error) {
errno = errno_from_WSAError(pio->write_details.error); errno = errno_from_WSAError(pio->write_details.error);
debug("ERROR:%d", errno);
return -1; return -1;
} }
@ -388,6 +436,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (!wsabuf.buf) if (!wsabuf.buf)
{ {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR:%d", errno);
return -1; return -1;
} }
@ -407,11 +456,13 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (ret == 0) if (ret == 0)
{ {
//send has completed and APC is scheduled, let it run //send has completed and APC is scheduled, let it run
debug("WSASend immediate completion");
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len; pio->write_details.remaining = wsabuf.len;
SleepEx(1, TRUE); SleepEx(1, TRUE);
if ((pio->write_details.pending) || (pio->write_details.remaining != 0)) { if ((pio->write_details.pending) || (pio->write_details.remaining != 0)) {
errno = EOTHER; errno = EOTHER;
debug("ERROR: Unexpected IO state");
return -1; return -1;
} }
@ -422,6 +473,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (WSAGetLastError() == WSA_IO_PENDING) if (WSAGetLastError() == WSA_IO_PENDING)
{ {
//io is initiated and pending //io is initiated and pending
debug("IO pending");
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len; pio->write_details.remaining = wsabuf.len;
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
@ -434,7 +486,8 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
return wsabuf.len; return wsabuf.len;
} }
else { //failed else { //failed
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
debug("ERROR:%d", errno);
return -1; return -1;
} }
} }
@ -443,7 +496,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
int socketio_shutdown(struct w32_io* pio, int how) { int socketio_shutdown(struct w32_io* pio, int how) {
return set_errno_on_error(shutdown(pio->sock, how)); SET_ERRNO_ON_ERROR(shutdown(pio->sock, how));
} }
int socketio_close(struct w32_io* pio) { int socketio_close(struct w32_io* pio) {
@ -502,7 +555,7 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock))) if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock)))
{ {
errno = errno_from_WSAError(); errno = errno_from_WSALastError();
return NULL; return NULL;
} }

View File

@ -2,6 +2,7 @@
#include "w32fd.h" #include "w32fd.h"
#include <stdarg.h> #include <stdarg.h>
#include <errno.h> #include <errno.h>
#include <time.h>
struct w32fd_table { struct w32fd_table {
w32_fd_set occupied; w32_fd_set occupied;
@ -315,6 +316,8 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
fd_set read_ready_fds, write_ready_fds; fd_set read_ready_fds, write_ready_fds;
HANDLE events[32]; HANDLE events[32];
int num_events = 0; int num_events = 0;
unsigned int time_milliseconds = timeout->tv_sec * 100 + timeout->tv_usec / 1000;
ULONGLONG ticks_start = GetTickCount64(), ticks_now;
memset(&read_ready_fds, 0, sizeof(fd_set)); memset(&read_ready_fds, 0, sizeof(fd_set));
memset(&write_ready_fds, 0, sizeof(fd_set)); memset(&write_ready_fds, 0, sizeof(fd_set));
@ -405,9 +408,16 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
} }
} }
do { do {
//to-do cut down wait time on subsequent waits ticks_now = GetTickCount64();
if (0 != wait_for_any_event(events, num_events, ((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000))) { if (time_milliseconds < (ticks_now - ticks_start)) {
errno = ETIMEDOUT;
debug("select timing out");
return -1;
}
if (0 != wait_for_any_event(events, num_events, time_milliseconds - (ticks_now - ticks_start))) {
return -1; return -1;
} }
@ -435,6 +445,8 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
if (out_ready_fds) if (out_ready_fds)
break; break;
debug("wait ended without any IO completion, looping again");
} while (1); } while (1);
if (readfds) if (readfds)
@ -448,9 +460,13 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
int w32_dup(int oldfd) { int w32_dup(int oldfd) {
errno = EOPNOTSUPP;
debug("ERROR: dup is not implemented yet");
return -1; return -1;
} }
int w32_dup2(int oldfd, int newfd) { int w32_dup2(int oldfd, int newfd) {
errno = EOPNOTSUPP;
debug("ERROR: dup2 is not implemented yet");
return -1; return -1;
} }