Various fixes around _on_select for each io to record error (instead of returning it on select)

This commit is contained in:
manojampalam 2016-03-18 22:34:32 -07:00
parent 809d39871a
commit 8601d0ccd6
5 changed files with 130 additions and 106 deletions

View File

@ -594,19 +594,28 @@ fileio_fdopen(struct w32_io* pio, const char *mode) {
return _fdopen(fd, mode);
}
int
void
fileio_on_select(struct w32_io* pio, BOOL rd) {
if (!rd)
return 0;
return;
if (!pio->read_details.pending && !fileio_is_io_available(pio, rd))
if (FILETYPE(pio) == FILE_TYPE_CHAR)
return termio_initiate_read(pio);
else
return fileio_ReadFileEx(pio);
return 0;
/* initiate read, record any error so read() will pick up */
if (FILETYPE(pio) == FILE_TYPE_CHAR) {
if (termio_initiate_read(pio) != 0) {
pio->read_details.error = errno;
errno = 0;
return;
}
}
else {
if (fileio_ReadFileEx(pio) != 0) {
pio->read_details.error = errno;
errno = 0;
return;
}
}
}

View File

@ -98,60 +98,8 @@ socketio_acceptEx(struct w32_io* pio) {
struct acceptEx_context *context;
debug3("acceptEx - io:%p", pio);
if (pio->internal.context == NULL) {
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD dwBytes;
context = (struct acceptEx_context *)pio->internal.context;
context =
(struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) {
errno = ENOMEM;
debug("acceptEx - ERROR:%d, io:%p", errno, pio);
return -1;
}
memset(context, 0, sizeof(struct acceptEx_context));
if (SOCKET_ERROR == WSAIoctl(pio->sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof(GuidAcceptEx),
&context->lpfnAcceptEx, sizeof(context->lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
free(context);
errno = errno_from_WSALastError();
debug("acceptEx - Ioctl1 ERROR:%d, io:%p", errno, pio);
return -1;
}
if (SOCKET_ERROR == WSAIoctl(pio->sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockaddrs, sizeof(GuidGetAcceptExSockaddrs),
&context->lpfnGuidGetAcceptExSockaddrs, sizeof(context->lpfnGuidGetAcceptExSockaddrs),
&dwBytes, NULL, NULL))
{
free(context);
errno = errno_from_WSALastError();
debug("acceptEx - Ioctl2 ERROR:%d, io:%p", errno, pio);
return -1;
}
context->accept_socket = INVALID_SOCKET;
pio->internal.context = context;
}
else
context = (struct acceptEx_context *)pio->internal.context;
/* init overlapped event */
if (pio->read_overlapped.hEvent == NULL) {
pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if ((pio->read_overlapped.hEvent) == NULL) {
errno = ENOMEM;
debug("acceptEx - CreateEvent() ERROR:%d, io:%p", errno, pio);
return -1;
}
}
ResetEvent(pio->read_overlapped.hEvent);
/* create accepting socket */
@ -257,10 +205,9 @@ socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
pio->read_details.pending = TRUE;
}
else {
/* io has completed due to error, recv() will pick it up */
debug("WSARecv - WSARecv() ERROR:%d io:%p", WSAGetLastError(), pio);
pio->read_details.error = WSAGetLastError();
return 0;
errno = errno_from_WSALastError();
debug("WSARecv - WSARecv() ERROR: io:%p %d", pio, errno);
return -1;
}
}
@ -334,11 +281,63 @@ socketio_getpeername(struct w32_io* pio, struct sockaddr* name, int* namelen) {
/* implements listen() */
int
socketio_listen(struct w32_io* pio, int backlog) {
struct acceptEx_context* context;
if (SOCKET_ERROR == listen(pio->sock, backlog)) {
errno = errno_from_WSALastError();
debug("listen - listen() ERROR:%d io:%p", errno, pio);
return -1;
}
/* prep for accept*/
{
GUID GuidAcceptEx = WSAID_ACCEPTEX;
GUID GuidGetAcceptExSockaddrs = WSAID_GETACCEPTEXSOCKADDRS;
DWORD dwBytes;
context = (struct acceptEx_context*)malloc(sizeof(struct acceptEx_context));
if (context == NULL) {
errno = ENOMEM;
debug("listen - ERROR:%d, io:%p", errno, pio);
return -1;
}
memset(context, 0, sizeof(struct acceptEx_context));
if (SOCKET_ERROR == WSAIoctl(pio->sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidAcceptEx, sizeof(GuidAcceptEx),
&context->lpfnAcceptEx, sizeof(context->lpfnAcceptEx),
&dwBytes, NULL, NULL))
{
free(context);
errno = errno_from_WSALastError();
debug("listen - Ioctl1 ERROR:%d, io:%p", errno, pio);
return -1;
}
if (SOCKET_ERROR == WSAIoctl(pio->sock,
SIO_GET_EXTENSION_FUNCTION_POINTER,
&GuidGetAcceptExSockaddrs, sizeof(GuidGetAcceptExSockaddrs),
&context->lpfnGuidGetAcceptExSockaddrs, sizeof(context->lpfnGuidGetAcceptExSockaddrs),
&dwBytes, NULL, NULL))
{
free(context);
errno = errno_from_WSALastError();
debug("listen - Ioctl2 ERROR:%d, io:%p", errno, pio);
return -1;
}
pio->read_overlapped.hEvent = CreateEvent(NULL, TRUE, FALSE, NULL);
if ((pio->read_overlapped.hEvent) == NULL) {
free(context);
errno = ENOMEM;
debug("listen - CreateEvent() ERROR:%d, io:%p", errno, pio);
return -1;
}
context->accept_socket = INVALID_SOCKET;
pio->internal.context = context;
}
pio->internal.state = SOCK_LISTENING;
return 0;
}
@ -694,13 +693,9 @@ socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen) {
if (w32_io_is_blocking(pio)) {
/* block until accept io is complete */
while (FALSE == socketio_is_io_available(pio, TRUE))
{
if (-1 == wait_for_any_event(&pio->read_overlapped.hEvent,
1, INFINITE))
{
if (-1 == wait_for_any_event(&pio->read_overlapped.hEvent,
1, INFINITE))
return NULL;
}
}
}
else {
/* if i/o is not ready */
@ -911,19 +906,29 @@ socketio_is_io_available(struct w32_io* pio, BOOL rd) {
BOOL pending =
sock_listening ? pio->read_details.pending : pio->write_details.pending;
if (pending && WSAGetOverlappedResult(pio->sock, overlapped,
&numBytes, FALSE, &flags))
return TRUE;
else {
if (pending && WSAGetLastError() != WSA_IO_INCOMPLETE) {
if (sock_listening)
pio->read_details.error = WSAGetLastError();
else
pio->write_details.error = WSAGetLastError();
return TRUE;
if (pending)
/* if there is an error to be picked up */
if (sock_listening) {
if (pio->read_details.error)
return TRUE;
}
return FALSE;
}
else {
if (pio->write_details.error)
return TRUE;
}
if (WSAGetOverlappedResult(pio->sock, overlapped,
&numBytes, FALSE, &flags))
return TRUE;
else if (WSAGetLastError() != WSA_IO_INCOMPLETE) {
if (sock_listening)
pio->read_details.error = WSAGetLastError();
else
pio->write_details.error = WSAGetLastError();
return TRUE;
}
return FALSE;
}
else if (rd) {
if (pio->read_details.remaining || pio->read_details.error)
@ -937,7 +942,7 @@ socketio_is_io_available(struct w32_io* pio, BOOL rd) {
}
/*start async io (if needed) for accept and recv*/
int
void
socketio_on_select(struct w32_io* pio, BOOL rd) {
enum w32_io_sock_state sock_state = pio->internal.state;
@ -946,20 +951,29 @@ socketio_on_select(struct w32_io* pio, BOOL rd) {
//nothing to do for writes (that includes connect)
if (!rd)
return 0;
return;
//listening socket - acceptEx if needed
if (sock_state == SOCK_LISTENING) {
if ((!pio->read_details.pending) && (socketio_acceptEx(pio) != 0))
return -1;
if (pio->read_details.pending == FALSE)
if (socketio_acceptEx(pio) != 0) {
/* set error, accept will pick it*/
pio->read_details.error = errno;
errno = 0;
pio->read_details.pending = TRUE;
SetEvent(pio->read_overlapped.hEvent);
return;
}
}
else {
//connected socket - WSARecv if needed
if ((!pio->read_details.pending)
&& (!socketio_is_io_available(pio, rd))
&& (socketio_WSARecv(pio, NULL) != 0))
return -1;
if ((!pio->read_details.pending) && (!socketio_is_io_available(pio, rd)))
if (socketio_WSARecv(pio, NULL) != 0) {
/* set error, recv() will pick it */
pio->read_details.error = errno;
errno = 0;
return;
}
}
return 0;
}

View File

@ -41,12 +41,13 @@ static DWORD WINAPI ReadThread(
if (0 == QueueUserAPC(ReadAPCProc, main_thread, (ULONG_PTR)pio)) {
debug("TermRead thread - ERROR QueueUserAPC failed %d, io:%p", GetLastError(), pio);
pio->read_details.pending = FALSE;
pio->read_details.error = GetLastError();
DebugBreak();
}
return 0;
}
/* TODO - make this a void func*/
int
termio_initiate_read(struct w32_io* pio) {
HANDLE read_thread;
@ -74,7 +75,6 @@ termio_initiate_read(struct w32_io* pio) {
return 0;
}
/* TODO - make this a void func*/
static VOID CALLBACK WriteAPCProc(
_In_ ULONG_PTR dwParam
) {
@ -99,11 +99,13 @@ static DWORD WINAPI WriteThread(
if (!WriteFile(WINHANDLE(pio), pio->write_details.buf, write_status.to_transfer,
&write_status.transferred, NULL)) {
write_status.error = GetLastError();
debug("TermWrite thread - ReadFile failed %d, io:%p", GetLastError(), pio);
debug("TermWrite thread - WriteFile failed %d, io:%p", GetLastError(), pio);
}
if (0 == QueueUserAPC(WriteAPCProc, main_thread, (ULONG_PTR)pio)) {
debug("TermWrite thread - ERROR QueueUserAPC failed %d, io:%p", GetLastError(), pio);
pio->write_details.pending = FALSE;
pio->write_details.error = GetLastError();
DebugBreak();
}
return 0;
@ -124,6 +126,7 @@ termio_initiate_write(struct w32_io* pio, DWORD num_bytes) {
}
pio->write_overlapped.hEvent = write_thread;
pio->write_details.pending = TRUE;
return 0;
}

View File

@ -151,13 +151,13 @@ w32_io_is_io_available(struct w32_io* pio, BOOL rd) {
return fileio_is_io_available(pio, rd);
}
int
void
w32_io_on_select(struct w32_io* pio, BOOL rd)
{
if ((pio->type == SOCK_FD))
return socketio_on_select(pio, rd);
return fileio_on_select(pio, rd);
socketio_on_select(pio, rd);
else
fileio_on_select(pio, rd);
}
#define CHECK_FD(fd) do { \
@ -568,8 +568,7 @@ w32_select(int fds, w32_fd_set* readfds, w32_fd_set* writefds, w32_fd_set* excep
for (int i = 0; i < fds; i++) {
if (readfds && FD_ISSET(i, readfds)) {
if (w32_io_on_select(fd_table.w32_ios[i], TRUE) == -1)
return -1;
w32_io_on_select(fd_table.w32_ios[i], TRUE);
if ((fd_table.w32_ios[i]->type == SOCK_FD)
&& (fd_table.w32_ios[i]->internal.state == SOCK_LISTENING)) {
if (num_events == SELECT_EVENT_LIMIT) {
@ -582,8 +581,7 @@ w32_select(int fds, w32_fd_set* readfds, w32_fd_set* writefds, w32_fd_set* excep
}
if (writefds && FD_ISSET(i, writefds)) {
if (w32_io_on_select(fd_table.w32_ios[i], FALSE) == -1)
return -1;
w32_io_on_select(fd_table.w32_ios[i], FALSE);
if ((fd_table.w32_ios[i]->type == SOCK_FD)
&& (fd_table.w32_ios[i]->internal.state == SOCK_CONNECTING)) {
if (num_events == SELECT_EVENT_LIMIT) {
@ -622,7 +620,7 @@ w32_select(int fds, w32_fd_set* readfds, w32_fd_set* writefds, w32_fd_set* excep
/* timeout specified and both fields are 0 - polling mode*/
/* proceed with further wait if not in polling mode*/
if ((timeout == NULL) || (timeout_ms != 0))
/* wait for io if none is already ready */
/* wait for io until any is ready */
while (out_ready_fds == 0) {
ticks_spent = GetTickCount64() - ticks_start;
time_rem = 0;

View File

@ -87,7 +87,7 @@ int wait_for_any_event(HANDLE* events, int num_events, DWORD milli_seconds);
int socketio_initialize();
int socketio_done();
BOOL socketio_is_io_available(struct w32_io* pio, BOOL rd);
int socketio_on_select(struct w32_io* pio, BOOL rd);
void socketio_on_select(struct w32_io* pio, BOOL rd);
struct w32_io* socketio_socket(int domain, int type, int protocol);
struct w32_io* socketio_accept(struct w32_io* pio, struct sockaddr* addr, int* addrlen);
int socketio_setsockopt(struct w32_io* pio, int level, int optname,
@ -107,7 +107,7 @@ int socketio_close(struct w32_io* pio);
/*POSIX mimic'ing file API*/
BOOL fileio_is_io_available(struct w32_io* pio, BOOL rd);
int fileio_on_select(struct w32_io* pio, BOOL rd);
void fileio_on_select(struct w32_io* pio, BOOL rd);
int fileio_close(struct w32_io* pio);
int fileio_pipe(struct w32_io* pio[2]);
struct w32_io* fileio_open(const char *pathname, int flags, int mode);