This commit is contained in:
Manoj Ampalam 2016-03-04 13:05:58 -08:00
parent 8f4daa5bb4
commit 826d79e297
7 changed files with 312 additions and 215 deletions

View File

@ -12,8 +12,13 @@
#include <errno.h> #include <errno.h>
#include <stddef.h> #include <stddef.h>
/* internal read buffer size */
#define READ_BUFFER_SIZE 100*1024
/* internal write buffer size */
#define WRITE_BUFFER_SIZE 100*1024
#define errno_from_Win32LastError() errno_from_Win32Error(GetLastError()) #define errno_from_Win32LastError() errno_from_Win32Error(GetLastError())
/* maps Win32 error to errno */
static int static int
errno_from_Win32Error(int win32_error) errno_from_Win32Error(int win32_error)
{ {
@ -31,8 +36,13 @@ errno_from_Win32Error(int win32_error)
} }
} }
static int pipe_number = 0; /* used to name named pipes used to implement pipe() */
static int pipe_counter = 0;
/*
* pipe() implementation. Creates an inbound named pipe, uses CreateFile to connect
* to it. These handles are associated with read end and write end of the pipe
*/
int int
fileio_pipe(struct w32_io* pio[2]) { fileio_pipe(struct w32_io* pio[2]) {
HANDLE read_handle = INVALID_HANDLE_VALUE, write_handle = INVALID_HANDLE_VALUE; HANDLE read_handle = INVALID_HANDLE_VALUE, write_handle = INVALID_HANDLE_VALUE;
@ -40,14 +50,16 @@ fileio_pipe(struct w32_io* pio[2]) {
char pipe_name[MAX_PATH]; char pipe_name[MAX_PATH];
SECURITY_ATTRIBUTES sec_attributes; SECURITY_ATTRIBUTES sec_attributes;
debug2("");
if (pio == NULL) { if (pio == NULL) {
errno = EFAULT; errno = EINVAL;
debug("ERROR:%d", errno); debug("ERROR:%d", errno);
return -1; return -1;
} }
/* create name for named pipe */
if (-1 == sprintf_s(pipe_name, MAX_PATH, "\\\\.\\Pipe\\W32PosixPipe.%08x.%08x", GetCurrentProcessId(), pipe_number++)) { if (-1 == sprintf_s(pipe_name, MAX_PATH, "\\\\.\\Pipe\\W32PosixPipe.%08x.%08x",
GetCurrentProcessId(), pipe_counter++)) {
errno = EOTHER; errno = EOTHER;
debug("ERROR:%d", errno); debug("ERROR:%d", errno);
goto error; goto error;
@ -57,6 +69,7 @@ fileio_pipe(struct w32_io* pio[2]) {
sec_attributes.lpSecurityDescriptor = NULL; sec_attributes.lpSecurityDescriptor = NULL;
sec_attributes.nLength = 0; sec_attributes.nLength = 0;
/* create named pipe */
read_handle = CreateNamedPipeA(pipe_name, read_handle = CreateNamedPipeA(pipe_name,
PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED, PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
PIPE_TYPE_BYTE | PIPE_WAIT, PIPE_TYPE_BYTE | PIPE_WAIT,
@ -71,6 +84,7 @@ fileio_pipe(struct w32_io* pio[2]) {
goto error; goto error;
} }
/* connect to named pipe */
write_handle = CreateFileA(pipe_name, write_handle = CreateFileA(pipe_name,
GENERIC_WRITE, GENERIC_WRITE,
0, 0,
@ -84,6 +98,7 @@ fileio_pipe(struct w32_io* pio[2]) {
goto error; goto error;
} }
/* create w32_io objects encapsulating above handles */
pio_read = (struct w32_io*)malloc(sizeof(struct w32_io)); pio_read = (struct w32_io*)malloc(sizeof(struct w32_io));
pio_write = (struct w32_io*)malloc(sizeof(struct w32_io)); pio_write = (struct w32_io*)malloc(sizeof(struct w32_io));
@ -125,28 +140,33 @@ struct createFile_flags {
DWORD dwFlagsAndAttributes; DWORD dwFlagsAndAttributes;
}; };
/* maps open() file modes and flags to ones needed by CreateFile */
static int static int
createFile_flags_setup(int flags, int mode, struct createFile_flags* cf_flags) { createFile_flags_setup(int flags, int mode, struct createFile_flags* cf_flags) {
//check flags /* check flags */
int rwflags = flags & 0xf; int rwflags = flags & 0xf;
int c_s_flags = flags & 0xfffffff0; int c_s_flags = flags & 0xfffffff0;
//should be one of one of the following access modes: O_RDONLY, O_WRONLY, or O_RDWR /*
* should be one of one of the following access modes:
* O_RDONLY, O_WRONLY, or O_RDWR
*/
if ((rwflags != O_RDONLY) && (rwflags != O_WRONLY) && (rwflags != O_RDWR)) { if ((rwflags != O_RDONLY) && (rwflags != O_WRONLY) && (rwflags != O_RDWR)) {
debug("ERROR: wrong rw flags"); debug("ERROR: wrong rw flags");
errno = EINVAL; errno = EINVAL;
return -1; return -1;
} }
//only following create and status flags currently supported /*only following create and status flags currently supported*/
if (c_s_flags & ~(O_NONBLOCK | O_APPEND | O_CREAT | O_TRUNC | O_EXCL | O_BINARY)) { if (c_s_flags & ~(O_NONBLOCK | O_APPEND | O_CREAT | O_TRUNC
| O_EXCL | O_BINARY)) {
debug("ERROR: Unsupported flags: %d", flags); debug("ERROR: Unsupported flags: %d", flags);
errno = ENOTSUP; errno = ENOTSUP;
return -1; return -1;
} }
//validate mode /*validate mode*/
if (mode &~(S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) { if (mode &~(S_IRUSR | S_IWUSR | S_IRGRP | S_IROTH)) {
debug("ERROR: unsupported mode: %d", mode); debug("ERROR: unsupported mode: %d", mode);
errno = ENOTSUP; errno = ENOTSUP;
@ -186,18 +206,20 @@ createFile_flags_setup(int flags, int mode, struct createFile_flags* cf_flags) {
cf_flags->dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED | SECURITY_IMPERSONATION; cf_flags->dwFlagsAndAttributes = FILE_FLAG_OVERLAPPED | SECURITY_IMPERSONATION;
//TODO - map mode /*TODO - map mode */
return 0; return 0;
} }
/* open() implementation. Uses CreateFile to open file, console, device, etc */
struct w32_io* struct w32_io*
fileio_open(const char *pathname, int flags, int mode) { fileio_open(const char *pathname, int flags, int mode) {
struct w32_io* pio = NULL; struct w32_io* pio = NULL;
struct createFile_flags cf_flags; struct createFile_flags cf_flags;
HANDLE handle; HANDLE handle;
//check input params debug2("pathname:%s, flags:%d, mode:%d", pathname, flags, mode);
/* check input params*/
if (pathname == NULL) { if (pathname == NULL) {
errno = EINVAL; errno = EINVAL;
debug("ERROR:%d", errno); debug("ERROR:%d", errno);
@ -208,7 +230,10 @@ fileio_open(const char *pathname, int flags, int mode) {
if (createFile_flags_setup(flags, mode, &cf_flags) == -1) if (createFile_flags_setup(flags, mode, &cf_flags) == -1)
return NULL; return NULL;
handle = CreateFileA(pathname, cf_flags.dwDesiredAccess, cf_flags.dwShareMode, &cf_flags.securityAttributes, cf_flags.dwCreationDisposition, cf_flags.dwFlagsAndAttributes, NULL); /* TODO - Use unicode version.*/
handle = CreateFileA(pathname, cf_flags.dwDesiredAccess, cf_flags.dwShareMode,
&cf_flags.securityAttributes, cf_flags.dwCreationDisposition,
cf_flags.dwFlagsAndAttributes, NULL);
if (handle == INVALID_HANDLE_VALUE) { if (handle == INVALID_HANDLE_VALUE) {
errno = errno_from_Win32LastError(); errno = errno_from_Win32LastError();
@ -238,38 +263,35 @@ VOID CALLBACK ReadCompletionRoutine(
_In_ DWORD dwNumberOfBytesTransfered, _In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped _Inout_ LPOVERLAPPED lpOverlapped
) { ) {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped)); struct w32_io* pio =
debug2("pio:%p, pending_state:%d, error:%d, transferred:%d", pio, pio->read_details.pending, dwErrorCode, dwNumberOfBytesTransfered); (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
debug2("pio:%p, pending_state:%d, error:%d, received:%d",
pio, pio->read_details.pending, dwErrorCode, dwNumberOfBytesTransfered);
pio->read_details.error = dwErrorCode; pio->read_details.error = dwErrorCode;
pio->read_details.remaining = dwNumberOfBytesTransfered; pio->read_details.remaining = dwNumberOfBytesTransfered;
pio->read_details.completed = 0; pio->read_details.completed = 0;
pio->read_details.pending = FALSE; pio->read_details.pending = FALSE;
} }
#define READ_BUFFER_SIZE 100*1024 /* initiate an async read */
int int
fileio_ReadFileEx(struct w32_io* pio) { fileio_ReadFileEx(struct w32_io* pio) {
if (pio->read_details.buf == NULL) debug2("io:%p", pio);
{ if (pio->read_details.buf == NULL){
pio->read_details.buf = malloc(READ_BUFFER_SIZE); pio->read_details.buf = malloc(READ_BUFFER_SIZE);
if (!pio->read_details.buf) {
if (!pio->read_details.buf)
{
errno = ENOMEM; errno = ENOMEM;
debug2("ERROR: %d", errno);
return -1; return -1;
} }
pio->read_details.buf_size = READ_BUFFER_SIZE; pio->read_details.buf_size = READ_BUFFER_SIZE;
} }
if (ReadFileEx(pio->handle, pio->read_details.buf, pio->read_details.buf_size, &pio->read_overlapped, &ReadCompletionRoutine)) if (ReadFileEx(pio->handle, pio->read_details.buf, pio->read_details.buf_size,
{ &pio->read_overlapped, &ReadCompletionRoutine))
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
} else {
else
{
errno = errno_from_Win32LastError(); errno = errno_from_Win32LastError();
return -1; return -1;
} }
@ -277,27 +299,36 @@ fileio_ReadFileEx(struct w32_io* pio) {
return 0; return 0;
} }
/* read() implementation */
int int
fileio_read(struct w32_io* pio, void *dst, unsigned int max) { fileio_read(struct w32_io* pio, void *dst, unsigned int max) {
int bytes_copied; int bytes_copied;
debug2("io:%p remaining:%d", pio, pio->read_details.remaining);
if ((pio->type == PIPE_FD) && (pio->internal.state == PIPE_WRITE_END)) { if ((pio->type == PIPE_FD) && (pio->internal.state == PIPE_WRITE_END)) {
debug("ERROR: read called on write end of pipe, io:%p", pio);
errno = EBADF; errno = EBADF;
return -1; return -1;
} }
//if read is pending /* if read is pending */
if (pio->read_details.pending) { if (pio->read_details.pending) {
if (w32_io_is_blocking(pio)) {
debug2("io is pending, blocking call made, io:%p", pio);
while (fileio_is_io_available(pio, TRUE) == FALSE) {
if (-1 == wait_for_any_event(NULL, 0, INFINITE))
return -1;
}
}
errno = EAGAIN; errno = EAGAIN;
debug2("IO is already pending"); debug2("io is already pending");
return -1; return -1;
} }
if (fileio_is_io_available(pio, TRUE) == FALSE) if (fileio_is_io_available(pio, TRUE) == FALSE) {
{
if (-1 == fileio_ReadFileEx(pio)) { if (-1 == fileio_ReadFileEx(pio)) {
if ((pio->type == PIPE_FD) && (errno == ERROR_NEGATIVE_SEEK)) {//write end of the pipe closed if ((pio->type == PIPE_FD) && (errno == ERROR_NEGATIVE_SEEK)) {
/* write end of the pipe closed */
debug2("no more data"); debug2("no more data");
errno = 0; errno = 0;
return 0; return 0;
@ -305,8 +336,9 @@ fileio_read(struct w32_io* pio, void *dst, unsigned int max) {
return -1; return -1;
} }
//Pick up APC if IO has completed /* pick up APC if IO has completed */
SleepEx(0, TRUE); if (-1 == wait_for_any_event(NULL, 0, 0))
return -1;
if (w32_io_is_blocking(pio)) { if (w32_io_is_blocking(pio)) {
while (fileio_is_io_available(pio, TRUE) == FALSE) { while (fileio_is_io_available(pio, TRUE) == FALSE) {
@ -323,8 +355,8 @@ fileio_read(struct w32_io* pio, void *dst, unsigned int max) {
if (pio->read_details.error) { if (pio->read_details.error) {
errno = errno_from_Win32Error(pio->read_details.error); errno = errno_from_Win32Error(pio->read_details.error);
if ((errno == ERROR_BROKEN_PIPE) || //write end of the pipe is closed or pipe broken /*write end of the pipe is closed or pipe broken or eof reached*/
(errno == ERROR_HANDLE_EOF)) { //end of file reached if ((errno == ERROR_BROKEN_PIPE) || (errno == ERROR_HANDLE_EOF)) {
debug2("no more data"); debug2("no more data");
errno = 0; errno = 0;
pio->read_details.error = 0; pio->read_details.error = 0;
@ -339,7 +371,8 @@ fileio_read(struct w32_io* pio, void *dst, unsigned int max) {
memcpy(dst, pio->read_details.buf + pio->read_details.completed, bytes_copied); memcpy(dst, pio->read_details.buf + pio->read_details.completed, bytes_copied);
pio->read_details.remaining -= bytes_copied; pio->read_details.remaining -= bytes_copied;
pio->read_details.completed += bytes_copied; pio->read_details.completed += bytes_copied;
debug2("read %d bytes", bytes_copied); debug2("io:%p read: %d remaining: %d", pio, bytes_copied,
pio->read_details.remaining);
return bytes_copied; return bytes_copied;
} }
@ -348,20 +381,25 @@ VOID CALLBACK WriteCompletionRoutine(
_In_ DWORD dwNumberOfBytesTransfered, _In_ DWORD dwNumberOfBytesTransfered,
_Inout_ LPOVERLAPPED lpOverlapped _Inout_ LPOVERLAPPED lpOverlapped
) { ) {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped)); struct w32_io* pio =
debug2("pio:%p, pending_state:%d, remaining:%d, error:%d, transferred:%d", pio, pio->write_details.pending, pio->write_details.remaining, dwErrorCode, dwNumberOfBytesTransfered); (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped));
debug2("pio:%p, pending_state:%d, error:%d, transferred:%d of remaining: %d",
pio, pio->write_details.pending, dwErrorCode, dwNumberOfBytesTransfered,
pio->write_details.remaining);
pio->write_details.error = dwErrorCode; pio->write_details.error = dwErrorCode;
//assert that remaining == dwNumberOfBytesTransfered /* TODO - assert that remaining == dwNumberOfBytesTransfered */
pio->write_details.remaining -= dwNumberOfBytesTransfered; pio->write_details.remaining -= dwNumberOfBytesTransfered;
pio->write_details.pending = FALSE; pio->write_details.pending = FALSE;
} }
#define WRITE_BUFFER_SIZE 100*1024 /* write() implementation */
int int
fileio_write(struct w32_io* pio, const void *buf, unsigned int max) { fileio_write(struct w32_io* pio, const void *buf, unsigned int max) {
int bytes_copied; int bytes_copied;
debug2("io:%p", pio);
if ((pio->type == PIPE_FD) && (pio->internal.state == PIPE_READ_END)) { if ((pio->type == PIPE_FD) && (pio->internal.state == PIPE_READ_END)) {
debug("ERROR: write called on a read end of pipe, io:%p", pio);
errno = EBADF; errno = EBADF;
return -1; return -1;
} }
@ -369,9 +407,7 @@ fileio_write(struct w32_io* pio, const void *buf, unsigned int max) {
if (pio->write_details.pending) { if (pio->write_details.pending) {
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//this covers the scenario when the fd was previously non blocking (and hence io is still pending) debug2("io pending, blocking call made, io:%p", pio);
//wait for previous io to complete
debug2("waiting for prior unblocking write to complete before proceeding with this blocking write");
while (pio->write_details.pending) { while (pio->write_details.pending) {
if (wait_for_any_event(NULL, 0, INFINITE) == -1) if (wait_for_any_event(NULL, 0, INFINITE) == -1)
return -1; return -1;
@ -404,11 +440,12 @@ fileio_write(struct w32_io* pio, const void *buf, unsigned int max) {
bytes_copied = min(max, pio->write_details.buf_size); bytes_copied = min(max, pio->write_details.buf_size);
memcpy(pio->write_details.buf, buf, bytes_copied); memcpy(pio->write_details.buf, buf, bytes_copied);
if (WriteFileEx(pio->handle, pio->write_details.buf, bytes_copied, &pio->write_overlapped, &WriteCompletionRoutine)) if (WriteFileEx(pio->handle, pio->write_details.buf, bytes_copied,
{ &pio->write_overlapped, &WriteCompletionRoutine)) {
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
//let APC execute if IO was complete /* execute APC if write has completed */
SleepEx(0, TRUE); if (wait_for_any_event(NULL, 0, 0) == -1)
return -1;
if (w32_io_is_blocking(pio)) { if (w32_io_is_blocking(pio)) {
while (pio->write_details.pending) { while (pio->write_details.pending) {
@ -422,12 +459,13 @@ fileio_write(struct w32_io* pio, const void *buf, unsigned int max) {
debug("ERROR:%d", errno); debug("ERROR:%d", errno);
return -1; return -1;
} }
debug2("reporting %d bytes written", bytes_copied);
return bytes_copied; return bytes_copied;
} }
else else {
{
errno = errno_from_Win32LastError(); errno = errno_from_Win32LastError();
if ((pio->type == PIPE_FD) && (errno == ERROR_NEGATIVE_SEEK)) {//read end of the pipe closed /* read end of the pipe closed */
if ((pio->type == PIPE_FD) && (errno == ERROR_NEGATIVE_SEEK)) {
debug("ERROR:read end of the pipe closed"); debug("ERROR:read end of the pipe closed");
errno = EPIPE; errno = EPIPE;
} }
@ -437,6 +475,7 @@ fileio_write(struct w32_io* pio, const void *buf, unsigned int max) {
} }
/* fstat() implemetation */
int int
fileio_fstat(struct w32_io* pio, struct stat *buf) { fileio_fstat(struct w32_io* pio, struct stat *buf) {
@ -453,13 +492,13 @@ fileio_fstat(struct w32_io* pio, struct stat *buf) {
return _fstat(fd, (struct _stat*)&buf); return _fstat(fd, (struct _stat*)&buf);
} }
/* isatty() implementation */
int int
fileio_isatty(struct w32_io* pio) { fileio_isatty(struct w32_io* pio) {
return (GetFileType(pio->handle) == FILE_TYPE_CHAR) ? TRUE : FALSE; return (GetFileType(pio->handle) == FILE_TYPE_CHAR) ? TRUE : FALSE;
} }
/* fdopen implementation */
FILE* FILE*
fileio_fdopen(struct w32_io* pio, const char *mode) { fileio_fdopen(struct w32_io* pio, const char *mode) {

View File

@ -21,26 +21,32 @@
#define STDOUT_FILENO 1 #define STDOUT_FILENO 1
#define STDERR_FILENO 2 #define STDERR_FILENO 2
//fcntl commands /*fcntl commands*/
#define F_GETFL 0x1 #define F_GETFL 0x1
#define F_SETFL 0x2 #define F_SETFL 0x2
#define F_GETFD 0x4 #define F_GETFD 0x4
#define F_SETFD 0x8 #define F_SETFD 0x8
//fd flags /*fd flags*/
#define FD_CLOEXEC 0x1 #define FD_CLOEXEC 0x1
//open access modes. only one of these can be specified /*open access modes. only one of these can be specified*/
#define O_RDONLY 0x1 #define O_RDONLY 0x1
#define O_WRONLY 0x2 #define O_WRONLY 0x2
#define O_RDWR 0x4 #define O_RDWR 0x4
//open file creation and file status flags // can be bitwise-or'd /*open file creation and file status flags // can be bitwise-or'd*/
#define O_NONBLOCK 0x10 //io operations wont block #define O_NONBLOCK 0x10 /*io operations wont block*/
#define O_APPEND 0x20 //file is opened in append mode #define O_APPEND 0x20 /*file is opened in append mode*/
#define O_CREAT 0x40 //If the file does not exist it will be created #define O_CREAT 0x40 /*If the file does not exist it will be created*/
#define O_TRUNC 0x80 //If the file exists and is a regular file, and the file is successfully opened O_RDWR or O_WRONLY, its length shall be truncated to 0, and the mode and owner shall be unchanged /*
#define O_EXCL 0x100 //If O_CREAT and O_EXCL are set, open() shall fail if the file exists * If the file exists and is a regular file, and the file is successfully
* opened O_RDWR or O_WRONLY, its length shall be truncated to 0, and the mode
* and owner shall be unchanged
*/
#define O_TRUNC 0x80
//If O_CREAT and O_EXCL are set, open() shall fail if the file exists
#define O_EXCL 0x100
#define O_BINARY 0x200 //Gives raw data (while O_TEXT normalises line endings #define O_BINARY 0x200 //Gives raw data (while O_TEXT normalises line endings
// open modes // open modes
#define S_IRUSR 00400 //user has read permission #define S_IRUSR 00400 //user has read permission

View File

@ -46,7 +46,8 @@ FILE* w32_fdopen(int fd, const char *mode);
/*common i/o*/ /*common i/o*/
int w32_close(int fd); int w32_close(int fd);
int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const struct timeval *timeout); int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds,
const struct timeval *timeout);
int w32_fcntl(int fd, int cmd, ... /* arg */); int w32_fcntl(int fd, int cmd, ... /* arg */);
int w32_dup(int oldfd); int w32_dup(int oldfd);
int w32_dup2(int oldfd, int newfd); int w32_dup2(int oldfd, int newfd);

View File

@ -5,24 +5,28 @@
#include "w32fd.h" #include "w32fd.h"
#include <errno.h> #include <errno.h>
//signal handlers /* signal handlers */
//signal queue /* signal queue */
//wakes on /*
// - any signals (errno = EINTR ) - TODO * Main wait routine used by all blocking calls.
// - any of the supplied events set * It wakes up on
// - any APCs caused by IO completions * - any signals (errno = EINTR ) - TODO
// - time out (errno = ETIMEOUT) * - any of the supplied events set
// Returns 0 on IO completion and -1 on rest * - any APCs caused by IO completions
// if milli_seconds is 0, this function returns 0, its called with 0 to execute any scheduled APCs * - time out (errno = ETIMEOUT)
* - Returns 0 on IO completion and -1 on rest
* if milli_seconds is 0, this function returns 0, its called with 0
* to execute any scheduled APCs
*/
int int
wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds) wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
{ {
//todo - implement signal catching and handling /* TODO - implement signal catching and handling */
if (num_events) if (num_events) {
{ DWORD ret = WaitForMultipleObjectsEx(num_events, events, FALSE,
DWORD ret = WaitForMultipleObjectsEx(num_events, events, FALSE, milli_seconds, TRUE); milli_seconds, TRUE);
if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_events - 1)) { if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_events - 1)) {
//woken up by event signalled //woken up by event signalled
return 0; return 0;
@ -37,14 +41,14 @@ wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
debug("ERROR: wait timed out"); debug("ERROR: wait timed out");
return -1; return -1;
} }
else { //some other error /* some other error*/
else {
errno = EOTHER; errno = EOTHER;
debug("ERROR: unxpected wait end with error: %d", ret); debug("ERROR: unxpected wait end with error: %d", ret);
return -1; return -1;
} }
} }
else else {
{
DWORD ret = SleepEx(milli_seconds, TRUE); DWORD ret = SleepEx(milli_seconds, TRUE);
if (ret == WAIT_IO_COMPLETION) { if (ret == WAIT_IO_COMPLETION) {
return 0; return 0;

View File

@ -15,6 +15,7 @@
#define errno_from_WSALastError() errno_from_WSAError(WSAGetLastError()) #define errno_from_WSALastError() errno_from_WSAError(WSAGetLastError())
/* maps WSAError to errno */
static static
int errno_from_WSAError(int wsaerrno) int errno_from_WSAError(int wsaerrno)
{ {
@ -32,18 +33,21 @@ int errno_from_WSAError(int wsaerrno)
} }
} }
/* called before any other calls to socketio_ functions */
int int
socketio_initialize() { socketio_initialize() {
WSADATA wsaData = { 0 }; WSADATA wsaData = { 0 };
return WSAStartup(MAKEWORD(2, 2), &wsaData); return WSAStartup(MAKEWORD(2, 2), &wsaData);
} }
/* cleanup */
int int
socketio_done() { socketio_done() {
WSACleanup(); WSACleanup();
return 0; return 0;
} }
/* state info that needs to be persisted for an inprocess acceptEx call*/
struct acceptEx_context { struct acceptEx_context {
char lpOutputBuf[1024]; char lpOutputBuf[1024];
SOCKET accept_socket; SOCKET accept_socket;
@ -51,7 +55,7 @@ struct acceptEx_context {
DWORD bytes_received; DWORD bytes_received;
}; };
/* initiate async acceptEx*/
int int
socketio_acceptEx(struct w32_io* pio) { socketio_acceptEx(struct w32_io* pio) {
struct acceptEx_context *context; struct acceptEx_context *context;
@ -61,14 +65,16 @@ socketio_acceptEx(struct w32_io* pio) {
GUID GuidAcceptEx = WSAID_ACCEPTEX; GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes; DWORD dwBytes;
context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context)); context =
(struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) { if (context == NULL) {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio); debug("ERROR:%d, io:%p", errno, pio);
return -1; return -1;
} }
memset(context, 0, sizeof(struct acceptEx_context)); memset(context, 0, sizeof(struct acceptEx_context));
if (SOCKET_ERROR == WSAIoctl(pio->sock, SIO_GET_EXTENSION_FUNCTION_POINTER, if (SOCKET_ERROR == WSAIoctl(pio->sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof(GuidAcceptEx), &GuidAcceptEx, sizeof(GuidAcceptEx),
&context->lpfnAcceptEx, sizeof(context->lpfnAcceptEx), &context->lpfnAcceptEx, sizeof(context->lpfnAcceptEx),
&dwBytes, NULL, NULL)) &dwBytes, NULL, NULL))
@ -85,9 +91,10 @@ socketio_acceptEx(struct w32_io* pio) {
else else
context = (struct acceptEx_context *)pio->internal.context; context = (struct acceptEx_context *)pio->internal.context;
//init overlapped event /* init overlapped event */
if (pio->read_overlapped.hEvent == NULL) { if (pio->read_overlapped.hEvent == NULL) {
if ((pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) { pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if ((pio->read_overlapped.hEvent) == NULL) {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio); debug("ERROR:%d, io:%p", errno, pio);
return -1; return -1;
@ -95,7 +102,7 @@ socketio_acceptEx(struct w32_io* pio) {
} }
ResetEvent(pio->read_overlapped.hEvent); ResetEvent(pio->read_overlapped.hEvent);
//create accepting socket /* create accepting socket */
context->accept_socket = socket(AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP); context->accept_socket = socket(AF_UNSPEC, SOCK_STREAM, IPPROTO_TCP);
if (context->accept_socket == INVALID_SOCKET) { if (context->accept_socket == INVALID_SOCKET) {
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
@ -112,11 +119,11 @@ socketio_acceptEx(struct w32_io* pio) {
&context->bytes_received, &context->bytes_received,
&pio->read_overlapped)) &pio->read_overlapped))
{ {
//we are already connected. Set event so subsequent select will catch /* we are already connected. Set event so subsequent select will catch */
SetEvent(pio->read_overlapped.hEvent); SetEvent(pio->read_overlapped.hEvent);
} }
else { else {
//if overlapped io is in progress, we are good /* if overlapped io is in progress, we are good */
if (WSAGetLastError() != ERROR_IO_PENDING) { if (WSAGetLastError() != ERROR_IO_PENDING) {
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
debug("ERROR:%d, io:%p", errno, pio); debug("ERROR:%d, io:%p", errno, pio);
@ -136,7 +143,8 @@ CALLBACK WSARecvCompletionRoutine(
IN DWORD dwFlags IN DWORD dwFlags
) )
{ {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped)); struct w32_io* pio =
(struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
debug2("io:%p, pending_state:%d, flags:%d, error:%d, received:%d", debug2("io:%p, pending_state:%d, flags:%d, error:%d, received:%d",
pio, pio->read_details.pending, dwFlags, dwError, cbTransferred); pio, pio->read_details.pending, dwFlags, dwError, cbTransferred);
if (!dwError && !cbTransferred) if (!dwError && !cbTransferred)
@ -147,6 +155,7 @@ CALLBACK WSARecvCompletionRoutine(
pio->read_details.pending = FALSE; pio->read_details.pending = FALSE;
} }
/* initiates async receive operation*/
int int
socketio_WSARecv(struct w32_io* pio, BOOL* completed) { socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
int ret = 0; int ret = 0;
@ -157,7 +166,7 @@ socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
if (completed) if (completed)
*completed = FALSE; *completed = FALSE;
//initialize recv buffers if needed /* initialize recv buffers if needed */
wsabuf.len = INTERNAL_RECV_BUFFER_SIZE; wsabuf.len = INTERNAL_RECV_BUFFER_SIZE;
if (pio->read_details.buf == NULL) if (pio->read_details.buf == NULL)
{ {
@ -177,23 +186,24 @@ socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
wsabuf.buf = pio->read_details.buf; wsabuf.buf = pio->read_details.buf;
ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped, &WSARecvCompletionRoutine); ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped,
&WSARecvCompletionRoutine);
if (ret == 0) if (ret == 0)
{ {
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
//receive has completed but APC is pending to be scheduled /* receive has completed but APC is pending to be scheduled */
debug2("WSARecv returned 0, io:%p", pio); debug2("WSARecv returned 0, io:%p", pio);
if (completed) if (completed)
*completed = TRUE; *completed = TRUE;
} }
else { //(ret == SOCKET_ERROR) else { /* (ret == SOCKET_ERROR) */
if (WSAGetLastError() == WSA_IO_PENDING) if (WSAGetLastError() == WSA_IO_PENDING)
{ {
//io is initiated and pending /* io is initiated and pending */
debug2("WSARecv reported IO pending"); debug2("WSARecv reported IO pending");
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
} }
else { //failed else {
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
debug("ERROR: io:%p %d", pio, errno); debug("ERROR: io:%p %d", pio, errno);
return -1; return -1;
@ -203,6 +213,7 @@ socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
return 0; return 0;
} }
/* implements socket() */
struct w32_io* struct w32_io*
socketio_socket(int domain, int type, int protocol) { socketio_socket(int domain, int type, int protocol) {
struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io)); struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io));
@ -234,9 +245,12 @@ socketio_socket(int domain, int type, int protocol) {
return ret; \ return ret; \
} while (0) } while (0)
/* implements setsockopt() */
int int
socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen) { socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval,
if ((optname == SO_KEEPALIVE) || (optname == SO_REUSEADDR) || (optname == TCP_NODELAY) || (optname == IPV6_V6ONLY)) int optlen) {
if ((optname == SO_KEEPALIVE) || (optname == SO_REUSEADDR) ||
(optname == TCP_NODELAY) || (optname == IPV6_V6ONLY))
SET_ERRNO_ON_ERROR(setsockopt(pio->sock, level, optname, optval, optlen)); SET_ERRNO_ON_ERROR(setsockopt(pio->sock, level, optname, optval, optlen));
else { else {
debug("ERROR: unsupported optname:%d io:%p", optname, pio); debug("ERROR: unsupported optname:%d io:%p", optname, pio);
@ -245,21 +259,25 @@ socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optv
} }
} }
/* implements getsockopt() */
int int
socketio_getsockopt(struct w32_io* pio, int level, int optname, char* optval, int* optlen) { socketio_getsockopt(struct w32_io* pio, int level, int optname, char* optval, int* optlen) {
SET_ERRNO_ON_ERROR(getsockopt(pio->sock, level, optname, optval, optlen)); SET_ERRNO_ON_ERROR(getsockopt(pio->sock, level, optname, optval, optlen));
} }
/* implements getsockname() */
int int
socketio_getsockname(struct w32_io* pio, struct sockaddr* name, int* namelen) { socketio_getsockname(struct w32_io* pio, struct sockaddr* name, int* namelen) {
SET_ERRNO_ON_ERROR(getsockname(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(getsockname(pio->sock, name, namelen));
} }
/* implements getpeername */
int int
socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen) { socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen) {
SET_ERRNO_ON_ERROR(getpeername(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(getpeername(pio->sock, name, namelen));
} }
/* implements listen() */
int int
socketio_listen(struct w32_io* pio, int backlog) { socketio_listen(struct w32_io* pio, int backlog) {
if (SOCKET_ERROR == listen(pio->sock, backlog)) { if (SOCKET_ERROR == listen(pio->sock, backlog)) {
@ -271,11 +289,13 @@ socketio_listen(struct w32_io* pio, int backlog) {
return 0; return 0;
} }
/* implements bind() */
int int
socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen) { socketio_bind(struct w32_io* pio, const struct sockaddr *name, int namelen) {
SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen)); SET_ERRNO_ON_ERROR(bind(pio->sock, name, namelen));
} }
/* implements recv() */
int int
socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
BOOL completed = FALSE; BOOL completed = FALSE;
@ -294,11 +314,11 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
return -1; return -1;
} }
//if io is already pending /* /io is initiated and pending */
if (pio->read_details.pending) { if (pio->read_details.pending) {
//if recv is now in blocking mode, wait for data to be available /* if recv is now in blocking mode, wait for data to be available */
if (w32_io_is_blocking(pio)) { if (w32_io_is_blocking(pio)) {
debug2("socket was previously non-blocing but now in blocking mode, waiting for io"); debug2("io is pending, call is blocking, io:%p", pio);
while (socketio_is_io_available(pio, TRUE) == FALSE) { while (socketio_is_io_available(pio, TRUE) == FALSE) {
if (0 != wait_for_any_event(NULL, 0, INFINITE)) if (0 != wait_for_any_event(NULL, 0, INFINITE))
return -1; return -1;
@ -311,23 +331,25 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
} }
} }
//if we have some buffer copy it and return #bytes copied /* if we have some buffer copy it and return #bytes copied */
if (pio->read_details.remaining) if (pio->read_details.remaining)
{ {
int num_bytes_copied = min((int)len, pio->read_details.remaining); int num_bytes_copied = min((int)len, pio->read_details.remaining);
memcpy(buf, pio->read_details.buf + pio->read_details.completed, num_bytes_copied); memcpy(buf, pio->read_details.buf + pio->read_details.completed,
num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied; pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed += num_bytes_copied; pio->read_details.completed += num_bytes_copied;
debug2("returning %d bytes from prior completed IO, remaining:%d, io:%p", num_bytes_copied, pio->read_details.remaining, pio); debug2("returning %d bytes from prior completed IO, remaining:%d, io:%p",
num_bytes_copied, pio->read_details.remaining, pio);
return num_bytes_copied; return num_bytes_copied;
} }
//if there was an error on async call, return /* if there was an error on async call, return */
if (pio->read_details.error) { if (pio->read_details.error) {
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) { if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
debug2("connection closed, io:%p", pio); debug2("connection closed, io:%p", pio);
//connection is closed /* connection is closed */
return 0; return 0;
} }
else { else {
@ -342,11 +364,12 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
return -1; return -1;
if (completed) { if (completed) {
//Let APC be scheduled /* Let APC be scheduled */
debug2("Letting APC to execute"); debug2("Letting APC to execute");
SleepEx(1, TRUE); if (wait_for_any_event(NULL, 0, 0) == -1)
return -1;
if (pio->read_details.pending) { if (pio->read_details.pending) {
//this shouldn't be happening /* this shouldn't be happening */
errno = EOTHER; errno = EOTHER;
debug("ERROR: Unexpected IO state, io:%p", pio); debug("ERROR: Unexpected IO state, io:%p", pio);
return -1; return -1;
@ -355,7 +378,7 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//wait until io is done /* wait until io is done */
debug3("socket in blocking mode"); debug3("socket in blocking mode");
while (socketio_is_io_available(pio, TRUE) == FALSE) { while (socketio_is_io_available(pio, TRUE) == FALSE) {
if (0 != wait_for_any_event(NULL, 0, INFINITE)) if (0 != wait_for_any_event(NULL, 0, INFINITE))
@ -370,11 +393,14 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
} }
} }
//by this time we should have some bytes in internal buffer or an error from callback /*
* by this time we should have some bytes in internal buffer
* or an error from callback
*/
if (pio->read_details.error) if (pio->read_details.error)
{ {
if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) { if (pio->read_details.error == ERROR_GRACEFUL_DISCONNECT) {
//connection is closed /* connection is closed */
debug2("connection closed, io:%p", pio); debug2("connection closed, io:%p", pio);
return 0; return 0;
} }
@ -391,11 +417,12 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
memcpy(buf, pio->read_details.buf, num_bytes_copied); memcpy(buf, pio->read_details.buf, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied; pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed = num_bytes_copied; pio->read_details.completed = num_bytes_copied;
debug2("returning %d bytes from completed IO, remaining:%d, io:%p", num_bytes_copied, pio->read_details.remaining, pio); debug2("returning %d bytes from completed IO, remaining:%d, io:%p",
num_bytes_copied, pio->read_details.remaining, pio);
return num_bytes_copied; return num_bytes_copied;
} }
else { else {
//this should not happen /* this should not happen */
errno = EOTHER; errno = EOTHER;
debug("ERROR:Unexpected IO stated, io:%p", pio); debug("ERROR:Unexpected IO stated, io:%p", pio);
return -1; return -1;
@ -411,14 +438,18 @@ CALLBACK WSASendCompletionRoutine(
IN DWORD dwFlags IN DWORD dwFlags
) )
{ {
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped)); struct w32_io* pio =
debug2("io:%p, pending_state:%d, error:%d, sent:%d of remaining:%d", pio, pio->write_details.pending, dwError, cbTransferred, pio->write_details.remaining); (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, write_overlapped));
debug2("io:%p, pending_state:%d, error:%d, sent:%d of remaining:%d",
pio, pio->write_details.pending, dwError, cbTransferred,
pio->write_details.remaining);
pio->write_details.error = dwError; pio->write_details.error = dwError;
//assert that remaining == cbTransferred /* TODO - assert that remaining == cbTransferred */
pio->write_details.remaining -= cbTransferred; pio->write_details.remaining -= cbTransferred;
pio->write_details.pending = FALSE; pio->write_details.pending = FALSE;
} }
/* implementation of send() */
int int
socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
int ret = 0; int ret = 0;
@ -438,14 +469,12 @@ socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
return -1; return -1;
} }
//if io is already pending /* if io is already pending */
if (pio->write_details.pending) if (pio->write_details.pending)
{ {
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//this covers the scenario when the fd was previously non blocking (and hence io is still pending) debug2("io is pending, call is blocking, io:%p", pio);
//wait for previous io to complete
debug2("waiting for IO on a previous nonblocking send to complete, io:%p", pio);
while (pio->write_details.pending) { while (pio->write_details.pending) {
if (wait_for_any_event(NULL, 0, INFINITE) == -1) if (wait_for_any_event(NULL, 0, INFINITE) == -1)
return -1; return -1;
@ -465,7 +494,7 @@ socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
return -1; return -1;
} }
//initialize buffers if needed /* initialize buffers if needed */
wsabuf.len = INTERNAL_SEND_BUFFER_SIZE; wsabuf.len = INTERNAL_SEND_BUFFER_SIZE;
if (pio->write_details.buf == NULL) if (pio->write_details.buf == NULL)
{ {
@ -487,63 +516,66 @@ socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
wsabuf.len = min(wsabuf.len, (int)len); wsabuf.len = min(wsabuf.len, (int)len);
memcpy(wsabuf.buf, buf, wsabuf.len); memcpy(wsabuf.buf, buf, wsabuf.len);
//implement flags support if needed /* implement flags support if needed */
ret = WSASend(pio->sock, &wsabuf, 1, NULL, 0, &pio->write_overlapped, &WSASendCompletionRoutine); ret = WSASend(pio->sock, &wsabuf, 1, NULL, 0, &pio->write_overlapped,
&WSASendCompletionRoutine);
if (ret == 0) if (ret == 0)
{ {
//send has completed and APC is scheduled, let it run /* send has completed and APC is scheduled, let it run */
debug2("WSASend returned 0, APC scheduled io:%p", pio); debug2("WSASend returned 0, APC scheduled io:%p", pio);
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len; pio->write_details.remaining = wsabuf.len;
SleepEx(1, TRUE); if (wait_for_any_event(NULL, 0, 0) == -1)
return -1;
if ((pio->write_details.pending) || (pio->write_details.remaining != 0)) { if ((pio->write_details.pending) || (pio->write_details.remaining != 0)) {
errno = EOTHER; errno = EOTHER;
debug("ERROR: Unexpected IO state, io:%p", pio); debug("ERROR: Unexpected IO state, io:%p", pio);
return -1; return -1;
} }
//return num of bytes written /* return num of bytes written */
return wsabuf.len; return wsabuf.len;
} }
else { //(ret == SOCKET_ERROR) else {
if (WSAGetLastError() == WSA_IO_PENDING) if (WSAGetLastError() == WSA_IO_PENDING)
{ {
//io is initiated and pending /* io is initiated and pending */
debug2("WSASend reported IO pending, io:%p", pio); debug2("WSASend reported IO pending, io:%p", pio);
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf.len; pio->write_details.remaining = wsabuf.len;
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//wait until io is done /* wait until io is done */
debug3("waiting as socket is in blocking mode, io:%p", pio); debug3("waiting as socket is in blocking mode, io:%p", pio);
while (pio->write_details.pending) while (pio->write_details.pending)
SleepEx(INFINITE, TRUE); if (wait_for_any_event(NULL, 0,INFINITE) == -1)
return -1;
} }
debug3("returning %d", wsabuf.len); debug3("returning %d", wsabuf.len);
return wsabuf.len; return wsabuf.len;
} }
else { //failed else {
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
debug("ERROR:%d, io:%p", errno, pio); debug("ERROR:%d, io:%p", errno, pio);
return -1; return -1;
} }
} }
} }
/* shutdown() implementation */
int int
socketio_shutdown(struct w32_io* pio, int how) { socketio_shutdown(struct w32_io* pio, int how) {
SET_ERRNO_ON_ERROR(shutdown(pio->sock, how)); SET_ERRNO_ON_ERROR(shutdown(pio->sock, how));
} }
/* socket close() implementation */
int int
socketio_close(struct w32_io* pio) { socketio_close(struct w32_io* pio) {
debug2("io:%p", pio); debug2("io:%p", pio);
closesocket(pio->sock); closesocket(pio->sock);
//wait for pending io to abort /* wait for pending io to abort */
SleepEx(0, TRUE); SleepEx(0, TRUE);
if (pio->read_details.pending || pio->write_details.pending) if (pio->read_details.pending || pio->write_details.pending)
debug2("IO is still pending on closed socket. read:%d, write:%d, io:%p", pio->read_details.pending, pio->write_details.pending, pio); debug2("IO is still pending on closed socket. read:%d, write:%d, io:%p", pio->read_details.pending, pio->write_details.pending, pio);
@ -557,7 +589,7 @@ socketio_close(struct w32_io* pio) {
closesocket(ctx->accept_socket); closesocket(ctx->accept_socket);
if (ctx->lpOutputBuf) if (ctx->lpOutputBuf)
free(ctx->lpOutputBuf); free(ctx->lpOutputBuf);
//TODO: check why this is crashing /* TODO: check why this is crashing */
//free(pio->internal.context); //free(pio->internal.context);
} }
@ -578,6 +610,7 @@ socketio_close(struct w32_io* pio) {
return 0; return 0;
} }
/* accept() implementation */
struct w32_io* struct w32_io*
socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) { socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
struct w32_io *accept_io = NULL; struct w32_io *accept_io = NULL;
@ -585,7 +618,7 @@ socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
struct acceptEx_context* context; struct acceptEx_context* context;
debug2("io:%p", pio); debug2("io:%p", pio);
//start io if not already started /* start io if not already started */
if (pio->read_details.pending == FALSE) { if (pio->read_details.pending == FALSE) {
if (socketio_acceptEx(pio) != 0) { if (socketio_acceptEx(pio) != 0) {
return NULL; return NULL;
@ -593,17 +626,18 @@ socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
} }
if (w32_io_is_blocking(pio)) { if (w32_io_is_blocking(pio)) {
// block until accept io is complete /* block until accept io is complete */
while (FALSE == socketio_is_io_available(pio, TRUE)) while (FALSE == socketio_is_io_available(pio, TRUE))
{ {
if (0 != wait_for_any_event(&pio->read_overlapped.hEvent, 1, INFINITE)) if (0 != wait_for_any_event(&pio->read_overlapped.hEvent,
1, INFINITE))
{ {
return NULL; return NULL;
} }
} }
} }
else { else {
//if i/o is not ready /* if i/o is not ready */
if (FALSE == socketio_is_io_available(pio, TRUE)) { if (FALSE == socketio_is_io_available(pio, TRUE)) {
errno = EAGAIN; errno = EAGAIN;
debug2("accept is pending, io:%p", pio); debug2("accept is pending, io:%p", pio);
@ -616,23 +650,21 @@ socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
pio->read_details.pending = FALSE; pio->read_details.pending = FALSE;
ResetEvent(pio->read_overlapped.hEvent); ResetEvent(pio->read_overlapped.hEvent);
if (pio->read_details.error) if (pio->read_details.error) {
{
errno = errno_from_WSAError(pio->read_details.error); errno = errno_from_WSAError(pio->read_details.error);
debug("ERROR: async io completed with error: %d, io:%p", errno, pio); debug("ERROR: async io completed with error: %d, io:%p", errno, pio);
goto on_error; goto on_error;
} }
if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock))) if (0 != setsockopt(context->accept_socket, SOL_SOCKET,
{ SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock))) {
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
debug("ERROR: setsockopt failed:%d, io:%p", errno, pio); debug("ERROR: setsockopt failed:%d, io:%p", errno, pio);
goto on_error; goto on_error;
} }
accept_io = (struct w32_io*)malloc(sizeof(struct w32_io)); accept_io = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!accept_io) if (!accept_io) {
{
errno = ENOMEM; errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio); debug("ERROR:%d, io:%p", errno, pio);
goto on_error; goto on_error;
@ -644,7 +676,7 @@ socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
context->accept_socket = INVALID_SOCKET; context->accept_socket = INVALID_SOCKET;
debug2("accept io:%p", accept_io); debug2("accept io:%p", accept_io);
//TODO : fill in addr /* TODO : fill in addr */
return accept_io; return accept_io;
on_error: on_error:
@ -656,6 +688,7 @@ on_error:
return NULL; return NULL;
} }
/* initiates an async connect*/
int int
socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen) { socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen) {
@ -667,6 +700,7 @@ socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen)
GUID connectex_guid = WSAID_CONNECTEX; GUID connectex_guid = WSAID_CONNECTEX;
LPFN_CONNECTEX ConnectEx; LPFN_CONNECTEX ConnectEx;
debug("io:%p", pio);
if (name->sa_family == AF_INET6) { if (name->sa_family == AF_INET6) {
ZeroMemory(&tmp_addr6, sizeof(tmp_addr6)); ZeroMemory(&tmp_addr6, sizeof(tmp_addr6));
tmp_addr6.sin6_family = AF_INET6; tmp_addr6.sin6_family = AF_INET6;
@ -704,22 +738,21 @@ socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen)
return -1; return -1;
} }
if ((!pio->write_overlapped.hEvent) && ((pio->write_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)) { if ((!pio->write_overlapped.hEvent)
&& ((pio->write_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL)) {
errno = ENOMEM; errno = ENOMEM;
debug("ERROR:%d, io:%p", errno, pio); debug("ERROR:%d, io:%p", errno, pio);
return -1; return -1;
} }
ResetEvent(pio->write_overlapped.hEvent); ResetEvent(pio->write_overlapped.hEvent);
if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, NULL, &pio->write_overlapped)) if (TRUE == ConnectEx(pio->sock, name, namelen, NULL, 0, NULL,
{ &pio->write_overlapped)) {
//set completion event that indicates to other routines that async connect has completed /* set completion event to indicates that async connect has completed */
SetEvent(pio->write_overlapped.hEvent); SetEvent(pio->write_overlapped.hEvent);
} }
else else {
{ if (WSAGetLastError() != ERROR_IO_PENDING) {
if (WSAGetLastError() != ERROR_IO_PENDING)
{
CloseHandle(pio->write_overlapped.hEvent); CloseHandle(pio->write_overlapped.hEvent);
pio->write_overlapped.hEvent = 0; pio->write_overlapped.hEvent = 0;
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
@ -733,27 +766,25 @@ socketio_connectex(struct w32_io* pio, const struct sockaddr* name, int namelen)
return 0; return 0;
} }
/* connect implementation */
int int
socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) { socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) {
if (pio->write_details.pending == FALSE) if (pio->write_details.pending == FALSE) {
{
if (-1 == socketio_connectex(pio, name, namelen)) if (-1 == socketio_connectex(pio, name, namelen))
return -1; return -1;
} }
if (w32_io_is_blocking(pio)) { if (w32_io_is_blocking(pio)) {
// block until connect io is complete /* block until connect io is complete */
while (FALSE == socketio_is_io_available(pio, TRUE)) while (FALSE == socketio_is_io_available(pio, TRUE)) {
{ if (0 != wait_for_any_event(&pio->write_overlapped.hEvent,
if (0 != wait_for_any_event(&pio->write_overlapped.hEvent, 1, INFINITE)) 1, INFINITE))
{
return -1; return -1;
} }
} }
}
else { else {
//if i/o is not ready /* if i/o is not ready */
if (FALSE == socketio_is_io_available(pio, TRUE)) { if (FALSE == socketio_is_io_available(pio, TRUE)) {
errno = EINPROGRESS; errno = EINPROGRESS;
debug2("connect is in progress, io:%p", pio); debug2("connect is in progress, io:%p", pio);
@ -768,34 +799,37 @@ socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namelen) {
return -1; return -1;
} }
if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) if (0 != setsockopt(pio->sock, SOL_SOCKET, SO_UPDATE_CONNECT_CONTEXT, NULL, 0)) {
{
errno = errno_from_WSALastError(); errno = errno_from_WSALastError();
debug("ERROR: setsockopt failed:%d, io:%p", errno, pio); debug("ERROR: setsockopt failed:%d, io:%p", errno, pio);
return -1; return -1;
} }
//Reset any state used during connect /* Reset any state used during connect */
//close event handle /* close event handle */
CloseHandle(pio->write_overlapped.hEvent); CloseHandle(pio->write_overlapped.hEvent);
ZeroMemory(&pio->write_details, sizeof(pio->write_details)); ZeroMemory(&pio->write_details, sizeof(pio->write_details));
pio->internal.state = SOCK_CONNECTED; pio->internal.state = SOCK_CONNECTED;
return 0; return 0;
} }
/* checks if a given io is ready/available */
BOOL BOOL
socketio_is_io_available(struct w32_io* pio, BOOL rd) { socketio_is_io_available(struct w32_io* pio, BOOL rd) {
if ((pio->internal.state == SOCK_LISTENING) || (pio->internal.state == SOCK_CONNECTING)) { if ((pio->internal.state == SOCK_LISTENING) ||
(pio->internal.state == SOCK_CONNECTING)) {
DWORD numBytes = 0; DWORD numBytes = 0;
DWORD flags; DWORD flags;
BOOL sock_listening = (pio->internal.state == SOCK_LISTENING); BOOL sock_listening = (pio->internal.state == SOCK_LISTENING);
OVERLAPPED *overlapped = sock_listening ? &pio->read_overlapped : &pio->write_overlapped; OVERLAPPED *overlapped =
BOOL pending = sock_listening ? pio->read_details.pending : pio->write_details.pending; sock_listening ? &pio->read_overlapped : &pio->write_overlapped;
BOOL pending =
sock_listening ? pio->read_details.pending : pio->write_details.pending;
if (pending && WSAGetOverlappedResult(pio->sock, overlapped, &numBytes, FALSE, &flags)) { if (pending && WSAGetOverlappedResult(pio->sock, overlapped,
&numBytes, FALSE, &flags))
return TRUE; return TRUE;
}
else { else {
if (pending && WSAGetLastError() != WSA_IO_INCOMPLETE) { if (pending && WSAGetLastError() != WSA_IO_INCOMPLETE) {
if (sock_listening) if (sock_listening)
@ -813,7 +847,7 @@ socketio_is_io_available(struct w32_io* pio, BOOL rd) {
else else
return FALSE; return FALSE;
} }
else { //write else {
return (pio->write_details.pending == FALSE) ? TRUE : FALSE; return (pio->write_details.pending == FALSE) ? TRUE : FALSE;
} }

View File

@ -22,7 +22,6 @@ static struct w32fd_table fd_table;
/* static table entries representing std in, out and error*/ /* static table entries representing std in, out and error*/
static struct w32_io w32_io_stdin, w32_io_stdout, w32_io_stderr; static struct w32_io w32_io_stdin, w32_io_stdout, w32_io_stderr;
/* maps pio to fd (specified by index)*/
void fd_table_set(struct w32_io* pio, int index); void fd_table_set(struct w32_io* pio, int index);
#pragma warning(disable:4312) #pragma warning(disable:4312)
@ -73,6 +72,7 @@ fd_table_get_min_index() {
return min_index; return min_index;
} }
/* maps pio to fd (specified by index)*/
static void static void
fd_table_set(struct w32_io* pio, int index) { fd_table_set(struct w32_io* pio, int index) {
fd_table.w32_ios[index] = pio; fd_table.w32_ios[index] = pio;
@ -81,6 +81,7 @@ fd_table_set(struct w32_io* pio, int index) {
FD_SET(index, &(fd_table.occupied)); FD_SET(index, &(fd_table.occupied));
} }
/* removes entry at index from mapping table*/
static void static void
fd_table_clear(int index) fd_table_clear(int index)
{ {
@ -103,12 +104,17 @@ w32posix_done() {
socketio_done(); socketio_done();
} }
/* Check if the corresponding fd is set blocking */
BOOL BOOL
w32_io_is_blocking(struct w32_io* pio) w32_io_is_blocking(struct w32_io* pio)
{ {
return (pio->fd_status_flags & O_NONBLOCK) ? FALSE : TRUE; return (pio->fd_status_flags & O_NONBLOCK) ? FALSE : TRUE;
} }
/*
* Check if io is ready/available. This function is primarily used by select()
* as it decides on what fds can be set.
*/
BOOL BOOL
w32_io_is_io_available(struct w32_io* pio, BOOL rd) { w32_io_is_io_available(struct w32_io* pio, BOOL rd) {
if (pio->type == SOCK_FD) if (pio->type == SOCK_FD)
@ -362,7 +368,8 @@ w32_close(int fd) {
CHECK_FD(fd); CHECK_FD(fd);
pio = fd_table.w32_ios[fd]; pio = fd_table.w32_ios[fd];
debug("io:%p, type:%d, fd:%d, table_index:%d", pio, pio->type, fd, pio->table_index); debug("io:%p, type:%d, fd:%d, table_index:%d", pio, pio->type, fd,
pio->table_index);
fd_table_clear(pio->table_index); fd_table_clear(pio->table_index);
if ((pio->type == SOCK_FD)) if ((pio->type == SOCK_FD))
return socketio_close(pio); return socketio_close(pio);
@ -397,7 +404,8 @@ w32_fcntl(int fd, int cmd, ... /* arg */) {
} }
int int
w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const struct timeval *timeout) { w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds,
const struct timeval *timeout) {
ULONGLONG ticks_start = GetTickCount64(), ticks_now; ULONGLONG ticks_start = GetTickCount64(), ticks_now;
fd_set read_ready_fds, write_ready_fds; fd_set read_ready_fds, write_ready_fds;
HANDLE events[32]; HANDLE events[32];
@ -443,7 +451,7 @@ w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const
} }
} }
//if none of input fds are set return error /* if none of input fds are set return error */
if (in_set_fds == 0) { if (in_set_fds == 0) {
errno = EINVAL; errno = EINVAL;
debug("ERROR: empty fd_sets"); debug("ERROR: empty fd_sets");
@ -451,13 +459,17 @@ w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const
} }
debug2("Total in fds:%d", in_set_fds); debug2("Total in fds:%d", in_set_fds);
//start async io on selected fds if needed and pick up any events that select needs to listen on /*
* start async io on selected fds if needed and pick up any events
* that select needs to listen on
*/
for (int i = 0; i < fds; i++) { for (int i = 0; i < fds; i++) {
if (readfds && FD_ISSET(i, readfds)) { if (readfds && FD_ISSET(i, readfds)) {
if (w32_io_on_select(fd_table.w32_ios[i], TRUE) == -1) if (w32_io_on_select(fd_table.w32_ios[i], TRUE) == -1)
return -1; return -1;
if ((fd_table.w32_ios[i]->type == SOCK_FD) && (fd_table.w32_ios[i]->internal.state == SOCK_LISTENING)) { if ((fd_table.w32_ios[i]->type == SOCK_FD)
&& (fd_table.w32_ios[i]->internal.state == SOCK_LISTENING)) {
events[num_events++] = fd_table.w32_ios[i]->read_overlapped.hEvent; events[num_events++] = fd_table.w32_ios[i]->read_overlapped.hEvent;
} }
} }
@ -465,17 +477,18 @@ w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const
if (writefds && FD_ISSET(i, writefds)) { if (writefds && FD_ISSET(i, writefds)) {
if (w32_io_on_select(fd_table.w32_ios[i], FALSE) == -1) if (w32_io_on_select(fd_table.w32_ios[i], FALSE) == -1)
return -1; return -1;
if ((fd_table.w32_ios[i]->type == SOCK_FD) && (fd_table.w32_ios[i]->internal.state == SOCK_CONNECTING)) { if ((fd_table.w32_ios[i]->type == SOCK_FD)
&& (fd_table.w32_ios[i]->internal.state == SOCK_CONNECTING)) {
events[num_events++] = fd_table.w32_ios[i]->write_overlapped.hEvent; events[num_events++] = fd_table.w32_ios[i]->write_overlapped.hEvent;
} }
} }
} }
//excute any scheduled APCs /* excute any scheduled APCs */
if (0 != wait_for_any_event(NULL, 0, 0)) if (0 != wait_for_any_event(NULL, 0, 0))
return -1; return -1;
//see if any io is ready /* see if any io is ready */
for (i = 0; i < fds; i++) { for (i = 0; i < fds; i++) {
if (readfds && FD_ISSET(i, readfds)) { if (readfds && FD_ISSET(i, readfds)) {
@ -493,7 +506,7 @@ w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const
} }
} }
//if io on some fds is already ready, return /* if io on some fds is already ready, return */
if (out_ready_fds) { if (out_ready_fds) {
if (readfds) if (readfds)
*readfds = read_ready_fds; *readfds = read_ready_fds;
@ -511,10 +524,11 @@ w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, const
return -1; return -1;
} }
if (0 != wait_for_any_event(events, num_events, time_milliseconds - ((ticks_now - ticks_start) & 0xffffffff))) if (0 != wait_for_any_event(events, num_events,
time_milliseconds - ((ticks_now - ticks_start) & 0xffffffff)))
return -1; return -1;
//check on fd status /* check on fd status */
out_ready_fds = 0; out_ready_fds = 0;
for (int i = 0; i < fds; i++) { for (int i = 0; i < fds; i++) {

View File

@ -32,7 +32,8 @@ enum w32_io_pipe_state {
}; };
/* /*
* This sturcture encapsulates the state info needed to map a File Descriptor to Win32 Handle * This sturcture encapsulates the state info needed to map a File Descriptor
* to Win32 Handle
*/ */
struct w32_io { struct w32_io {
OVERLAPPED read_overlapped; OVERLAPPED read_overlapped;
@ -41,8 +42,10 @@ struct w32_io {
/*internal read buffer*/ /*internal read buffer*/
char *buf; char *buf;
DWORD buf_size; DWORD buf_size;
DWORD remaining; /*bytes in internal buffer remaining to be read by application*/ /*bytes in internal buffer remaining to be read by application*/
DWORD completed; /*bytes in internal buffer already read by application*/ DWORD remaining;
/*bytes in internal buffer already read by application*/
DWORD completed;
BOOL pending; /*waiting on a read operation to complete*/ BOOL pending; /*waiting on a read operation to complete*/
DWORD error; /*error reported on async read or accept completion*/ DWORD error; /*error reported on async read or accept completion*/
}read_details; }read_details;
@ -50,13 +53,16 @@ struct w32_io {
/*internal write buffer*/ /*internal write buffer*/
char *buf; char *buf;
DWORD buf_size; DWORD buf_size;
DWORD remaining; /*bytes in internal buffer remaining to be written to network*/ /*bytes in internal buffer remaining to be written to network*/
DWORD completed; /*bytes in internal buffer already written to network*/ DWORD remaining;
/*bytes in internal buffer already written to network*/
DWORD completed;
BOOL pending; /*waiting on a write operation to complete*/ BOOL pending; /*waiting on a write operation to complete*/
DWORD error; /*error reported on async write or connect completion*/ DWORD error; /*error reported on async write or connect completion*/
}write_details; }write_details;
int table_index; /*index at which this object is stored in fd_table*/ /*index at which this object is stored in fd_table*/
int table_index;
enum w32_io_type type; /*hanldle type*/ enum w32_io_type type; /*hanldle type*/
DWORD fd_flags; /*fd flags from POSIX*/ DWORD fd_flags; /*fd flags from POSIX*/
DWORD fd_status_flags; /*fd status flags from POSIX*/ DWORD fd_status_flags; /*fd status flags from POSIX*/
@ -74,16 +80,8 @@ struct w32_io {
}internal; }internal;
}; };
/* Check if the corresponding fd is set blocking */
BOOL w32_io_is_blocking(struct w32_io*); BOOL w32_io_is_blocking(struct w32_io*);
/*
* Check if io is ready/available. This function is primarily used by select() as it decides on what fds can be set.
*/
BOOL w32_io_is_io_available(struct w32_io* pio, BOOL rd); BOOL w32_io_is_io_available(struct w32_io* pio, BOOL rd);
/*
* Main wait routine used by all blocking calls. It wakes up on IO completions, timers, timeouts and signals.
*/
int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds); int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds);
/*POSIX mimic'ing socket API*/ /*POSIX mimic'ing socket API*/
@ -93,8 +91,10 @@ BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd);
int socketio_on_select(struct w32_io* pio, BOOL rd); int socketio_on_select(struct w32_io* pio, BOOL rd);
struct w32_io* socketio_socket(int domain, int type, int protocol); struct w32_io* socketio_socket(int domain, int type, int protocol);
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen); struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen);
int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen); int socketio_setsockopt(struct w32_io* pio, int level, int optname,
int socketio_getsockopt(struct w32_io* pio, int level, int optname, char* optval, int* optlen); const char* optval, int optlen);
int socketio_getsockopt(struct w32_io* pio, int level, int optname,
char* optval, int* optlen);
int socketio_getsockname(struct w32_io* pio, struct sockaddr* name, int* namelen); int socketio_getsockname(struct w32_io* pio, struct sockaddr* name, int* namelen);
int socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen); int socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen);
int socketio_listen(struct w32_io* pio, int backlog); int socketio_listen(struct w32_io* pio, int backlog);
@ -105,7 +105,6 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags);
int socketio_shutdown(struct w32_io* pio, int how); int socketio_shutdown(struct w32_io* pio, int how);
int socketio_close(struct w32_io* pio); int socketio_close(struct w32_io* pio);
/*POSIX mimic'ing file API*/ /*POSIX mimic'ing file API*/
BOOL fileio_is_io_available(struct w32_io* pio, BOOL rd); BOOL fileio_is_io_available(struct w32_io* pio, BOOL rd);
int fileio_on_select(struct w32_io* pio, BOOL rd); int fileio_on_select(struct w32_io* pio, BOOL rd);