This commit is contained in:
Manoj Ampalam 2016-01-07 13:18:20 -08:00
parent aacaa78d24
commit 71bd3ccd8f
5 changed files with 436 additions and 52 deletions

View File

@ -87,6 +87,22 @@ namespace UnitTests
exit(1);
}
fcntl(sockfd, F_SETFL, O_NONBLOCK);
fd_set read_set, write_set, except_set;
ZeroMemory(&read_set, sizeof(fd_set));
ZeroMemory(&write_set, sizeof(fd_set));
ZeroMemory(&except_set, sizeof(fd_set));
FD_SET(sockfd, &read_set);
struct timeval timeout;
timeout.tv_sec = 30;
timeout.tv_usec = 0;
int ret = select(sockfd, &read_set, &write_set, &except_set, &timeout);
new_fd = accept(sockfd, (struct sockaddr *)&their_addr, &sin_size);
}

View File

@ -3,6 +3,7 @@
#include <mswsock.h>
#include <errno.h>
#include "w32fd.h"
#include <stddef.h>
static int getWSAErrno()
@ -64,28 +65,6 @@ struct w32_io* socketio_socket(int domain, int type, int protocol) {
return pio;
}
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
struct w32_io *accept_io = NULL;
accept_io = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!accept_io)
{
errno = ENOMEM;
return NULL;
}
accept_io->sock = accept(pio->sock, addr, addrlen);
if (accept_io->sock == INVALID_SOCKET) {
errno = getWSAErrno();
free(accept_io);
return NULL;
}
pio->type = SOCK_FD;
return accept_io;
}
int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) {
return set_errno_on_error(setsockopt(pio->sock, level, optname, optval, optlen));
}
@ -115,6 +94,147 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele
return set_errno_on_error(connect(pio->sock, name, namelen));
}
void CALLBACK WSARecvCompletionRoutine(
IN DWORD dwError,
IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags
)
{
struct w32_io* pio = lpOverlapped - offsetof(struct w32_io, read_overlapped);
pio->read_details.error = dwError;
pio->read_details.remaining = cbTransferred;
pio->read_details.completed = 0;
pio->read_details.pending = FALSE;
}
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
int ret = 0;
DWORD bytes_received;
//if io is already pending
if (pio->read_details.pending)
{
errno = EAGAIN;
return -1;
}
//initialize recv buffers if needed
if (pio->read_overlapped.hEvent == NULL)
{
WSABUF* wsabuf = malloc(sizeof(WSABUF));
if (wsabuf) {
wsabuf->len = 1024;
wsabuf->buf = malloc(wsabuf->len);
}
if (!wsabuf || !wsabuf->buf)
{
if (wsabuf)
free(wsabuf);
errno = ENOMEM;
return -1;
}
pio->read_overlapped.hEvent = (HANDLE)wsabuf;
}
//if we have some buffer copy it and retun #bytes copied
if (pio->read_details.remaining)
{
int num_bytes_copied = min(len, pio->read_details.remaining);
memcpy(buf, pio->read_overlapped.hEvent, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied;
return num_bytes_copied;
}
//TODO - implement flags if any needed for OpenSSH
ret = WSARecv(pio->sock, (WSABUF*)pio->read_overlapped.hEvent, 1, &bytes_received, 0, &pio->read_overlapped, &WSARecvCompletionRoutine);
if (ret == 0)
{
//receive has completed and APC is scheduled, let it run
pio->read_details.pending = TRUE;
SleepEx(1, TRUE);
if ((pio->read_details.pending == FALSE) || (pio->read_details.remaining == 0)) {
errno = EOTHER;
return -1;
}
//we should have some bytes copied to internal buffer
}
else if (ret == WSA_IO_PENDING) {
//io is initiated and pending
pio->read_details.pending = TRUE;
if (w32_io_is_blocking(pio))
{
//wait until io is done
while (pio->read_details.pending)
SleepEx(INFINITE, TRUE);
}
else {
errno = EAGAIN;
return -1;
}
}
else { //failed
errno = getWSAErrno();
return -1;
}
//by this time we should have some bytes in internal buffer
//if we have some buffer copy it and retun #bytes copied
if (pio->read_details.remaining)
{
int num_bytes_copied = min(len, pio->read_details.remaining);
memcpy(buf, pio->read_overlapped.hEvent, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied;
return num_bytes_copied;
}
else {
errno = EOTHER;
return -1;
}
}
void CALLBACK WSASendCompletionRoutine(
IN DWORD dwError,
IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags
)
{
struct w32_io* pio = lpOverlapped - offsetof(struct w32_io, write_overlapped);
pio->write_details.error = dwError;
//assert that remaining == cbTransferred
pio->write_details.remaining -= cbTransferred;
pio->write_details.pending = FALSE;
}
int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
if (pio->write_overlapped.hEvent == NULL)
{
WSABUF* wsabuf = malloc(sizeof(WSABUF));
if (wsabuf) {
wsabuf->len = 1024;
wsabuf->buf = malloc(wsabuf->len);
}
if (!wsabuf || !wsabuf->buf)
{
if (wsabuf)
free(wsabuf);
errno = ENOMEM;
return -1;
}
pio->write_overlapped.hEvent = (HANDLE)wsabuf;
}
}
int socketio_shutdown(struct w32_io* pio, int how) {
return set_errno_on_error(shutdown(pio->sock, how));
}
@ -133,7 +253,77 @@ struct acceptEx_context {
DWORD bytes_received;
};
int socketio_start_asyncio(struct w32_io* pio, BOOL read) {
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
struct w32_io *accept_io = NULL;
accept_io = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!accept_io)
{
errno = ENOMEM;
return NULL;
}
if (w32_io_is_blocking(pio)) {
accept_io->sock = accept(pio->sock, addr, addrlen);
if (accept_io->sock == INVALID_SOCKET) {
errno = getWSAErrno();
free(accept_io);
return NULL;
}
}
else {
//ensure i/o is ready
if (FALSE == socketio_is_ioready(pio, TRUE)) {
free(accept_io);
errno = EAGAIN;
return -1;
}
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
accept_io->sock = context->accept_socket;
context->accept_socket = INVALID_SOCKET;
pio->read_details.pending = FALSE;
ResetEvent(pio->read_overlapped.hEvent);
}
pio->type = SOCK_FD;
return accept_io;
}
BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd) {
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
if (w32_io_is_blocking(pio))
return FALSE;
if (pio->type == LISTEN_FD) {
DWORD numBytes = 0;
if (pio->read_details.pending && GetOverlappedResult(pio->read_overlapped.hEvent, &pio->read_overlapped, &numBytes, FALSE)) {
return TRUE;
}
else {
if (pio->read_details.pending && GetLastError() != ERROR_IO_INCOMPLETE) {
//unexpected error;
}
return FALSE;
}
}
else { //regular socket
//todo
return FALSE;
}
}
int socketio_start_asyncio(struct w32_io* pio, BOOL rd) {
if (w32_io_is_blocking(pio)) {
errno = EPERM;
return -1;
}
if (pio->type == LISTEN_FD) {
if (!pio->read_details.pending) {
struct acceptEx_context *context;
@ -181,7 +371,7 @@ int socketio_start_asyncio(struct w32_io* pio, BOOL read) {
return -1;
}
if (FALSE == context->lpfnAcceptEx(pio->sock,
if (TRUE == context->lpfnAcceptEx(pio->sock,
context->accept_socket,
context->lpOutputBuf,
0,
@ -190,10 +380,16 @@ int socketio_start_asyncio(struct w32_io* pio, BOOL read) {
&context->bytes_received,
&pio->read_overlapped))
{
//we are already connected. Set event so subsequent select will catch
SetEvent(pio->read_overlapped.hEvent);
}
else {
//if overlapped io is in progress, we are good
if (WSAGetLastError() != ERROR_IO_PENDING) {
errno = getWSAErrno();
return -1;
}
}
pio->read_details.pending = TRUE;
return 0;

View File

@ -1,6 +1,7 @@
#include "w32posix.h"
#include "w32fd.h"
#include <stdarg.h>
#include <errno.h>
struct w32fd_table {
w32_fd_set occupied;
@ -24,9 +25,11 @@ int fd_table_get_min_index() {
{
bitmap++;
min_index += 8;
if (min_index >= MAX_FDS)
if (min_index >= MAX_FDS) {
errno = ENOMEM;
return -1;
}
}
tmp = *bitmap;
@ -54,6 +57,7 @@ void fd_table_clear(int index)
}
void w32posix_initialize() {
fd_table_initialize();
socketio_initialize();
}
@ -63,7 +67,30 @@ void w32posix_done() {
BOOL w32_io_is_blocking(struct w32_io* pio)
{
return (pio->fd_status_flags & O_NONBLOCK) ? TRUE : FALSE;
return (pio->fd_status_flags & O_NONBLOCK) ? FALSE : TRUE;
}
BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd) {
if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) {
return socketio_is_ioready(pio, rd);
}
else {
//return fileio_is_ready(pio);
return FALSE;
}
}
int w32_io_start_asyncio(struct w32_io* pio, BOOL rd)
{
if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) {
return socketio_start_asyncio(pio, rd);
}
else {
//return fileio_is_ready(pio);
return -1;
}
}
int w32_socket(int domain, int type, int protocol) {
@ -71,9 +98,7 @@ int w32_socket(int domain, int type, int protocol) {
struct w32_io* pio = NULL;
if (min_index == -1)
{
return -1;
}
pio = socketio_socket(domain, type, protocol);
if (!pio) {
@ -90,9 +115,7 @@ int w32_accept(int fd, struct sockaddr* addr, int* addrlen)
struct w32_io* pio = NULL;
if (min_index == -1)
{
return -1;
}
pio = socketio_accept(fd_table.w32_ios[fd], addr, addrlen);
if (!pio) {
@ -131,6 +154,15 @@ int w32_connect(int fd, const struct sockaddr* name, int namelen) {
return socketio_connect(fd_table.w32_ios[fd], name, namelen);
}
int w32_recv(int fd, void *buf, size_t len, int flags) {
return socketio_recv(fd_table.w32_ios[fd], buf, len, flags);
}
int w32_send(int fd, const void *buf, size_t len, int flags) {
return socketio_send(fd_table.w32_ios[fd], buf, len, flags);
}
int w32_shutdown(int fd, int how) {
return socketio_shutdown(fd_table.w32_ios[fd], how);
}
@ -140,8 +172,7 @@ int w32_close(int fd) {
fd_table_clear(pio->table_index);
if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) {
socketio_close(pio);
return 0;
return socketio_close(pio);
}
else
return -1;
@ -159,10 +190,158 @@ int w32_fcntl(int fd, int cmd, ... /* arg */) {
return 0;
case F_GETFD:
return fd_table.w32_ios[fd]->fd_flags;
break;
return 0;
case F_SETFD:
fd_table.w32_ios[fd]->fd_flags = va_arg(valist, int);
return 0;
default:
errno = EINVAL;
return -1;
}
}
int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const struct timeval *timeout) {
int in_ready_fds = 0, out_ready_fds = 0;
fd_set read_ready_fds, write_ready_fds;
HANDLE events[10];
int num_events = 0;
memset(&read_ready_fds, 0, sizeof(fd_set));
memset(&write_ready_fds, 0, sizeof(fd_set));
if (fds > MAX_FDS - 1) {
errno = EINVAL;
return -1;
}
//see if any io is ready
for (int i = 0; i <= fds; i++) {
if FD_ISSET(i, readfds) {
if (fd_table.w32_ios[i] == NULL) {
errno = EPERM;
return -1;
}
in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], TRUE)) {
FD_SET(i, &read_ready_fds);
out_ready_fds++;
}
}
if FD_ISSET(i, writefds) {
if (fd_table.w32_ios[i] == NULL) {
errno = EPERM;
return -1;
}
in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds);
out_ready_fds++;
}
}
}
//if none of input fds are set return error
if (in_ready_fds == 0) {
errno = EINVAL;
return -1;
}
//if some fds are already ready, return
if (out_ready_fds)
{
*readfds = read_ready_fds;
*writefds = write_ready_fds;
return out_ready_fds;
}
//start async io on selected fds
for (int i = 0; i <= fds; i++) {
if FD_ISSET(i, readfds) {
if (w32_io_start_asyncio(fd_table.w32_ios[i], TRUE) == -1)
return -1;
if (fd_table.w32_ios[i]->type == LISTEN_FD) {
events[num_events++] = fd_table.w32_ios[i]->read_overlapped.hEvent;
}
}
if FD_ISSET(i, writefds) {
if (w32_io_start_asyncio(fd_table.w32_ios[i], FALSE) == -1)
return -1;
}
}
//wait for any io to complete
if (num_events)
{
DWORD ret = WaitForMultipleObjectsEx(num_events, events, FALSE, ((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000), TRUE);
if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_events - 1)) {
//woken up by event signalled
}
else if (ret == WAIT_IO_COMPLETION) {
//woken up by APC from IO completion
}
else if (ret == WAIT_TIMEOUT) {
errno = ETIMEDOUT;
return -1;
}
else { //some other error
errno = EOTHER;
return -1;
}
}
else
{
DWORD ret = SleepEx(((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000), TRUE);
if (ret == WAIT_IO_COMPLETION) {
//worken up by APC from IO completion
}
else if (ret == 0) {
//timed out
errno = ETIMEDOUT;
return -1;
}
else { //some other error
errno = EOTHER;
return -1;
}
}
//check on fd status
out_ready_fds = 0;
for (int i = 0; i <= fds; i++) {
if FD_ISSET(i, readfds) {
in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], TRUE)) {
FD_SET(i, &read_ready_fds);
out_ready_fds++;
}
}
if FD_ISSET(i, writefds) {
in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds);
out_ready_fds++;
}
}
}
if (out_ready_fds)
{
*readfds = read_ready_fds;
*writefds = write_ready_fds;
return out_ready_fds;
}
errno = EOTHER;
return -1;
}

View File

@ -41,6 +41,7 @@ struct w32_io {
};
BOOL w32_io_is_blocking(struct w32_io*);
BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd);
int fd_table_initialize();
int fd_table_add(struct w32_io*);
@ -48,6 +49,8 @@ int fd_table_delete(struct w32_io*);
int socketio_initialize();
int socketio_done();
BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd);
int socketio_start_asyncio(struct w32_io* pio, BOOL rd);
struct w32_io* socketio_socket(int domain, int type, int protocol);
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen);
int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen);
@ -57,22 +60,8 @@ int socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen
int socketio_listen(struct w32_io* pio, int backlog);
int socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen);
int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen);
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags);
int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags);
int socketio_shutdown(struct w32_io* pio, int how);
int socketio_close(struct w32_io* pio);
/*non-network i/o*/
int w32_pipe(int *pfds);
int w32_open(const char *pathname, int flags, ...);
int w32_wopen(const wchar_t *pathname, int flags, ...);
int w32_creat(const char *pathname, int mode);
int w32_read(int fd, void *dst, unsigned int max);
int w32_write(int fd, const void *buf, unsigned int max);
int w32_close(int fd);
/*operations on fds*/
int w32_ioctl(int d, int request, ...);
int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const struct timeval *timeout);
int w32_fcntl(int fd, int cmd, ... /* arg */);
int w32_dup(int oldfd);
int w32_dup2(int oldfd, int newfd);

View File

@ -53,6 +53,8 @@ void w32posix_done();
#define listen w32_listen
#define bind w32_bind
#define connect w32_connect
#define recv w32_recv
#define send w32_send
#define shutdown w32_shutdown
int w32_socket(int domain, int type, int protocol);
int w32_accept(int fd, struct sockaddr* addr, int* addrlen);
@ -63,6 +65,8 @@ int w32_getpeername(int fd, struct sockaddr* name, int* namelen);
int w32_listen(int fd, int backlog);
int w32_bind(int fd, const struct sockaddr *name, int namelen);
int w32_connect(int fd, const struct sockaddr* name, int namelen);
int w32_recv(int fd, void *buf, size_t len, int flags);
int w32_send(int fd, const void *buf, size_t len, int flags);
int w32_shutdown(int fd, int how);
/*non-network i/o*/