I want to read asynchronously from a child process's output (and write to its stdin), so I created an I/O completion port (IOCP) loop that is supposed to be notified when ReadFile() completes. But ReadFile() never completes: it always fails with ERROR_IO_PENDING, and WriteFile() always fails with ERROR_IO_PENDING as well.
Node.js can asynchronously read from a child process:
// Spawn gcc and print every chunk it writes to stdout or stderr as it arrives.
const { spawn } = require("child_process")
const proc = spawn("gcc")
for (const stream of [proc.stdout, proc.stderr]) {
  stream.on('data', chunk => console.log(String(chunk)))
}
How can I do this with the Win32 API?
#include <windows.h>
#include <stdio.h>
#include <assert.h>
#include <wchar.h>
#pragma comment (lib, "User32.lib")
/* Completion port handle shared by this file; starts out as INVALID_HANDLE_VALUE
 * and is assigned from CreateIoCompletionPort() in wmain before create_pipe()
 * associates pipe handles with it. */
static HANDLE iocp = INVALID_HANDLE_VALUE;
/*
 * Bookkeeping for one spawned child: the OVERLAPPED used for its pipe I/O,
 * both ends of each of the three standard-stream pipes, and a small transfer
 * buffer.
 */
typedef struct PROCESS {
    OVERLAPPED ol;        /* passed as lpOverlapped to ReadFile/WriteFile */
    HANDLE hProcess;      /* not assigned by the code in this file */
    HANDLE stdin_read;    /* stdin pipe: end given to the child */
    HANDLE stdin_write;   /* stdin pipe: end the parent writes to */
    HANDLE stdout_read;   /* stdout pipe: end the parent reads from */
    HANDLE stdout_write;  /* stdout pipe: end given to the child */
    HANDLE stderr_read;   /* stderr pipe: created by wmain, not otherwise used here */
    HANDLE stderr_write;  /* stderr pipe: end given to the child */
    char buf[100];        /* destination of the overlapped reads on stdout_read */
} PROCESS, *PPROCESS, *LPROCESS;
/* Forward declaration of the thread entry point defined further down; wmain
 * passes it to CreateThread with the completion port handle as its argument. */
DWORD WINAPI Worker(LPVOID param);
/*
 * Creates a uniquely named byte-mode pipe, opens both of its ends for
 * overlapped I/O and associates each end with the global completion port.
 *
 * pserver_pipe  receives the CreateNamedPipeW end (PIPE_ACCESS_OUTBOUND, i.e.
 *               the writing end).
 * pclient_pipe  receives the CreateFileW end (GENERIC_READ, inheritable).
 * p             mixed into the pipe name to make it unique and used as the
 *               completion key for both handles.
 *
 * Returns TRUE on success. On failure returns FALSE, closes every handle that
 * was opened so far and leaves both output parameters untouched.
 *
 * NOTE(review): the server end is created with NULL security attributes, which
 * per the CreateNamedPipe documentation means it cannot be inherited; callers
 * that hand the server end to a child process should confirm this is intended.
 */
BOOL create_pipe(HANDLE* pserver_pipe, HANDLE* pclient_pipe, PROCESS* p) {
    static __int64 counter = 0;
    HANDLE server_pipe = INVALID_HANDLE_VALUE;
    HANDLE client_pipe = INVALID_HANDLE_VALUE;
    WCHAR name[64];
    SECURITY_ATTRIBUTES sa;

    for (;;) {
        /* swprintf takes the buffer size in characters, not bytes;
         * %I64d is the MSVC conversion for __int64. */
        swprintf(name, sizeof(name) / sizeof(name[0]),
                 L"\\\\?\\pipe\\child\\%I64d.%p", counter, p);
        server_pipe = CreateNamedPipeW(
            name,
            PIPE_ACCESS_OUTBOUND | FILE_FLAG_OVERLAPPED | FILE_FLAG_FIRST_PIPE_INSTANCE | WRITE_DAC,
            PIPE_TYPE_BYTE | PIPE_READMODE_BYTE | PIPE_WAIT,
            1,
            65536,
            65536,
            0,
            NULL);
        if (server_pipe != INVALID_HANDLE_VALUE)
            break;

        DWORD err = GetLastError();
        if (err != ERROR_PIPE_BUSY && err != ERROR_ACCESS_DENIED) {
            return FALSE;
        }
        /* The name is already taken: advance the counter so the retry uses a
         * different name instead of looping forever on the same one. */
        counter++;
    }

    sa.nLength = sizeof sa;
    sa.lpSecurityDescriptor = NULL;
    sa.bInheritHandle = 1;
    client_pipe = CreateFileW(name,
                              GENERIC_READ | WRITE_DAC,
                              0,
                              &sa,
                              OPEN_EXISTING,
                              FILE_FLAG_OVERLAPPED,
                              NULL);
    if (client_pipe == INVALID_HANDLE_VALUE)
        goto fail;
    if (CreateIoCompletionPort(client_pipe, iocp, (ULONG_PTR)p, 0) == NULL)
        goto fail;
    if (CreateIoCompletionPort(server_pipe, iocp, (ULONG_PTR)p, 0) == NULL)
        goto fail;

    *pclient_pipe = client_pipe;
    *pserver_pipe = server_pipe;
    return TRUE;

fail:
    /* Do not leak the ends that were already opened. */
    if (client_pipe != INVALID_HANDLE_VALUE)
        CloseHandle(client_pipe);
    CloseHandle(server_pipe);
    return FALSE;
}
/*
 * Entry point of the question's sample: creates the completion port and the
 * three pipes, starts the worker thread, launches "cmd" with redirected
 * standard handles, posts the first overlapped read on the child's stdout and
 * then forwards console input to the child's stdin until console input ends.
 *
 * Every call with a side effect is made outside assert() so that the program
 * still performs it when NDEBUG is defined.
 *
 * NOTE(review): the read on stdout_read and every write on stdin_write use the
 * same OVERLAPPED (child_process.ol) while the read may still be pending, and
 * Worker treats every completion as a read completion. Giving the writes
 * their own OVERLAPPED requires a matching change in Worker.
 */
int wmain()
{
    BOOL ok;

    iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    assert(iocp);
    PROCESS child_process{};
    ok = create_pipe(&child_process.stdout_write, &child_process.stdout_read, &child_process);
    assert(ok);
    ok = create_pipe(&child_process.stderr_write, &child_process.stderr_read, &child_process);
    assert(ok);
    ok = create_pipe(&child_process.stdin_write, &child_process.stdin_read, &child_process);
    assert(ok);
    HANDLE hThread = CreateThread(NULL, 0, Worker, iocp, 0, NULL);
    assert(hThread);
    WCHAR szCmdline[]=L"cmd";
    PROCESS_INFORMATION piProcInfo{};
    STARTUPINFOW siStartInfo{.cb = sizeof(STARTUPINFO),
    .dwFlags=STARTF_USESTDHANDLES,
    .hStdInput=child_process.stdin_read,
    .hStdOutput=child_process.stdout_write,
    .hStdError=child_process.stderr_write};
    ok = CreateProcessW(NULL, szCmdline, NULL, NULL, TRUE, 0, NULL, NULL, &siStartInfo, &piProcInfo);
    assert(ok);
    (void)ok; /* only inspected by assert() */
    CloseHandle(piProcInfo.hProcess);
    CloseHandle(piProcInfo.hThread);
    // CloseHandle(child_process.stdout_write);
    // CloseHandle(child_process.stdin_read);

    /* GetLastError() is only meaningful when the call reported failure. A TRUE
     * return means the read completed at once; its completion packet is still
     * delivered through the port, so nothing needs to be reported here. */
    if (!ReadFile(child_process.stdout_read, child_process.buf, sizeof(child_process.buf), NULL, &child_process.ol)) {
        DWORD err = GetLastError();
        if (err != ERROR_IO_PENDING) {
            printf("Error in ReadFile %d\n", (int)err);
        } else {
            puts("ReadFile is pending...\n");
        }
    }

    char buf[100];
    DWORD dwIn = 0;
    for (;;) {
        /* Stop on a failed console read as well as on an empty one, so that
         * dwIn is never used without having been written. */
        if (!ReadConsoleA(GetStdHandle(STD_INPUT_HANDLE), buf, sizeof buf, &dwIn, NULL) || dwIn == 0)
            break;
        if (!WriteFile(child_process.stdin_write, buf, dwIn, NULL, &child_process.ol)) {
            DWORD err = GetLastError();
            if (err != ERROR_IO_PENDING) {
                printf("Error in WriteFile %d\n", (int)err);
            } else {
                puts("WriteFile is pending...\n");
            }
        }
    }

    /* A packet with a zero completion key tells Worker to leave its loop. */
    PostQueuedCompletionStatus(iocp, 0, 0, 0);
    WaitForSingleObject(hThread, INFINITE);
    CloseHandle(hThread);
    CloseHandle(iocp);
}
/*
 * Completion-port worker thread.
 *
 * param is the completion port handle. Each dequeued packet is treated as a
 * finished read into ctx->buf: the bytes are echoed to the console and a new
 * overlapped read is posted on ctx->stdout_read. The loop ends on a packet
 * with a NULL completion key, on a failed or zero-byte completion, or when the
 * follow-up read cannot be started.
 *
 * Always returns 0.
 */
DWORD WINAPI Worker(LPVOID param) {
    for (;;) {
        /* Start from known values: when the wait fails without dequeuing a
         * packet these outputs are not guaranteed to be written. */
        DWORD dwIoSize = 0;
        LPOVERLAPPED ol = NULL;
        PROCESS* ctx = NULL;

        BOOL bSuccess = GetQueuedCompletionStatus((HANDLE)param, &dwIoSize,
                                                  (PDWORD_PTR)&ctx,
                                                  &ol,
                                                  INFINITE);
        /* The thread's last-error value is stale after a successful call. */
        DWORD err = bSuccess ? ERROR_SUCCESS : GetLastError();

        if (ctx == NULL) {
            printf("ctx is NULL, maybe you call PostQueuedCompletionStatus? err=%d\n", (int)err);
            break;
        }
        if (bSuccess == FALSE || dwIoSize == 0) {
            printf("GetQueuedCompletionStatus does not success(maybe EOF reached?) err=%d\n", (int)err);
            break;
        }

        WriteConsoleA(GetStdHandle(STD_OUTPUT_HANDLE), ctx->buf, dwIoSize, NULL, NULL);

        /* If the read fails outright (anything other than ERROR_IO_PENDING) no
         * completion packet will ever arrive, so waiting again would hang. */
        if (!ReadFile(ctx->stdout_read, ctx->buf, sizeof(ctx->buf), NULL, &ctx->ol)
            && GetLastError() != ERROR_IO_PENDING) {
            printf("ReadFile failed, err=%d\n", (int)GetLastError());
            break;
        }
    }
    return 0;
}
CodePudding user response:
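After searching on GitHub, this turned out to be quite easy: create a named pipe whose server end the current process reads from, then open the same pipe by name with CreateFile to get the write end that is handed to the child process.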
Github links:
redirect stdout and stderr
#include <windows.h>
#include <assert.h>
#include <stdio.h>
/* Forward declaration so the Callback typedef below can name the struct. */
struct Stdio;
/* Handler type stored in struct Stdio; Worker calls it with the stream's own
 * struct and the byte count reported for the completed operation. */
typedef void (*Callback)(struct Stdio* self, DWORD len);
/* State for one redirected stream. Its address is used as the completion key
 * when the pipe is associated with the completion port. */
struct Stdio {
OVERLAPPED ol; /* passed as lpOverlapped to ReadFile on this stream's pipe */
HANDLE pipe; /* this process's end of the named pipe */
BYTE buf[100]; /* buffer the callbacks print from and read into */
Callback callback; /* invoked by Worker for each completion on this stream */
};
/* One Stdio record per standard stream of the child process. */
typedef struct CTX {
    struct Stdio Stdout;  /* child's stdout, read by this process */
    struct Stdio Stderr;  /* child's stderr, read by this process */
    struct Stdio Stdin;   /* declared for symmetry; not set up by this program's wmain */
} CTX, *LCTX, *LPCTX;
/* Console handles of this process; wmain fills them in from GetStdHandle and
 * the read callbacks write the child's output to them. */
HANDLE Hstdout=INVALID_HANDLE_VALUE;
HANDLE Hstderr=INVALID_HANDLE_VALUE;
/*
 * Completion-port worker thread.
 *
 * iocp is the completion port handle. Each dequeued packet's completion key is
 * a struct Stdio*, whose callback is invoked with the transferred byte count.
 * The loop ends when the wait fails, when a zero-byte completion arrives, or
 * when the packet carries no OVERLAPPED / no key (the shutdown packet posted
 * with PostQueuedCompletionStatus).
 *
 * Always returns 0.
 */
DWORD WINAPI Worker(LPVOID iocp){
    for (;;) {
        /* Known starting values: the outputs are not guaranteed to be written
         * when the wait fails without dequeuing a packet. */
        struct Stdio *stdio = NULL;
        OVERLAPPED *ol = NULL;
        DWORD dwIoSize = 0;

        BOOL ok = GetQueuedCompletionStatus(iocp, &dwIoSize, (PDWORD_PTR)&stdio, &ol, INFINITE);
        if (!ok || dwIoSize == 0 || ol == NULL || stdio == NULL) {
            /* Only a failed call sets a meaningful last-error value; after a
             * successful dequeue (e.g. the shutdown packet) it would be stale. */
            DWORD err = ok ? ERROR_SUCCESS : GetLastError();
            switch (err) {
            case ERROR_BROKEN_PIPE:
                puts("the process has been exited, exit thread...");
                break;
            default:
                printf("error = %d, exit thread\n", (int)err);
            }
            break;
        }
        stdio->callback(stdio, dwIoSize);
    }
    return 0;
}
/*
 * Callback for a completed read on the child's stdout pipe: writes the len
 * bytes now in self->buf to Hstdout, then posts the next overlapped read into
 * the same buffer.
 */
void OnStdoutRead(struct Stdio *self, DWORD len){
    BYTE *chunk = self->buf;
    HANDLE source = self->pipe;

    WriteConsoleA(Hstdout, chunk, len, NULL, NULL);
    /* Re-arm the read so the next chunk produces another completion. */
    ReadFile(source, chunk, sizeof(self->buf), NULL, &self->ol);
}
/*
 * Callback for a completed read on the child's stderr pipe: writes the len
 * bytes now in self->buf to Hstderr, then posts the next overlapped read into
 * the same buffer.
 */
void OnStderrRead(struct Stdio *self, DWORD len){
    BYTE *chunk = self->buf;
    HANDLE source = self->pipe;

    WriteConsoleA(Hstderr, chunk, len, NULL, NULL);
    /* Re-arm the read so the next chunk produces another completion. */
    ReadFile(source, chunk, sizeof(self->buf), NULL, &self->ol);
}
/*
 * Launches "powershell" with its stdout and stderr redirected into two named
 * pipes and echoes whatever it writes, using one worker thread on an I/O
 * completion port. The child keeps using this process's stdin.
 *
 * Every call with a side effect is made outside assert() so that the program
 * still performs it when NDEBUG is defined.
 *
 * NOTE(review): the pipe names are fixed strings and CreateNamedPipeW is asked
 * for a single instance, so a second copy of this program running at the same
 * time will presumably fail to create its pipes.
 */
int wmain(){
    BOOL ok;
    HANDLE port;

    /* GetStdHandle reports failure as INVALID_HANDLE_VALUE and a missing
     * standard handle as NULL; neither is usable. */
    Hstdout = GetStdHandle(STD_OUTPUT_HANDLE);
    assert(Hstdout != NULL && Hstdout != INVALID_HANDLE_VALUE);
    Hstderr = GetStdHandle(STD_ERROR_HANDLE);
    assert(Hstderr != NULL && Hstderr != INVALID_HANDLE_VALUE);

    HANDLE Pstdout, Pstderr; // child process's
    CTX ctx{.Stdout=Stdio{.callback=OnStdoutRead}, .Stderr=Stdio{.callback=OnStderrRead}};
    HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    HANDLE hThread = CreateThread(NULL, 0, Worker, iocp, 0, NULL); // Worker thread
    SECURITY_ATTRIBUTES sa{.nLength=sizeof(SECURITY_ATTRIBUTES), .bInheritHandle=TRUE};

    /* stdout: this process owns the inbound (reading) server end; the
     * inheritable write end is opened by name for the child. */
    const WCHAR* pipe_name1 = L"\\\\.\\Pipe\\child-1";
    ctx.Stdout.pipe = CreateNamedPipeW(
        pipe_name1,
        PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE,
        1,
        4096,
        4096,
        5000,
        NULL);
    assert(ctx.Stdout.pipe != INVALID_HANDLE_VALUE);
    Pstdout = CreateFileW(
        pipe_name1,
        GENERIC_WRITE,
        0,
        &sa,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);
    assert(Pstdout != INVALID_HANDLE_VALUE);

    /* stderr: same arrangement on a second pipe. */
    const WCHAR *pipe_name2 = L"\\\\.\\Pipe\\child-2";
    ctx.Stderr.pipe = CreateNamedPipeW(
        pipe_name2,
        PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE,
        1,
        4096,
        4096,
        5000,
        NULL);
    assert(ctx.Stderr.pipe != INVALID_HANDLE_VALUE);
    Pstderr = CreateFileW(
        pipe_name2,
        GENERIC_WRITE,
        0,
        &sa,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);
    assert(Pstderr != INVALID_HANDLE_VALUE);

    STARTUPINFOW si{.cb = sizeof(si),
    .dwFlags = STARTF_USESTDHANDLES | STARTF_USESHOWWINDOW,
    .wShowWindow = SW_HIDE,
    .hStdInput = GetStdHandle(STD_INPUT_HANDLE), // use current stdin
    .hStdOutput = Pstdout,
    .hStdError = Pstderr};
    WCHAR cmd[] = L"powershell"; // or cmd, py, bash...
    PROCESS_INFORMATION pInfo{};
    ok = CreateProcessW(
        NULL, cmd, NULL, NULL,
        TRUE, 0, NULL, NULL,
        &si, &pInfo);
    assert(ok);

    /* The child now holds its own inherited copies of the write ends. */
    ok = CloseHandle(Pstdout); // we don't need this
    assert(ok);
    ok = CloseHandle(Pstderr); // we don't need this
    assert(ok);
    /* The primary-thread handle is never used; close it so it is not leaked. */
    ok = CloseHandle(pInfo.hThread);
    assert(ok);

    /* The completion key is the address of the stream's Stdio record, which
     * Worker uses to find the callback. */
    port = CreateIoCompletionPort(ctx.Stdout.pipe, iocp, (ULONG_PTR)&ctx.Stdout, 0);
    assert(port != NULL);
    port = CreateIoCompletionPort(ctx.Stderr.pipe, iocp, (ULONG_PTR)&ctx.Stderr, 0);
    assert(port != NULL);

    /* Post the first read on each stream into that stream's own buffer; the
     * stderr read must not target the stdout buffer, because OnStderrRead
     * prints from ctx.Stderr.buf. */
    ReadFile(ctx.Stdout.pipe, ctx.Stdout.buf, sizeof(ctx.Stdout.buf), NULL, &ctx.Stdout.ol);
    ReadFile(ctx.Stderr.pipe, ctx.Stderr.buf, sizeof(ctx.Stderr.buf), NULL, &ctx.Stderr.ol);

    WaitForSingleObject(pInfo.hProcess, INFINITE); // wait for process exit
    PostQueuedCompletionStatus(iocp, 0, 0, NULL); // tell IOCP Worker exit
    WaitForSingleObject(hThread, INFINITE); // wait for thread exit

    ok = CloseHandle(hThread);
    assert(ok);
    ok = CloseHandle(ctx.Stderr.pipe);
    assert(ok);
    ok = CloseHandle(ctx.Stdout.pipe);
    assert(ok);
    ok = CloseHandle(pInfo.hProcess);
    assert(ok);
    ok = CloseHandle(iocp);
    assert(ok);
    ok = CloseHandle(Hstderr);
    assert(ok);
    puts("exit main function..."); // !!important: before close stdout
    ok = CloseHandle(Hstdout);
    assert(ok);

    /* ok and port are only inspected by assert(). */
    (void)ok;
    (void)port;
}
Redirect stdin, stdout, stderr
#include <windows.h>
#include <assert.h>
#include <stdio.h>
/* Console handles of this process plus the event that Worker sets when it
 * sees ERROR_BROKEN_PIPE; wmain initialises all four and polls HstopEvent in
 * its input loop. */
static HANDLE Hstdout, Hstderr, Hstdin, HstopEvent;
/* Forward declaration so the Callback typedef below can name the struct. */
struct Stdio;
/* Handler type stored in struct Stdio; Worker calls it with the stream's own
 * struct and the byte count reported for the completed operation. */
typedef void (*Callback)(struct Stdio* self, DWORD len);
/* State for one redirected stream. Its address is used as the completion key
 * when the pipe is associated with the completion port. */
struct Stdio {
OVERLAPPED ol; /* passed as lpOverlapped to ReadFile/WriteFile on this stream's pipe */
HANDLE pipe; /* this process's end of the named pipe */
BYTE buf[100]; /* buffer the read callbacks print from and read into */
Callback callback; /* invoked by Worker for each completion on this stream */
};
/* One Stdio record per standard stream of the child process. */
typedef struct CTX {
    struct Stdio Stdout;  /* child's stdout, read by this process */
    struct Stdio Stderr;  /* child's stderr, read by this process */
    struct Stdio Stdin;   /* child's stdin, written by this process */
} CTX, *LCTX, *LPCTX;
/*
 * Completion-port worker thread.
 *
 * iocp is the completion port handle. Each dequeued packet's completion key is
 * a struct Stdio*, whose callback is invoked with the transferred byte count.
 * The loop ends when the wait fails, when a zero-byte completion arrives, or
 * when the packet carries no OVERLAPPED / no key (the shutdown packet posted
 * with PostQueuedCompletionStatus). When the failure is ERROR_BROKEN_PIPE,
 * HstopEvent is set so that the input loop in wmain stops as well.
 *
 * Always returns 0.
 */
DWORD WINAPI Worker(LPVOID iocp){
    for (;;) {
        /* Known starting values: the outputs are not guaranteed to be written
         * when the wait fails without dequeuing a packet. */
        struct Stdio *stdio = NULL;
        OVERLAPPED *ol = NULL;
        DWORD dwIoSize = 0;

        BOOL ok = GetQueuedCompletionStatus(iocp, &dwIoSize, (PDWORD_PTR)&stdio, &ol, INFINITE);
        if (!ok || dwIoSize == 0 || ol == NULL || stdio == NULL) {
            /* Only a failed call sets a meaningful last-error value; after a
             * successful dequeue (e.g. the shutdown packet) it would be stale. */
            DWORD err = ok ? ERROR_SUCCESS : GetLastError();
            switch (err) {
            case ERROR_BROKEN_PIPE:
                SetEvent(HstopEvent);
                puts("the process has been exited, exit thread...");
                break;
            default:
                printf("error = %d, exit thread\n", (int)err);
            }
            break;
        }
        stdio->callback(stdio, dwIoSize);
    }
    return 0;
}
/*
 * Callback for a completed read on the child's stdout pipe: writes the len
 * bytes now in self->buf to Hstdout, then posts the next overlapped read into
 * the same buffer.
 */
void OnStdoutRead(struct Stdio *self, DWORD len){
    BYTE *chunk = self->buf;
    HANDLE source = self->pipe;

    WriteConsoleA(Hstdout, chunk, len, NULL, NULL);
    /* Re-arm the read so the next chunk produces another completion. */
    ReadFile(source, chunk, sizeof(self->buf), NULL, &self->ol);
}
/*
 * Callback for a completed read on the child's stderr pipe: writes the len
 * bytes now in self->buf to Hstderr, then posts the next overlapped read into
 * the same buffer.
 */
void OnStderrRead(struct Stdio *self, DWORD len){
    BYTE *chunk = self->buf;
    HANDLE source = self->pipe;

    WriteConsoleA(Hstderr, chunk, len, NULL, NULL);
    /* Re-arm the read so the next chunk produces another completion. */
    ReadFile(source, chunk, sizeof(self->buf), NULL, &self->ol);
}
/*
 * Callback for a completed write to the child's stdin pipe: reports how many
 * bytes were written. self is not used.
 */
void OnStdinWriteComplete(struct Stdio *self, DWORD len){
    (void)self;
    /* DWORD is unsigned long; convert so the argument matches %u. */
    printf("[%u bytes write to stdin]\n", (unsigned)len);
}
/*
 * Launches "powershell" with stdin, stdout and stderr all redirected into
 * named pipes. A worker thread on an I/O completion port echoes the child's
 * stdout/stderr, while this thread forwards console input to the child's
 * stdin until the worker signals HstopEvent or console input fails.
 *
 * Every call with a side effect is made outside assert() so that the program
 * still performs it when NDEBUG is defined.
 *
 * The stdin pipe is created without FILE_FLAG_OVERLAPPED and is not associated
 * with the completion port here, so the worker never receives completions for
 * it and OnStdinWriteComplete is not reached through the worker.
 *
 * NOTE(review): the pipe names are fixed strings and CreateNamedPipeW is asked
 * for a single instance, so a second copy of this program running at the same
 * time will presumably fail to create its pipes.
 */
int wmain(){
    BOOL ok;
    HANDLE port;

    Hstdout = GetStdHandle(STD_OUTPUT_HANDLE);
    assert(Hstdout != INVALID_HANDLE_VALUE);
    Hstderr = GetStdHandle(STD_ERROR_HANDLE);
    assert(Hstderr != INVALID_HANDLE_VALUE);
    Hstdin = GetStdHandle(STD_INPUT_HANDLE);
    assert(Hstdin != INVALID_HANDLE_VALUE);
    /* CreateEventW reports failure with NULL, not INVALID_HANDLE_VALUE. */
    HstopEvent = CreateEventW(NULL, FALSE, FALSE, NULL);
    assert(HstopEvent != NULL);

    CTX ctx{.Stdout=Stdio{.callback=OnStdoutRead}, .Stderr=Stdio{.callback=OnStderrRead}, .Stdin=Stdio{.callback=OnStdinWriteComplete}};
    STARTUPINFOW si{.cb = sizeof(si), .dwFlags = STARTF_USESTDHANDLES};
    HANDLE iocp = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
    HANDLE hThread = CreateThread(NULL, 0, Worker, iocp, 0, NULL); // Worker thread
    SECURITY_ATTRIBUTES sa{.nLength=sizeof(SECURITY_ATTRIBUTES), .bInheritHandle=TRUE};

    /* stdout: this process owns the inbound (reading) server end; the
     * inheritable write end is opened by name for the child. */
    const WCHAR* pipe_name1 = L"\\\\.\\Pipe\\child-1";
    ctx.Stdout.pipe = CreateNamedPipeW(
        pipe_name1,
        PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE,
        1,
        4096,
        4096,
        5000,
        NULL);
    assert(ctx.Stdout.pipe != INVALID_HANDLE_VALUE);
    si.hStdOutput = CreateFileW(
        pipe_name1,
        GENERIC_WRITE,
        0,
        &sa,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);
    assert(si.hStdOutput != INVALID_HANDLE_VALUE);

    /* stderr: same arrangement on a second pipe. */
    const WCHAR *pipe_name2 = L"\\\\.\\Pipe\\child-2";
    ctx.Stderr.pipe = CreateNamedPipeW(
        pipe_name2,
        PIPE_ACCESS_INBOUND | FILE_FLAG_OVERLAPPED,
        PIPE_TYPE_BYTE,
        1,
        4096,
        4096,
        5000,
        NULL);
    assert(ctx.Stderr.pipe != INVALID_HANDLE_VALUE);
    si.hStdError = CreateFileW(
        pipe_name2,
        GENERIC_WRITE,
        0,
        &sa,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);
    assert(si.hStdError != INVALID_HANDLE_VALUE);

    /* stdin: this process owns the outbound (writing) server end; the
     * inheritable read end is opened by name for the child. */
    const WCHAR *pipe_name3 = L"\\\\.\\Pipe\\child-3";
    ctx.Stdin.pipe = CreateNamedPipeW(
        pipe_name3,
        PIPE_ACCESS_OUTBOUND,
        PIPE_TYPE_BYTE,
        1,
        4096,
        4096,
        5000,
        NULL);
    assert(ctx.Stdin.pipe != INVALID_HANDLE_VALUE);
    si.hStdInput = CreateFileW(
        pipe_name3,
        GENERIC_READ,
        0,
        &sa,
        OPEN_EXISTING,
        FILE_FLAG_OVERLAPPED,
        NULL);
    assert(si.hStdInput != INVALID_HANDLE_VALUE);

    WCHAR cmd[] = L"powershell"; // or cmd, py, bash...
    PROCESS_INFORMATION pInfo{};
    ok = CreateProcessW(
        NULL, cmd, NULL, NULL,
        TRUE, 0, NULL, NULL,
        &si, &pInfo);
    assert(ok);

    /* The child now holds its own inherited copies of these ends. */
    ok = CloseHandle(si.hStdError); // we don't need this
    assert(ok);
    ok = CloseHandle(si.hStdInput); // we don't need this
    assert(ok);
    ok = CloseHandle(si.hStdOutput); // we don't need this
    assert(ok);
    /* The primary-thread handle is never used; close it so it is not leaked. */
    ok = CloseHandle(pInfo.hThread);
    assert(ok);

    /* The completion key is the address of the stream's Stdio record, which
     * Worker uses to find the callback. */
    port = CreateIoCompletionPort(ctx.Stdout.pipe, iocp, (ULONG_PTR)&ctx.Stdout, 0);
    assert(port != NULL);
    port = CreateIoCompletionPort(ctx.Stderr.pipe, iocp, (ULONG_PTR)&ctx.Stderr, 0);
    assert(port != NULL);

    /* Post the first read on each stream into that stream's own buffer; the
     * stderr read must not target the stdout buffer, because OnStderrRead
     * prints from ctx.Stderr.buf. */
    ReadFile(ctx.Stdout.pipe, ctx.Stdout.buf, sizeof(ctx.Stdout.buf), NULL, &ctx.Stdout.ol);
    ReadFile(ctx.Stderr.pipe, ctx.Stderr.buf, sizeof(ctx.Stderr.buf), NULL, &ctx.Stderr.ol);

    for(;;){
        CHAR buf[100];
        DWORD dwIoSize = 0;
        /* A failed console read leaves no valid length to forward. */
        if (!ReadConsoleA(Hstdin, buf, sizeof(buf), &dwIoSize, NULL))
            break;
        if (WaitForSingleObject(HstopEvent, 0)==WAIT_OBJECT_0)
            break;
        /* Stop forwarding once the write end can no longer be written. */
        if (!WriteFile(ctx.Stdin.pipe, buf, dwIoSize, NULL, &ctx.Stdin.ol)
            && GetLastError() != ERROR_IO_PENDING)
            break;
        if (WaitForSingleObject(HstopEvent, 0)==WAIT_OBJECT_0)
            break;
    }

    PostQueuedCompletionStatus(iocp, 0, 0, NULL); // tell IOCP Worker exit
    WaitForSingleObject(hThread, INFINITE); // wait for thread exit

    ok = CloseHandle(hThread);
    assert(ok);
    ok = CloseHandle(ctx.Stdin.pipe); /* previously never closed */
    assert(ok);
    ok = CloseHandle(ctx.Stderr.pipe);
    assert(ok);
    ok = CloseHandle(ctx.Stdout.pipe);
    assert(ok);
    ok = CloseHandle(pInfo.hProcess);
    assert(ok);
    ok = CloseHandle(iocp);
    assert(ok);
    ok = CloseHandle(Hstderr);
    assert(ok);
    ok = CloseHandle(HstopEvent);
    assert(ok);
    puts("exit main function..."); // !!important: before close stdout
    ok = CloseHandle(Hstdout);
    assert(ok);

    /* ok and port are only inspected by assert(). */
    (void)ok;
    (void)port;
}