Fix in async Recv and other updates

This commit is contained in:
Manoj Ampalam 2016-03-18 14:37:40 -07:00
parent 03f9475b85
commit 809d39871a
4 changed files with 67 additions and 24 deletions

View File

@ -20,7 +20,13 @@ char recvbuf[DEFAULT_BUFLEN];
char sendbuf[DEFAULT_BUFLEN]; char sendbuf[DEFAULT_BUFLEN];
bool keep_going = true; bool keep_going = true;
__int64 rec_bytes = 0, sent_bytes = 0; __int64 rec_bytes = 0, sent_bytes = 0;
bool server = true; enum _mode {
server,
client,
child
};
enum _mode mode = server;
bool rec_live = true, send_live = true;
void prep_send_buf() void prep_send_buf()
@ -40,6 +46,7 @@ DWORD WINAPI RecvThread(
rec = recv(ConnectSocket, recvbuf, DEFAULT_BUFLEN, 0); rec = recv(ConnectSocket, recvbuf, DEFAULT_BUFLEN, 0);
rec_bytes += rec; rec_bytes += rec;
} }
rec_live = false;
return 0; return 0;
} }
@ -52,6 +59,7 @@ DWORD WINAPI SendThread(
rec = send(ConnectSocket, sendbuf + rnd, DEFAULT_BUFLEN - rnd, 0); rec = send(ConnectSocket, sendbuf + rnd, DEFAULT_BUFLEN - rnd, 0);
sent_bytes += rec; sent_bytes += rec;
} }
send_live = false;
return 0; return 0;
} }
@ -64,7 +72,7 @@ int __cdecl main(int argc, char **argv)
hints; hints;
int iResult; int iResult;
// Validate the parameters // Validate the parameters
if ((argc < 2) || (strlen(argv[1]) > 1)) { if ((argc < 2) || (strlen(argv[1]) > 1)) {
@ -73,7 +81,9 @@ int __cdecl main(int argc, char **argv)
} }
if (argv[1][0] == 'c') if (argv[1][0] == 'c')
server = false; mode = client;
else if (argv[1][0] == 'o')
mode = child;
// Initialize Winsock // Initialize Winsock
iResult = WSAStartup(MAKEWORD(2, 2), &wsaData); iResult = WSAStartup(MAKEWORD(2, 2), &wsaData);
@ -87,15 +97,19 @@ int __cdecl main(int argc, char **argv)
hints.ai_socktype = SOCK_STREAM; hints.ai_socktype = SOCK_STREAM;
hints.ai_protocol = IPPROTO_TCP; hints.ai_protocol = IPPROTO_TCP;
prep_send_buf();
// Resolve the server address and port // Resolve the server address and port
iResult = getaddrinfo(argv[2], argv[3], &hints, &result); if (mode != child) {
if (iResult != 0) { iResult = getaddrinfo(argv[2], argv[3], &hints, &result);
printf("getaddrinfo failed with error: %d\n", iResult); if (iResult != 0) {
WSACleanup(); printf("getaddrinfo failed with error: %d\n", iResult);
return 1; WSACleanup();
return 1;
}
} }
if (!server) { if (mode == client) {
// Attempt to connect to an address until one succeeds // Attempt to connect to an address until one succeeds
for (ptr = result; ptr != NULL; ptr = ptr->ai_next) { for (ptr = result; ptr != NULL; ptr = ptr->ai_next) {
@ -124,7 +138,11 @@ int __cdecl main(int argc, char **argv)
return 1; return 1;
} }
} }
else if (mode == child) {
ConnectSocket = atoi(argv[2]);
}
else { else {
SOCKET ListenSocket; SOCKET ListenSocket;
// Create a SOCKET for connecting to server // Create a SOCKET for connecting to server
ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); ListenSocket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
@ -154,17 +172,34 @@ int __cdecl main(int argc, char **argv)
} }
// Accept a client socket // Accept a client socket
ConnectSocket = accept(ListenSocket, NULL, NULL); while (1) {
if (ConnectSocket == INVALID_SOCKET) { ConnectSocket = accept(ListenSocket, NULL, NULL);
printf("accept failed with error: %d\n", WSAGetLastError()); if (ConnectSocket == INVALID_SOCKET) {
closesocket(ListenSocket); printf("accept failed with error: %d\n", WSAGetLastError());
WSACleanup(); closesocket(ListenSocket);
return 1; WSACleanup();
return 1;
}
if (!SetHandleInformation((HANDLE)ConnectSocket, HANDLE_FLAG_INHERIT, HANDLE_FLAG_INHERIT))
printf("unable to set inheritance on socket handle: %p\n", ConnectSocket);
int sock_int = ConnectSocket & 0xffffffff;
char cmd[MAX_PATH];
STARTUPINFOA si;
PROCESS_INFORMATION pi;
ZeroMemory(&si, sizeof(si)); ZeroMemory(&pi, sizeof(pi));
si.hStdInput = GetStdHandle(STD_INPUT_HANDLE);
si.hStdOutput = GetStdHandle(STD_OUTPUT_HANDLE);
si.hStdError = GetStdHandle(STD_ERROR_HANDLE);
sprintf(cmd, "%s o %d", argv[0], sock_int);
//spawn a child
if (!CreateProcessA(NULL, cmd, NULL, NULL, TRUE, CREATE_NEW_CONSOLE, NULL, NULL, &si, &pi))
printf("child creation failed: %d\n", GetLastError());
closesocket(ConnectSocket);
} }
// No longer need server socket // No longer need server socket
closesocket(ListenSocket); closesocket(ListenSocket);
} }
freeaddrinfo(result); freeaddrinfo(result);
@ -201,10 +236,10 @@ int __cdecl main(int argc, char **argv)
return 1; return 1;
} }
printf("\t Recv(Kb/s) \t\t Sent(Kb/s)\n"); printf("\t %-20s %-20s \n", "Recv(Kbps)", "Send(Kbps)");
__int64 last_recv = 0; __int64 last_recv = 0;
__int64 last_send = 0; __int64 last_send = 0;
while (1) { while (rec_live || send_live) {
if (WAIT_OBJECT_0 != WaitForSingleObject(timer, INFINITE)) { if (WAIT_OBJECT_0 != WaitForSingleObject(timer, INFINITE)) {
printf("wait failed %d\n", GetLastError()); printf("wait failed %d\n", GetLastError());
break; break;
@ -212,12 +247,14 @@ int __cdecl main(int argc, char **argv)
__int64 now_recv = rec_bytes; __int64 now_recv = rec_bytes;
__int64 now_send = sent_bytes; __int64 now_send = sent_bytes;
printf("\r\t %lld \t\t %lld", (now_recv - last_recv) / 2048, (now_send - last_send) / 2048); printf("\r\t %-20lld %-20lld", (now_recv - last_recv) / (2*1048), (now_send - last_send) / (2*1048));
last_recv = now_recv; last_recv = now_recv;
last_send = now_send; last_send = now_send;
} }
printf("\n\n");
closesocket(ConnectSocket); closesocket(ConnectSocket);
WSACleanup(); WSACleanup();

View File

@ -299,6 +299,7 @@ VOID CALLBACK ReadCompletionRoutine(
} }
/* initiate an async read */ /* initiate an async read */
/* TODO: make this a void func, store error in context */
int int
fileio_ReadFileEx(struct w32_io* pio) { fileio_ReadFileEx(struct w32_io* pio) {
debug2("ReadFileEx io:%p", pio); debug2("ReadFileEx io:%p", pio);

View File

@ -92,6 +92,7 @@ struct acceptEx_context {
}; };
/* initiate async acceptEx*/ /* initiate async acceptEx*/
/* TODO - always return 0, set error in context, accept() will pick it up*/
int int
socketio_acceptEx(struct w32_io* pio) { socketio_acceptEx(struct w32_io* pio) {
struct acceptEx_context *context; struct acceptEx_context *context;
@ -207,6 +208,7 @@ CALLBACK WSARecvCompletionRoutine(
} }
/* initiates async receive operation*/ /* initiates async receive operation*/
/* TODO - always return 0, or make this a void func. any error should be put in context*/
int int
socketio_WSARecv(struct w32_io* pio, BOOL* completed) { socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
int ret = 0; int ret = 0;
@ -255,9 +257,10 @@ socketio_WSARecv(struct w32_io* pio, BOOL* completed) {
pio->read_details.pending = TRUE; pio->read_details.pending = TRUE;
} }
else { else {
errno = errno_from_WSALastError(); /* io has completed due to error, recv() will pick it up */
debug("WSARecv - WSARecv() ERROR: io:%p %d", pio, errno); debug("WSARecv - WSARecv() ERROR:%d io:%p", WSAGetLastError(), pio);
return -1; pio->read_details.error = WSAGetLastError();
return 0;
} }
} }
@ -407,8 +410,8 @@ socketio_recv(struct w32_io* pio, void *buf, size_t len, int flags) {
} }
else { else {
errno = errno_from_WSAError(pio->read_details.error); errno = errno_from_WSAError(pio->read_details.error);
pio->read_details.error = 0;
debug("recv - from CB ERROR:%d, io:%p", pio->read_details.error, pio); debug("recv - from CB ERROR:%d, io:%p", pio->read_details.error, pio);
pio->read_details.error = 0;
return -1; return -1;
} }
} }

View File

@ -46,6 +46,7 @@ static DWORD WINAPI ReadThread(
return 0; return 0;
} }
/* TODO - make this a void func*/
int int
termio_initiate_read(struct w32_io* pio) { termio_initiate_read(struct w32_io* pio) {
HANDLE read_thread; HANDLE read_thread;
@ -73,6 +74,7 @@ termio_initiate_read(struct w32_io* pio) {
return 0; return 0;
} }
/* TODO - make this a void func*/
static VOID CALLBACK WriteAPCProc( static VOID CALLBACK WriteAPCProc(
_In_ ULONG_PTR dwParam _In_ ULONG_PTR dwParam
) { ) {