This commit is contained in:
Manoj Ampalam 2016-01-08 15:09:53 -08:00
parent c2da6ce9a4
commit fa9331e2d5
2 changed files with 87 additions and 67 deletions

View File

@ -5,6 +5,7 @@
#include "w32fd.h" #include "w32fd.h"
#include <stddef.h> #include <stddef.h>
#define INTERNAL_BUFFER_SIZE 100*1024 //100KB
static int getWSAErrno() static int getWSAErrno()
{ {
@ -110,8 +111,9 @@ void CALLBACK WSARecvCompletionRoutine(
int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) { int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
int ret = 0; int ret = 0;
WSABUF* wsabuf; WSABUF wsabuf;
DWORD recv_flags = 0;
//if io is already pending //if io is already pending
if (pio->read_details.pending) if (pio->read_details.pending)
{ {
@ -120,49 +122,46 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
} }
//initialize recv buffers if needed //initialize recv buffers if needed
if (pio->read_overlapped.hEvent == NULL) wsabuf.len = INTERNAL_BUFFER_SIZE;
if (pio->read_details.buf == NULL)
{ {
wsabuf = malloc(sizeof(WSABUF)); wsabuf.buf = malloc(wsabuf.len);
if (wsabuf) {
wsabuf->len = 1024; if (!wsabuf.buf)
wsabuf->buf = malloc(wsabuf->len);
}
if (!wsabuf || !wsabuf->buf)
{ {
if (wsabuf)
free(wsabuf);
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
pio->read_overlapped.hEvent = (HANDLE)wsabuf; pio->read_details.buf = wsabuf.buf;
pio->read_details.buf_size = wsabuf.len;
} }
else else
wsabuf = (WSABUF*)pio->read_overlapped.hEvent; wsabuf.buf = pio->read_details.buf;
//if we have some buffer copy it and retun #bytes copied //if we have some buffer copy it and retun #bytes copied
if (pio->read_details.remaining) if (pio->read_details.remaining)
{ {
int num_bytes_copied = min(len, pio->read_details.remaining); int num_bytes_copied = min(len, pio->read_details.remaining);
memcpy(buf, pio->read_overlapped.hEvent, num_bytes_copied); memcpy(buf, pio->read_details.buf + pio->read_details.completed, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied; pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed += num_bytes_copied;
return num_bytes_copied; return num_bytes_copied;
} }
//TODO - implement flags if any needed for OpenSSH //TODO - implement flags if any needed for OpenSSH
ret = WSARecv(pio->sock, wsabuf, 1, NULL, 0, &pio->read_overlapped, &WSARecvCompletionRoutine); ret = WSARecv(pio->sock, &wsabuf, 1, NULL, &recv_flags, &pio->read_overlapped, &WSARecvCompletionRoutine);
if (ret == 0) if (ret == 0)
{ {
//receive has completed and APC is scheduled, let it run //receive has completed and APC is scheduled, let it run
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
SleepEx(1, TRUE); SleepEx(1, TRUE);
if ((pio->read_details.pending == FALSE) || (pio->read_details.remaining == 0)) { if (pio->read_details.pending) {
//unexpected internal error
errno = EOTHER; errno = EOTHER;
return -1; return -1;
} }
//we should have some bytes copied to internal buffer
} }
else { //(ret == SOCKET_ERROR) else { //(ret == SOCKET_ERROR)
if (WSAGetLastError() == WSA_IO_PENDING) if (WSAGetLastError() == WSA_IO_PENDING)
@ -187,20 +186,24 @@ int socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
} }
} }
//by this time we should have some bytes in internal buffer //by this time we should have some bytes in internal buffer or an error from callback
//if we have some buffer copy it and retun #bytes copied if (pio->read_details.error)
if (pio->read_details.remaining)
{ {
int num_bytes_copied = min(len, pio->read_details.remaining);
memcpy(buf, ((WSABUF*)pio->read_overlapped.hEvent)->buf, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied;
return num_bytes_copied;
}
else {
errno = EOTHER; errno = EOTHER;
return -1; return -1;
} }
if (pio->read_details.remaining) {
int num_bytes_copied = min(len, pio->read_details.remaining);
memcpy(buf, pio->read_details.buf, num_bytes_copied);
pio->read_details.remaining -= num_bytes_copied;
pio->read_details.completed = num_bytes_copied;
return num_bytes_copied;
}
else { //connection is closed
return 0;
}
} }
void CALLBACK WSASendCompletionRoutine( void CALLBACK WSASendCompletionRoutine(
@ -219,8 +222,8 @@ void CALLBACK WSASendCompletionRoutine(
int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) { int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
int ret = 0; int ret = 0;
WSABUF* wsabuf; WSABUF wsabuf;
//if io is already pending //if io is already pending
if (pio->write_details.pending) if (pio->write_details.pending)
{ {
@ -229,53 +232,49 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
} }
//initialize buffers if needed //initialize buffers if needed
if (pio->write_overlapped.hEvent == NULL) wsabuf.len = INTERNAL_BUFFER_SIZE;
if (pio->write_details.buf == NULL)
{ {
wsabuf = malloc(sizeof(WSABUF)); wsabuf.buf = malloc(wsabuf.len);
if (wsabuf) { if (!wsabuf.buf)
wsabuf->len = 1024;
wsabuf->buf = malloc(wsabuf->len);
}
if (!wsabuf || !wsabuf->buf)
{ {
if (wsabuf)
free(wsabuf);
errno = ENOMEM; errno = ENOMEM;
return -1; return -1;
} }
pio->write_overlapped.hEvent = (HANDLE)wsabuf; pio->write_details.buf = wsabuf.buf;
pio->write_details.buf_size = wsabuf.len;
} }
else { else {
wsabuf = (WSABUF*)pio->write_overlapped.hEvent; wsabuf.buf = pio->write_details.buf;
} }
wsabuf->len = min(1024, len); wsabuf.len = min(wsabuf.len, len);
memcpy(wsabuf->buf, buf, wsabuf->len); memcpy(wsabuf.buf, buf, wsabuf.len);
ret = WSASend(pio->sock, wsabuf, 1, NULL, 0, &pio->write_overlapped, &WSASendCompletionRoutine); //implement flags support if needed
ret = WSASend(pio->sock, &wsabuf, 1, NULL, 0, &pio->write_overlapped, &WSASendCompletionRoutine);
if (ret == 0) if (ret == 0)
{ {
//send has completed and APC is scheduled, let it run //send has completed and APC is scheduled, let it run
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf->len; pio->write_details.remaining = wsabuf.len;
SleepEx(1, TRUE); SleepEx(1, TRUE);
if ((pio->write_details.pending == FALSE) || (pio->write_details.remaining != 0)) { if ((pio->write_details.pending) || (pio->write_details.remaining != 0)) {
errno = EOTHER; errno = EOTHER;
return -1; return -1;
} }
//return num of bytes written //return num of bytes written
return wsabuf->len; return wsabuf.len;
} }
else { //(ret == SOCKET_ERROR) else { //(ret == SOCKET_ERROR)
if (WSAGetLastError() == WSA_IO_PENDING) if (WSAGetLastError() == WSA_IO_PENDING)
{ {
//io is initiated and pending //io is initiated and pending
pio->write_details.pending = TRUE; pio->write_details.pending = TRUE;
pio->write_details.remaining = wsabuf->len; pio->write_details.remaining = wsabuf.len;
if (w32_io_is_blocking(pio)) if (w32_io_is_blocking(pio))
{ {
//wait until io is done //wait until io is done
@ -283,7 +282,7 @@ int socketio_send(struct w32_io* pio, const void *buf, size_t len, int flags) {
SleepEx(INFINITE, TRUE); SleepEx(INFINITE, TRUE);
} }
return wsabuf->len; return wsabuf.len;
} }
else { //failed else { //failed
errno = getWSAErrno(); errno = getWSAErrno();
@ -300,6 +299,20 @@ int socketio_shutdown(struct w32_io* pio, int how) {
int socketio_close(struct w32_io* pio) { int socketio_close(struct w32_io* pio) {
closesocket(pio->sock); closesocket(pio->sock);
if (pio->type == LISTEN_FD) {
if (pio->read_overlapped.hEvent)
CloseHandle(pio->read_overlapped.hEvent);
if (pio->context)
free(pio->context);
}
else {
if (pio->read_details.buf)
free(pio->read_details.buf);
if (pio->write_details.buf)
free(pio->write_details.buf);
}
//todo- wait for pending io to abort //todo- wait for pending io to abort
free(pio); free(pio);
return 0; return 0;
@ -360,12 +373,13 @@ BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd) {
if (pio->type == LISTEN_FD) { if (pio->type == LISTEN_FD) {
DWORD numBytes = 0; DWORD numBytes = 0;
if (pio->read_details.pending && GetOverlappedResult(pio->read_overlapped.hEvent, &pio->read_overlapped, &numBytes, FALSE)) { DWORD flags;
if (pio->read_details.pending && WSAGetOverlappedResult(pio->sock, &pio->read_overlapped, &numBytes, FALSE, &flags)) {
return TRUE; return TRUE;
} }
else { else {
if (pio->read_details.pending && GetLastError() != ERROR_IO_INCOMPLETE) { if (pio->read_details.pending && WSAGetLastError() != WSA_IO_INCOMPLETE) {
//unexpected error; //unexpected error; log an event
} }
return FALSE; return FALSE;
} }

View File

@ -1,7 +1,7 @@
#include <Windows.h> #include <Windows.h>
enum w32_io_type { enum w32_io_type {
UNKOWN_FD, UNKOWN_FD = 0,
LISTEN_FD, LISTEN_FD,
SOCK_FD, SOCK_FD,
FILE_FD FILE_FD
@ -11,16 +11,26 @@ struct w32_io {
OVERLAPPED read_overlapped; OVERLAPPED read_overlapped;
OVERLAPPED write_overlapped; OVERLAPPED write_overlapped;
struct { struct {
DWORD error; //internal buffer details
DWORD remaining; char *buf;
DWORD completed; DWORD buf_size;
BOOL pending;
//async io details
DWORD error; //error reported on async read completion
DWORD remaining; //bytes in internal buffer remaining to be read by application
DWORD completed; //bytes in internal buffer already read by application
BOOL pending; //waiting on async io to complete
}read_details; }read_details;
struct { struct {
DWORD error; //internal buffer details
DWORD remaining; char* buf;
DWORD completed; DWORD buf_size;
BOOL pending;
//async io details
DWORD error; //error reported on async write completion
DWORD remaining; //bytes in internal buffer that are not yet successfully written on i/o
DWORD completed; //bytes in internal buffer that have been successfully written on i/o
BOOL pending; //waiting on async io to complete
}write_details; }write_details;
//-1 if not indexed //-1 if not indexed
@ -43,10 +53,6 @@ struct w32_io {
BOOL w32_io_is_blocking(struct w32_io*); BOOL w32_io_is_blocking(struct w32_io*);
BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd); BOOL w32_io_is_ioready(struct w32_io* pio, BOOL rd);
int fd_table_initialize();
int fd_table_add(struct w32_io*);
int fd_table_delete(struct w32_io*);
int socketio_initialize(); int socketio_initialize();
int socketio_done(); int socketio_done();
BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd); BOOL socketio_is_ioready(struct w32_io* pio, BOOL rd);