This commit is contained in:
Manoj Ampalam 2016-01-12 14:38:05 -08:00
parent fa9331e2d5
commit 36c5c9e89f
9 changed files with 447 additions and 284 deletions

View File

@ -83,15 +83,7 @@
</Link> </Link>
</ItemDefinitionGroup> </ItemDefinitionGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="stdafx.h" /> <ClCompile Include="socketiotests.cpp" />
<ClInclude Include="targetver.h" />
</ItemGroup>
<ItemGroup>
<ClCompile Include="stdafx.cpp">
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">Create</PrecompiledHeader>
<PrecompiledHeader Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">Create</PrecompiledHeader>
</ClCompile>
<ClCompile Include="unittest1.cpp" />
</ItemGroup> </ItemGroup>
<Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" /> <Import Project="$(VCTargetsPath)\Microsoft.Cpp.targets" />
<ImportGroup Label="ExtensionTargets"> <ImportGroup Label="ExtensionTargets">

View File

@ -15,18 +15,7 @@
</Filter> </Filter>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="stdafx.h"> <ClCompile Include="socketiotests.cpp">
<Filter>Header Files</Filter>
</ClInclude>
<ClInclude Include="targetver.h">
<Filter>Header Files</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="stdafx.cpp">
<Filter>Source Files</Filter>
</ClCompile>
<ClCompile Include="unittest1.cpp">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
</ItemGroup> </ItemGroup>

View File

@ -5,12 +5,69 @@ extern "C" {
using namespace Microsoft::VisualStudio::CppUnitTestFramework; using namespace Microsoft::VisualStudio::CppUnitTestFramework;
#define DEFAULT_PORT "27015"
namespace UnitTests namespace UnitTests
{ {
TEST_CLASS(UnitTest1) TEST_CLASS(SocketIOTests)
{ {
public: public:
struct addrinfo *result = NULL;
struct addrinfo hints;
int ListenSocket = -1;
TEST_METHOD_INITIALIZE(TestMethodInitialize)
{
int iResult;
w32posix_initialize();
ZeroMemory(&hints, sizeof(hints));
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP;
hints.ai_flags = AI_PASSIVE;
iResult = getaddrinfo(NULL, DEFAULT_PORT, &hints, &result);
if (iResult != 0) {
printf("getaddrinfo failed with error: %d\n", iResult);
return;
}
ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
if (ListenSocket == -1) {
printf("socket failed with error: %ld\n", errno);
return;
}
// Setup the TCP listening socket
iResult = bind(ListenSocket, result->ai_addr, (int)result->ai_addrlen);
if (iResult == -1) {
printf("bind failed with error: %d\n", errno);
return ;
}
iResult = listen(ListenSocket, SOMAXCONN);
if (iResult == -1) {
printf("listen failed with error: %d\n", errno);
return;
}
freeaddrinfo(result);
}
TEST_METHOD_CLEANUP(TestMethodCleanup)
{
if (result)
freeaddrinfo(result);
if (ListenSocket != -1)
close(ListenSocket);
w32posix_done();
}
TEST_METHOD(TestMethod1) TEST_METHOD(TestMethod1)
{ {
// TODO: Your test code here // TODO: Your test code here

View File

@ -0,0 +1,50 @@
#include "w32fd.h"
#include <errno.h>
//signal handlers
//signal queue
//wakes on
// - any signals (errno = EINTR )
// - any of the supplied events set
// - any APCs caused by IO completions
int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds)
{
//todo - implement signal catching and handling
if (num_events)
{
DWORD ret = WaitForMultipleObjectsEx(num_events, events, FALSE, milli_seconds, TRUE);
if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_events - 1)) {
//woken up by event signalled
return 0;
}
else if (ret == WAIT_IO_COMPLETION) {
return 0;
}
else if (ret == WAIT_TIMEOUT) {
errno = ETIMEDOUT;
return -1;
}
else { //some other error
errno = EOTHER;
return -1;
}
}
else
{
DWORD ret = SleepEx(milli_seconds, TRUE);
if (ret == WAIT_IO_COMPLETION) {
return 0;
}
else if (ret == 0) {
//timed out
errno = ETIMEDOUT;
return -1;
}
else { //some other error
errno = EOTHER;
return -1;
}
}
}

View File

@ -7,6 +7,8 @@
#define INTERNAL_BUFFER_SIZE 100*1024 //100KB #define INTERNAL_BUFFER_SIZE 100*1024 //100KB
#define INTERNAL_RECV_BUFFER_SIZE 70*1024 //70KB
static int getWSAErrno() static int getWSAErrno()
{ {
int wsaerrno = WSAGetLastError(); int wsaerrno = WSAGetLastError();
@ -47,6 +49,153 @@ int socketio_done() {
return 0; return 0;
} }
struct acceptEx_context {
char lpOutputBuf[1024];
SOCKET accept_socket;
LPFN_ACCEPTEX lpfnAcceptEx;
DWORD bytes_received;
};
int socketio_acceptEx(struct w32_io* pio) {
struct acceptEx_context *context;
if (pio->context == NULL) {
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) {
errno = ENOMEM;
return -1;
}
if (SOCKET_ERROR == WSAIoctl(pio->sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof (GuidAcceptEx),
&context->lpfnAcceptEx, sizeof (context->lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
free(context);
errno = getWSAErrno();
return -1;
}
context->accept_socket = INVALID_SOCKET;
pio->context = context;
}
else
context = (struct acceptEx_context *)pio->context;
//init overlapped event
if (pio->read_overlapped.hEvent == NULL) {
if ((pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) {
errno = ENOMEM;
return -1;
}
}
ResetEvent(pio->read_overlapped.hEvent);
//create accepting socket
//todo - get socket parameters from listening socket
context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (context->accept_socket == INVALID_SOCKET) {
errno = getWSAErrno();
return -1;
}
if (TRUE == context->lpfnAcceptEx(pio->sock,
context->accept_socket,
context->lpOutputBuf,
0,
sizeof(struct sockaddr_in) + 16,
sizeof(struct sockaddr_in) + 16,
&context->bytes_received,
&pio->read_overlapped))
{
//we are already connected. Set event so subsequent select will catch
SetEvent(pio->read_overlapped.hEvent);
}
else {
//if overlapped io is in progress, we are good
if (WSAGetLastError() != ERROR_IO_PENDING) {
errno = getWSAErrno();
return -1;
}
}
pio->read_details.pending = TRUE;
return 0;
}
#define RECV_CONNECTION_CLOSED 1234
void CALLBACK WSARecvCompletionRoutine(
IN DWORD dwError,
IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags
)
{
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
if (!dwError && !cbTransferred)
dwError = RECV_CONNECTION_CLOSED;
pio->read_details.error = dwError;
pio->read_details.remaining = cbTransferred;
pio->read_details.completed = 0;
pio->read_details.pending = FALSE;
}
int socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
int ret = 0;
WSABUF wsabuf;
DWORD recv_flags = 0;
if (completed)
*completed = FALSE;
//initialize recv buffers if needed
wsabuf.len = INTERNAL_RECV_BUFFER_SIZE;
if (pio->read_details.buf == NULL)
{
wsabuf.buf = malloc(wsabuf.len);
if (!wsabuf.buf)
{
errno = ENOMEM;
return -1;
}
pio->read_details.buf = wsabuf.buf;
pio->read_details.buf_size = wsabuf.len;
}
else
wsabuf.buf = pio->read_details.buf;
//TODO - implement flags if any needed for OpenSSH
ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped, &WSARecvCompletionRoutine);
if (ret == 0)
{
pio->read_details.pending = TRUE;
//receive has completed but APC is pending to be scheduled
if (completed)
*completed = TRUE;
}
else { //(ret == SOCKET_ERROR)
if (WSAGetLastError() == WSA_IO_PENDING)
{
//io is initiated and pending
pio->read_details.pending = TRUE;
}
else { //failed
errno = getWSAErrno();
return -1;
}
}
return 0;
}
struct w32_io* socketio_socket(int domain, int type, int protocol) { struct w32_io* socketio_socket(int domain, int type, int protocol) {
struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io)); struct w32_io *pio = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!pio) { if (!pio) {
@ -95,51 +244,21 @@ int socketio_connect(struct w32_io* pio, const struct sockaddr* name, int namele
return set_errno_on_error(connect(pio->sock, name, namelen)); return set_errno_on_error(connect(pio->sock, name, namelen));
} }
void CALLBACK WSARecvCompletionRoutine(
IN DWORD dwError,
IN DWORD cbTransferred,
IN LPWSAOVERLAPPED lpOverlapped,
IN DWORD dwFlags
)
{
struct w32_io* pio = (struct w32_io*)((char*)lpOverlapped - offsetof(struct w32_io, read_overlapped));
pio->read_details.error = dwError;
pio->read_details.remaining = cbTransferred;
pio->read_details.completed = 0;
pio->read_details.pending = FALSE;
}
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
int ret = 0; BOOL completed = FALSE;
WSABUF wsabuf;
DWORD recv_flags = 0; if ((buf == NULL) || (len == 0)){
errno = EPERM;
return -1;
}
//if io is already pending //if io is already pending
if (pio->read_details.pending) if (pio->read_details.pending) {
{
errno = EAGAIN; errno = EAGAIN;
return -1; return -1;
} }
//initialize recv buffers if needed //if we have some buffer copy it and return #bytes copied
wsabuf.len = INTERNAL_BUFFER_SIZE;
if (pio->read_details.buf == NULL)
{
wsabuf.buf = malloc(wsabuf.len);
if (!wsabuf.buf)
{
errno = ENOMEM;
return -1;
}
pio->read_details.buf = wsabuf.buf;
pio->read_details.buf_size = wsabuf.len;
}
else
wsabuf.buf = pio->read_details.buf;
//if we have some buffer copy it and retun #bytes copied
if (pio->read_details.remaining) if (pio->read_details.remaining)
{ {
int num_bytes_copied = min(len, pio->read_details.remaining); int num_bytes_copied = min(len, pio->read_details.remaining);
@ -149,39 +268,44 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
return num_bytes_copied; return num_bytes_copied;
} }
//TODO - implement flags if any needed for OpenSSH //if there was an error on async call, return
ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped, &WSARecvCompletionRoutine); if (pio->read_details.error) {
if (ret == 0) if (pio->read_details.error == RECV_CONNECTION_CLOSED) {
{ //connection is closed
//receive has completed and APC is scheduled, let it run return 0;
pio->read_details.pending = TRUE; }
else {
//todo - get qualified error
errno = EOTHER;
pio->read_details.error = 0;
return -1;
}
}
if (0 != socketio_WSARecv(pio, &completed))
return -1;
if (completed) {
//Let APC be scheduled
SleepEx(1, TRUE); SleepEx(1, TRUE);
if (pio->read_details.pending) { if (pio->read_details.pending) {
//unexpected internal error //this shouldn't be happening
errno = EOTHER; errno = EOTHER;
return -1; return -1;
} }
} }
else { //(ret == SOCKET_ERROR)
if (WSAGetLastError() == WSA_IO_PENDING)
{
//io is initiated and pending
pio->read_details.pending = TRUE;
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//wait until io is done //wait until io is done
while (pio->read_details.pending) while (socketio_is_io_available(pio, TRUE) == FALSE) {
SleepEx(INFINITE, TRUE); if (0 != wait_for_any_event(NULL, 0, INFINITE))
}
else {
errno = EAGAIN;
return -1; return -1;
}
} }
else { //failed }
errno = getWSAErrno(); else {
if (socketio_is_io_available(pio, TRUE) == FALSE) {
errno = EAGAIN;
return -1; return -1;
} }
} }
@ -189,8 +313,16 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
//by this time we should have some bytes in internal buffer or an error from callback //by this time we should have some bytes in internal buffer or an error from callback
if (pio->read_details.error) if (pio->read_details.error)
{ {
errno = EOTHER; if (pio->read_details.error == RECV_CONNECTION_CLOSED) {
return -1; //connection is closed
return 0;
}
else {
//todo - get qualified error
errno = EOTHER;
pio->read_details.error = 0;
return -1;
}
} }
if (pio->read_details.remaining) { if (pio->read_details.remaining) {
@ -200,8 +332,10 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
pio->read_details.completed = num_bytes_copied; pio->read_details.completed = num_bytes_copied;
return num_bytes_copied; return num_bytes_copied;
} }
else { //connection is closed else {
return 0; //this should not happen
errno = EOTHER;
return -1;
} }
} }
@ -231,6 +365,11 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
return -1; return -1;
} }
if (pio->write_details.error) {
errno = EOTHER;
return -1;
}
//initialize buffers if needed //initialize buffers if needed
wsabuf.len = INTERNAL_BUFFER_SIZE; wsabuf.len = INTERNAL_BUFFER_SIZE;
if (pio->write_details.buf == NULL) if (pio->write_details.buf == NULL)
@ -299,6 +438,9 @@ int socketio_shutdown(struct w32_io* pio, int how) {
int socketio_close(struct w32_io* pio) { int socketio_close(struct w32_io* pio) {
closesocket(pio->sock); closesocket(pio->sock);
//todo- wait for pending io to abort
SleepEx(1, TRUE);
if (pio->type == LISTEN_FD) { if (pio->type == LISTEN_FD) {
if (pio->read_overlapped.hEvent) if (pio->read_overlapped.hEvent)
CloseHandle(pio->read_overlapped.hEvent); CloseHandle(pio->read_overlapped.hEvent);
@ -313,21 +455,46 @@ int socketio_close(struct w32_io* pio) {
free(pio->write_details.buf); free(pio->write_details.buf);
} }
//todo- wait for pending io to abort
free(pio); free(pio);
return 0; return 0;
} }
struct acceptEx_context {
char lpOutputBuf[1024];
SOCKET accept_socket;
LPFN_ACCEPTEX lpfnAcceptEx;
DWORD bytes_received;
};
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) { struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
struct w32_io *accept_io = NULL; struct w32_io *accept_io = NULL;
int iResult = 0;
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
//start io if not already started
if (pio->read_details.pending == FALSE) {
if (socketio_acceptEx(pio) != 0) {
return NULL;
}
}
if (w32_io_is_blocking(pio)) {
// block until accept io is complete
while (FALSE == socketio_is_io_available(pio, TRUE))
{
if (0 != wait_for_any_event(&pio->read_overlapped.hEvent, 1, INFINITE))
{
return NULL;
}
}
}
else {
//if i/o is not ready
if (FALSE == socketio_is_io_available(pio, TRUE)) {
errno = EAGAIN;
return NULL;
}
}
if (0 != setsockopt(context->accept_socket, SOL_SOCKET, SO_UPDATE_ACCEPT_CONTEXT, (char*)&pio->sock, sizeof(pio->sock)))
{
errno = getWSAErrno();
return NULL;
}
accept_io = (struct w32_io*)malloc(sizeof(struct w32_io)); accept_io = (struct w32_io*)malloc(sizeof(struct w32_io));
if (!accept_io) if (!accept_io)
@ -337,40 +504,17 @@ struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* a
} }
memset(accept_io, 0, sizeof(struct w32_io)); memset(accept_io, 0, sizeof(struct w32_io));
if (w32_io_is_blocking(pio)) { accept_io->sock = context->accept_socket;
accept_io->sock = accept(pio->sock, addr, addrlen); accept_io->type = SOCK_FD;
if (accept_io->sock == INVALID_SOCKET) { context->accept_socket = INVALID_SOCKET;
errno = getWSAErrno(); pio->read_details.pending = FALSE;
free(accept_io); ResetEvent(pio->read_overlapped.hEvent);
return NULL;
}
}
else {
//ensure i/o is ready
if (FALSE == socketio_is_ioready(pio, TRUE)) {
free(accept_io);
errno = EAGAIN;
return NULL;
}
struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
accept_io->sock = context->accept_socket;
context->accept_socket = INVALID_SOCKET;
pio->read_details.pending = FALSE;
ResetEvent(pio->read_overlapped.hEvent);
}
pio->type = SOCK_FD;
return accept_io; return accept_io;
} }
BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd) { BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd) {
struct acceptEx_context* context = (struct acceptEx_context*)pio->context; struct acceptEx_context* context = (struct acceptEx_context*)pio->context;
if (w32_io_is_blocking(pio))
return FALSE;
if (pio->type == LISTEN_FD) { if (pio->type == LISTEN_FD) {
DWORD numBytes = 0; DWORD numBytes = 0;
DWORD flags; DWORD flags;
@ -384,95 +528,38 @@ BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd) {
return FALSE; return FALSE;
} }
} }
else { //regular socket else if (rd){
//todo if (pio->read_details.remaining || pio->read_details.error)
return FALSE; return TRUE;
else
return FALSE;
}
else { //write
return (pio->write_details.pending == FALSE)? TRUE : FALSE;
} }
} }
int socketio_start_asyncio(struct w32_io* pio, BOOL rd) { int socketio_on_select(struct w32_io* pio, BOOL rd) {
if (w32_io_is_blocking(pio)) { if (rd && pio->read_details.pending)
errno = EPERM; return 0;
return -1;
} if (!rd && pio->write_details.pending)
return 0;
if (pio->type == LISTEN_FD) { if (pio->type == LISTEN_FD) {
if (!pio->read_details.pending) { if (socketio_acceptEx(pio) != 0)
struct acceptEx_context *context; return -1;
return 0;
if (pio->context == NULL) {
GUID GuidAcceptEx = WSAID_ACCEPTEX;
DWORD dwBytes;
context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) {
errno = ENOMEM;
return -1;
}
if (SOCKET_ERROR == WSAIoctl(pio->sock, SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof (GuidAcceptEx),
&context->lpfnAcceptEx, sizeof (context->lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
free(context);
errno = getWSAErrno();
return -1;
}
context->accept_socket = INVALID_SOCKET;
pio->context = context;
}
else
context = (struct acceptEx_context *)pio->context;
//init overlapped event
if (pio->read_overlapped.hEvent == NULL) {
if ((pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL)) == NULL) {
errno = ENOMEM;
return -1;
}
}
ResetEvent(pio->read_overlapped.hEvent);
//create accepting socket
//todo - get socket parameters from listening socket
context->accept_socket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (context->accept_socket == INVALID_SOCKET) {
errno = getWSAErrno();
return -1;
}
if (TRUE == context->lpfnAcceptEx(pio->sock,
context->accept_socket,
context->lpOutputBuf,
0,
sizeof(struct sockaddr_in) + 16,
sizeof(struct sockaddr_in) + 16,
&context->bytes_received,
&pio->read_overlapped))
{
//we are already connected. Set event so subsequent select will catch
SetEvent(pio->read_overlapped.hEvent);
}
else {
//if overlapped io is in progress, we are good
if (WSAGetLastError() != ERROR_IO_PENDING) {
errno = getWSAErrno();
return -1;
}
}
pio->read_details.pending = TRUE;
return 0;
}
else //io is already pending
return 0;
} }
else { //type == SOCK_FD else if (rd) {
return -1; if (socketio_WSARecv(pio, NULL) != 0)
return -1;
return 0;
}
else {
//nothing to start for write
return 0;
} }
} }

View File

@ -70,9 +70,9 @@ BOOL w32_io_is_blocking(struct w32_io* pio)
return (pio->fd_status_flags & O_NONBLOCK) ? FALSE : TRUE; return (pio->fd_status_flags & O_NONBLOCK) ? FALSE : TRUE;
} }
BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd) { BOOL w32_io_is_io_available(struct w32_io* pio, BOOL rd) {
if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) { if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) {
return socketio_is_ioready(pio, rd); return socketio_is_io_available(pio, rd);
} }
else { else {
//return fileio_is_ready(pio); //return fileio_is_ready(pio);
@ -81,13 +81,13 @@ BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd) {
} }
int w32_io_start_asyncio(struct w32_io* pio, BOOL rd) int w32_io_on_select(struct w32_io* pio, BOOL rd)
{ {
if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) { if ((pio->type == LISTEN_FD) || (pio->type == SOCK_FD)) {
return socketio_start_asyncio(pio, rd); return socketio_on_select(pio, rd);
} }
else { else {
//return fileio_is_ready(pio); //return fileio_start_io(pio);
return -1; return -1;
} }
@ -214,30 +214,36 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
return -1; return -1;
} }
if (!readfds && !writefds && !exceptfds) {
errno = EPERM;
return -1;
}
//see if any io is ready //see if any io is ready
for (int i = 0; i <= fds; i++) { for (int i = 0; i <= fds; i++) {
if FD_ISSET(i, readfds) { if (readfds && FD_ISSET(i, readfds)) {
if (fd_table.w32_ios[i] == NULL) { if (fd_table.w32_ios[i] == NULL) {
errno = EPERM; errno = EPERM;
return -1; return -1;
} }
in_ready_fds++; in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], TRUE)) { if (w32_io_is_io_available(fd_table.w32_ios[i], TRUE)) {
FD_SET(i, &read_ready_fds); FD_SET(i, &read_ready_fds);
out_ready_fds++; out_ready_fds++;
} }
} }
if FD_ISSET(i, writefds) { if (writefds && FD_ISSET(i, writefds)) {
if (fd_table.w32_ios[i] == NULL) { if (fd_table.w32_ios[i] == NULL) {
errno = EPERM; errno = EPERM;
return -1; return -1;
} }
in_ready_fds++; in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], FALSE)) { if (w32_io_is_io_available(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds); FD_SET(i, &write_ready_fds);
out_ready_fds++; out_ready_fds++;
} }
@ -254,94 +260,68 @@ int w32_select(int fds, fd_set* readfds, fd_set* writefds, fd_set* exceptfds, co
//if some fds are already ready, return //if some fds are already ready, return
if (out_ready_fds) if (out_ready_fds)
{ {
*readfds = read_ready_fds; if (readfds)
*writefds = write_ready_fds; *readfds = read_ready_fds;
if (writefds)
*writefds = write_ready_fds;
return out_ready_fds; return out_ready_fds;
} }
//start async io on selected fds //start async io on selected fds
for (int i = 0; i <= fds; i++) { for (int i = 0; i <= fds; i++) {
if FD_ISSET(i, readfds) { if (readfds && FD_ISSET(i, readfds)) {
if (w32_io_start_asyncio(fd_table.w32_ios[i], TRUE) == -1) if (w32_io_on_select(fd_table.w32_ios[i], TRUE) == -1)
return -1; return -1;
if (fd_table.w32_ios[i]->type == LISTEN_FD) { if (fd_table.w32_ios[i]->type == LISTEN_FD) {
events[num_events++] = fd_table.w32_ios[i]->read_overlapped.hEvent; events[num_events++] = fd_table.w32_ios[i]->read_overlapped.hEvent;
} }
} }
if FD_ISSET(i, writefds) { if (writefds && FD_ISSET(i, writefds)) {
if (w32_io_start_asyncio(fd_table.w32_ios[i], FALSE) == -1) if (w32_io_on_select(fd_table.w32_ios[i], FALSE) == -1)
return -1; return -1;
} }
} }
//wait for any io to complete do {
if (num_events) //to-do cut down wait time on subsequent waits
{ if (0 != wait_for_any_event(events, num_events, ((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000))) {
DWORD ret = WaitForMultipleObjectsEx(num_events, events, FALSE, ((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000), TRUE);
if ((ret >= WAIT_OBJECT_0) && (ret <= WAIT_OBJECT_0 + num_events - 1)) {
//woken up by event signalled
}
else if (ret == WAIT_IO_COMPLETION) {
//woken up by APC from IO completion
}
else if (ret == WAIT_TIMEOUT) {
errno = ETIMEDOUT;
return -1; return -1;
} }
else { //some other error
errno = EOTHER;
return -1;
}
}
else
{
DWORD ret = SleepEx(((timeout->tv_sec) * 1000) + ((timeout->tv_usec) / 1000), TRUE);
if (ret == WAIT_IO_COMPLETION) {
//worken up by APC from IO completion
}
else if (ret == 0) {
//timed out
errno = ETIMEDOUT;
return -1;
}
else { //some other error
errno = EOTHER;
return -1;
}
}
//check on fd status //check on fd status
out_ready_fds = 0; out_ready_fds = 0;
for (int i = 0; i <= fds; i++) { for (int i = 0; i <= fds; i++) {
if FD_ISSET(i, readfds) { if (readfds && FD_ISSET(i, readfds)) {
in_ready_fds++; in_ready_fds++;
if (w32_io_is_ioready(fd_table.w32_ios[i], TRUE)) { if (w32_io_is_io_available(fd_table.w32_ios[i], TRUE)) {
FD_SET(i, &read_ready_fds); FD_SET(i, &read_ready_fds);
out_ready_fds++; out_ready_fds++;
}
}
if (writefds && FD_ISSET(i, writefds)) {
in_ready_fds++;
if (w32_io_is_io_available(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds);
out_ready_fds++;
}
} }
} }
if FD_ISSET(i, writefds) { if (out_ready_fds)
in_ready_fds++; break;
if (w32_io_is_ioready(fd_table.w32_ios[i], FALSE)) {
FD_SET(i, &write_ready_fds);
out_ready_fds++;
}
}
} } while (1);
if (out_ready_fds) if (readfds)
{
*readfds = read_ready_fds; *readfds = read_ready_fds;
if (writefds)
*writefds = write_ready_fds; *writefds = write_ready_fds;
return out_ready_fds;
}
errno = EOTHER; return out_ready_fds;
return -1;
} }

View File

@ -51,12 +51,16 @@ struct w32_io {
}; };
BOOL w32_io_is_blocking(struct w32_io*); BOOL w32_io_is_blocking(struct w32_io*);
BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd); BOOL w32_io_is_io_available(struct w32_io* pio, BOOL rd);
//signal
int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds);
//socket io
int socketio_initialize(); int socketio_initialize();
int socketio_done(); int socketio_done();
BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd); BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd);
int socketio_start_asyncio(struct w32_io* pio, BOOL rd); int socketio_on_select(struct w32_io* pio, BOOL rd);
struct w32_io* socketio_socket(int domain, int type, int protocol); struct w32_io* socketio_socket(int domain, int type, int protocol);
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen); struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen);
int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen); int socketio_setsockopt(struct w32_io* pio, int level, int optname, const char* optval, int optlen);

View File

@ -74,6 +74,7 @@
<Text Include="ReadMe.txt" /> <Text Include="ReadMe.txt" />
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClCompile Include="signal.c" />
<ClCompile Include="socketio.c" /> <ClCompile Include="socketio.c" />
<ClCompile Include="w32fd.c" /> <ClCompile Include="w32fd.c" />
</ItemGroup> </ItemGroup>

View File

@ -24,6 +24,9 @@
<ClCompile Include="w32fd.c"> <ClCompile Include="w32fd.c">
<Filter>Source Files</Filter> <Filter>Source Files</Filter>
</ClCompile> </ClCompile>
<ClCompile Include="signal.c">
<Filter>Source Files</Filter>
</ClCompile>
</ItemGroup> </ItemGroup>
<ItemGroup> <ItemGroup>
<ClInclude Include="w32fd.h"> <ClInclude Include="w32fd.h">