diff --git a/contrib/win32/w32-posix-prototype/win32posix/Tests/unittest1.cpp b/contrib/win32/w32-posix-prototype/win32posix/Tests/unittest1.cpp index 9cdeabf..8f42ebb 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/Tests/unittest1.cpp +++ b/contrib/win32/w32-posix-prototype/win32posix/Tests/unittest1.cpp @@ -87,6 +87,22 @@ namespace UnitTests exit(1); } + fcntl(sockfd, F_SETFL, O_NONBLOCK); + + fd_set read_set, write_set, except_set; + + ZeroMemory(&read_set, sizeof(fd_set)); + ZeroMemory(&write_set, sizeof(fd_set)); + ZeroMemory(&except_set, sizeof(fd_set)); + + FD_SET(sockfd, &read_set); + struct timeval timeout; + timeout.tv_sec = 30; + timeout.tv_usec = 0; + int ret = select(sockfd, &read_set, &write_set, &except_set, &timeout); + + new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size); + } diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c index b9b578a..d0590fe 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/socketio.c @@ -3,6 +3,7 @@ #include #include #include "w32fd.h" +#include static int getWSAErrno() @@ -64,28 +65,6 @@ struct w32_io* socketio_socket(int domain, int type, int protocol) { return pio; } -struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) { - struct w32_io *accept_io = NULL; - - accept_io = (struct w32_io*)malloc(sizeof(struct w32_io)); - if (!accept_io) - { - errno = ENOMEM; - return NULL; - } - - accept_io->sock = accept(pio->sock, addr, addrlen); - if (accept_io->sock == INVALID_SOCKET) { - errno = getWSAErrno(); - free(accept_io); - return NULL; - } - - pio->type = SOCK_FD; - return accept_io; -} - - int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) { return set_errno_on_error(setsockopt(pio->sock, level, optname, optval, optlen)); } @@ -115,6 +94,147 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele return set_errno_on_error(connect(pio->sock, name, namelen)); } +void CALLBACK WSARecvCompletionRoutine( + IN DWORD dwError, + IN DWORD cbTransferred, + IN LPWSAOVERLAPPED lpOverlapped, + IN DWORD dwFlags + ) +{ + struct w32_io* pio = lpOverlapped - offsetof(struct w32_io, read_overlapped); + pio->read_details.error = dwError; + pio->read_details.remaining = cbTransferred; + pio->read_details.completed = 0; + pio->read_details.pending = FALSE; +} + +int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { + int ret = 0; + DWORD bytes_received; + + //if io is already pending + if (pio->read_details.pending) + { + errno = EAGAIN; + return -1; + } + + //initialize recv buffers if needed + if (pio->read_overlapped.hEvent == NULL) + { + WSABUF* wsabuf = malloc(sizeof(WSABUF)); + if (wsabuf) { + wsabuf->len = 1024; + wsabuf->buf = malloc(wsabuf->len); + } + + if (!wsabuf || !wsabuf->buf) + { + if (wsabuf) + free(wsabuf); + errno = ENOMEM; + return -1; + } + + pio->read_overlapped.hEvent = (HANDLE)wsabuf; + } + + //if we have some buffer copy it and retun #bytes copied + if (pio->read_details.remaining) + { + int num_bytes_copied = min(len, pio->read_details.remaining); + memcpy(buf, pio->read_overlapped.hEvent, num_bytes_copied); + pio->read_details.remaining -= num_bytes_copied; + return num_bytes_copied; + } + + //TODO - implement flags if any needed for OpenSSH + ret = WSARecv(pio->sock, (WSABUF*)pio->read_overlapped.hEvent, 1, &bytes_received, 0, &pio->read_overlapped, &WSARecvCompletionRoutine); + if (ret == 0) + { + //receive has completed and APC is scheduled, let it run + pio->read_details.pending = TRUE; + SleepEx(1, TRUE); + if ((pio->read_details.pending == FALSE) || (pio->read_details.remaining == 0)) { + errno = EOTHER; + return -1; + } + + //we should have some bytes copied to internal buffer + } + else if (ret == WSA_IO_PENDING) { + //io is initiated and pending + pio->read_details.pending = TRUE; + + if (w32_io_is_blocking(pio)) + { + //wait until io is done + while (pio->read_details.pending) + SleepEx(INFINITE, TRUE); + } + else { + errno = EAGAIN; + return -1; + } + } + else { //failed + errno = getWSAErrno(); + return -1; + } + + //by this time we should have some bytes in internal buffer + //if we have some buffer copy it and retun #bytes copied + if (pio->read_details.remaining) + { + int num_bytes_copied = min(len, pio->read_details.remaining); + memcpy(buf, pio->read_overlapped.hEvent, num_bytes_copied); + pio->read_details.remaining -= num_bytes_copied; + return num_bytes_copied; + } + else { + errno = EOTHER; + return -1; + } + +} + +void CALLBACK WSASendCompletionRoutine( + IN DWORD dwError, + IN DWORD cbTransferred, + IN LPWSAOVERLAPPED lpOverlapped, + IN DWORD dwFlags + ) +{ + struct w32_io* pio = lpOverlapped - offsetof(struct w32_io, write_overlapped); + pio->write_details.error = dwError; + //assert that remaining == cbTransferred + pio->write_details.remaining -= cbTransferred; + pio->write_details.pending = FALSE; +} + +int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { + + if (pio->write_overlapped.hEvent == NULL) + { + WSABUF* wsabuf = malloc(sizeof(WSABUF)); + if (wsabuf) { + wsabuf->len = 1024; + wsabuf->buf = malloc(wsabuf->len); + } + + if (!wsabuf || !wsabuf->buf) + { + if (wsabuf) + free(wsabuf); + errno = ENOMEM; + return -1; + } + + pio->write_overlapped.hEvent = (HANDLE)wsabuf; + } +} + + int socketio_shutdown(struct w32_io* pio, int how) { return set_errno_on_error(shutdown(pio->sock, how)); } @@ -133,7 +253,77 @@ struct acceptEx_context { DWORD bytes_received; }; -int socketio_start_asyncio(struct w32_io* pio, BOOL read) { + +struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) { + struct w32_io *accept_io = NULL; + + accept_io = (struct w32_io*)malloc(sizeof(struct w32_io)); + if (!accept_io) + { + errno = ENOMEM; + return NULL; + } + + if (w32_io_is_blocking(pio)) { + accept_io->sock = accept(pio->sock, addr, addrlen); + if (accept_io->sock == INVALID_SOCKET) { + errno = getWSAErrno(); + free(accept_io); + return NULL; + } + } + else { + //ensure i/o is ready + if (FALSE == socketio_is_ioready(pio, TRUE)) { + free(accept_io); + errno = EAGAIN; + return -1; + } + + struct acceptEx_context* context = (struct acceptEx_context*)pio->context; + + accept_io->sock = context->accept_socket; + context->accept_socket = INVALID_SOCKET; + pio->read_details.pending = FALSE; + ResetEvent(pio->read_overlapped.hEvent); + } + + pio->type = SOCK_FD; + return accept_io; +} + +BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd) { + struct acceptEx_context* context = (struct acceptEx_context*)pio->context; + + if (w32_io_is_blocking(pio)) + return FALSE; + + if (pio->type == LISTEN_FD) { + DWORD numBytes = 0; + if (pio->read_details.pending && GetOverlappedResult(pio->read_overlapped.hEvent, &pio->read_overlapped, &numBytes, FALSE)) { + return TRUE; + } + else { + if (pio->read_details.pending && GetLastError() != ERROR_IO_INCOMPLETE) { + //unexpected error; + } + return FALSE; + } + } + else { //regular socket + //todo + return FALSE; + } + +} + +int socketio_start_asyncio(struct w32_io* pio, BOOL rd) { + + if (w32_io_is_blocking(pio)) { + errno = EPERM; + return -1; + } + if (pio->type == LISTEN_FD) { if (!pio->read_details.pending) { struct acceptEx_context *context; @@ -181,7 +371,7 @@ int socketio_start_asyncio(struct w32_io* pio, BOOL read) { return -1; } - if (FALSE == context->lpfnAcceptEx(pio->sock, + if (TRUE == context->lpfnAcceptEx(pio->sock, context->accept_socket, context->lpOutputBuf, 0, @@ -190,9 +380,15 @@ int socketio_start_asyncio(struct w32_io* pio, BOOL read) { &context->bytes_received, &pio->read_overlapped)) { - - errno = getWSAErrno(); - return -1; + //we are already connected. Set event so subsequent select will catch + SetEvent(pio->read_overlapped.hEvent); + } + else { + //if overlapped io is in progress, we are good + if (WSAGetLastError() != ERROR_IO_PENDING) { + errno = getWSAErrno(); + return -1; + } } pio->read_details.pending = TRUE; diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c index ba61e0c..b837c0a 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.c @@ -1,6 +1,7 @@ #include "w32posix.h" #include "w32fd.h" #include +#include struct w32fd_table { w32_fd_set occupied; @@ -24,8 +25,10 @@ int fd_table_get_min_index() { { bitmap++; min_index += 8; - if (min_index >= MAX_FDS) + if (min_index >= MAX_FDS) { + errno = ENOMEM; return -1; + } } tmp = *bitmap; @@ -54,6 +57,7 @@ void fd_table_clear(int index) } void w32posix_initialize() { + fd_table_initialize(); socketio_initialize(); } @@ -63,7 +67,30 @@ void w32posix_done() { BOOL w32_io_is_blocking(struct w32_io* pio) { - return (pio->fd_status_flags & O_NONBLOCK) ? TRUE : FALSE; + return (pio->fd_status_flags & O_NONBLOCK) ? FALSE : TRUE; +} + +BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd) { + if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) { + return socketio_is_ioready(pio, rd); + } + else { + //return fileio_is_ready(pio); + return FALSE; + } + +} + +int w32_io_start_asyncio(struct w32_io* pio, BOOL rd) +{ + if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) { + return socketio_start_asyncio(pio, rd); + } + else { + //return fileio_is_ready(pio); + return -1; + } + } int w32_socket(int domain, int type, int protocol) { @@ -71,9 +98,7 @@ int w32_socket(int domain, int type, int protocol) { struct w32_io* pio = NULL; if (min_index == -1) - { return -1; - } pio = socketio_socket(domain, type, protocol); if (!pio) { @@ -90,9 +115,7 @@ int w32_accept(int fd, struct sockaddr* addr, int* addrlen) struct w32_io* pio = NULL; if (min_index == -1) - { return -1; - } pio = socketio_accept(fd_table.w32_ios[fd], addr, addrlen); if (!pio) { @@ -131,6 +154,15 @@ int w32_connect(int fd, const struct sockaddr* name, int namelen) { return socketio_connect(fd_table.w32_ios[fd], name, namelen); } +int w32_recv(int fd, void *buf, size_t len, int flags) { + return socketio_recv(fd_table.w32_ios[fd], buf, len, flags); +} + +int w32_send(int fd, const void *buf, size_t len, int flags) { + return socketio_send(fd_table.w32_ios[fd], buf, len, flags); +} + + int w32_shutdown(int fd, int how) { return socketio_shutdown(fd_table.w32_ios[fd], how); } @@ -140,8 +172,7 @@ int w32_close(int fd) { fd_table_clear(pio->table_index); if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) { - socketio_close(pio); - return 0; + return socketio_close(pio); } else return -1; @@ -159,10 +190,158 @@ int w32_fcntl(int fd, int cmd, ... /* arg */) { return 0; case F_GETFD: return fd_table.w32_ios[fd]->fd_flags; - break; + return 0; case F_SETFD: fd_table.w32_ios[fd]->fd_flags = va_arg(valist, int); return 0; + default: + errno = EINVAL; + return -1; } } +int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const struct timeval *timeout) { + int in_ready_fds = 0, out_ready_fds = 0; + fd_set read_ready_fds, write_ready_fds; + HANDLE events[10]; + int num_events = 0; + + memset(&read_ready_fds, 0, sizeof(fd_set)); + memset(&write_ready_fds, 0, sizeof(fd_set)); + + if (fds > MAX_FDS - 1) { + errno = EINVAL; + return -1; + } + + //see if any io is ready + for (int i = 0; i <= fds; i++) { + + if FD_ISSET(i, readfds) { + if (fd_table.w32_ios[i] == NULL) { + errno = EPERM; + return -1; + } + + in_ready_fds++; + if (w32_io_is_ioready(fd_table.w32_ios[i], TRUE)) { + FD_SET(i, &read_ready_fds); + out_ready_fds++; + } + } + + if FD_ISSET(i, writefds) { + if (fd_table.w32_ios[i] == NULL) { + errno = EPERM; + return -1; + } + + in_ready_fds++; + if (w32_io_is_ioready(fd_table.w32_ios[i], FALSE)) { + FD_SET(i, &write_ready_fds); + out_ready_fds++; + } + } + + } + + //if none of input fds are set return error + if (in_ready_fds == 0) { + errno = EINVAL; + return -1; + } + + //if some fds are already ready, return + if (out_ready_fds) + { + *readfds = read_ready_fds; + *writefds = write_ready_fds; + return out_ready_fds; + } + + //start async io on selected fds + for (int i = 0; i <= fds; i++) { + + if FD_ISSET(i, readfds) { + if (w32_io_start_asyncio(fd_table.w32_ios[i], TRUE) == -1) + return -1; + if (fd_table.w32_ios[i]->type == LISTEN_FD) { + events[num_events++] = fd_table.w32_ios[i]->read_overlapped.hEvent; + } + } + + if FD_ISSET(i, writefds) { + if (w32_io_start_asyncio(fd_table.w32_ios[i], FALSE) == -1) + return -1; + } + } + + //wait for any io to complete + if (num_events) + { + DWORD ret = WaitForMultipleObjectsEx(num_events, events, FALSE, ((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000), TRUE); + if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_events - 1)) { + //woken up by event signalled + } + else if (ret == WAIT_IO_COMPLETION) { + //woken up by APC from IO completion + } + else if (ret == WAIT_TIMEOUT) { + errno = ETIMEDOUT; + return -1; + } + else { //some other error + errno = EOTHER; + return -1; + } + } + else + { + DWORD ret = SleepEx(((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000), TRUE); + if (ret == WAIT_IO_COMPLETION) { + //worken up by APC from IO completion + } + else if (ret == 0) { + //timed out + errno = ETIMEDOUT; + return -1; + } + else { //some other error + errno = EOTHER; + return -1; + } + } + + //check on fd status + out_ready_fds = 0; + for (int i = 0; i <= fds; i++) { + + if FD_ISSET(i, readfds) { + in_ready_fds++; + if (w32_io_is_ioready(fd_table.w32_ios[i], TRUE)) { + FD_SET(i, &read_ready_fds); + out_ready_fds++; + } + } + + if FD_ISSET(i, writefds) { + in_ready_fds++; + if (w32_io_is_ioready(fd_table.w32_ios[i], FALSE)) { + FD_SET(i, &write_ready_fds); + out_ready_fds++; + } + } + + } + + if (out_ready_fds) + { + *readfds = read_ready_fds; + *writefds = write_ready_fds; + return out_ready_fds; + } + + errno = EOTHER; + return -1; +} + diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h index 81aecce..04a3ad1 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32fd.h @@ -41,6 +41,7 @@ struct w32_io { }; BOOL w32_io_is_blocking(struct w32_io*); +BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd); int fd_table_initialize(); int fd_table_add(struct w32_io*); @@ -48,6 +49,8 @@ int fd_table_delete(struct w32_io*); int socketio_initialize(); int socketio_done(); +BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd); +int socketio_start_asyncio(struct w32_io* pio, BOOL rd); struct w32_io* socketio_socket(int domain, int type, int protocol); struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen); int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen); @@ -57,22 +60,8 @@ int socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen int socketio_listen(struct w32_io* pio, int backlog); int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen); int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen); +int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags); +int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags); int socketio_shutdown(struct w32_io* pio, int how); int socketio_close(struct w32_io* pio); -/*non-network i/o*/ -int w32_pipe(int *pfds); -int w32_open(const char *pathname, int flags, ...); -int w32_wopen(const wchar_t *pathname, int flags, ...); -int w32_creat(const char *pathname, int mode); -int w32_read(int fd, void *dst, unsigned int max); -int w32_write(int fd, const void *buf, unsigned int max); -int w32_close(int fd); - -/*operations on fds*/ -int w32_ioctl(int d, int request, ...); -int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const struct timeval *timeout); -int w32_fcntl(int fd, int cmd, ... /* arg */); -int w32_dup(int oldfd); -int w32_dup2(int oldfd, int newfd); - diff --git a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32posix.h b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32posix.h index e3f5630..06ccca5 100644 --- a/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32posix.h +++ b/contrib/win32/w32-posix-prototype/win32posix/win32posix/w32posix.h @@ -53,6 +53,8 @@ void w32posix_done(); #define listen w32_listen #define bind w32_bind #define connect w32_connect +#define recv w32_recv +#define send w32_send #define shutdown w32_shutdown int w32_socket(int domain, int type, int protocol); int w32_accept(int fd, struct sockaddr* addr, int* addrlen); @@ -63,6 +65,8 @@ int w32_getpeername(int fd, struct sockaddr* name, int* namelen); int w32_listen(int fd, int backlog); int w32_bind(int fd, const struct sockaddr *name, int namelen); int w32_connect(int fd, const struct sockaddr* name, int namelen); +int w32_recv(int fd, void *buf, size_t len, int flags); +int w32_send(int fd, const void *buf, size_t len, int flags); int w32_shutdown(int fd, int how); /*non-network i/o*/