Convert function using callbacks into Async Iterator variant

Time:11-01

Scenario

I'm given a function with an asynchronous callback like

let readFile:   (path: string, callback: (line: string, eof: boolean) => void) => void

Though I would prefer a function using AsyncIterable/AsyncGenerator signature instead:

let readFileV2: (path: string) => AsyncIterable<string>

Problem

Without readFileV2, I have to read a file like

let file = await new Promise((res, err) => {
    let file = ''
    readFile('./myfile.txt', (line, eof) => {
        if (eof) { return res(file) }
        file += line + '\n'
    })
})

whereas readFileV2 would let me do it more cleanly:

let file = '';
for await (let line of readFileV2('./myfile.txt')) {
    file += line + '\n'
}

Question

Is there a way for me to transform readFile into readFileV2?

Updated for clarification:

Is there a general approach to transform a function with an async callback argument to an AsyncGenerator/AsyncIterable variant?

And can this approach be demonstrated on the readFile function above?

References

I see two related questions here:

However, they don't seem to provide a clear answer.

CodePudding user response:

Disclaimer at the outset: I am answering the following question:

Given a data-providing function fn of the form (...args: [...A, (data: T, done: boolean) => void]) => void, for some list of initial argument types A and data type T, how can we write transform so that transform(fn) produces a new function of the form (...args: A) => AsyncIterable<T>?

It is quite possible that this isn't the right thing to be doing in general, since consumers of AsyncIterable<T> may process data slowly or abort early, and a function of type (...args: [...A, (data: T, done: boolean) => void]) => void can't possibly react to that; it will call callback once per piece of data, whenever it wants, and it will not stop until it feels like it.


Still, here is one possible implementation:

const transform = <A extends any[], T>(
    fn: (...args: [...args: A, callback: (val: T, done: boolean) => void]) => void
) => (...args: A): AsyncIterable<T> => {
    let values: Promise<[T, boolean]>[] = [];
    let resolve: (x: [T, boolean]) => void;
    values.push(new Promise(r => { resolve = r; }));
    fn(...args, (val: T, done: boolean) => {
        resolve([val, done]);
        values.push(new Promise(r => { resolve = r; }));
    });
    return async function* () {
        let val: T;
        for (let i = 0, done = false; !done; i++) {
            [val, done] = await values[i];
            delete values[i];
            yield val;
        }
    }();
}

Essentially we maintain a queue, values, which is written to inside the callback passed to fn and read from inside a generator function. This is accomplished by a chain of promises: the first promise is created manually, and each time data becomes available, the callback resolves the current promise and pushes a new pending promise onto the queue. The generator function awaits these promises in order, yields each value, and deletes the consumed entry.


To test it, someone needs to provide fn. Here's one possibility:

function sleep(ms: number) {
    return new Promise<void>(r => setTimeout(r, ms));
}

const provideData = async (name: string, callback: (line: string, eof: boolean) => void) => {
    const contents = [
        "This is line 1 of " + name, "and this is line 2",
        "and line 3", "and 4", "5",
        "and that's the end of " + name + "."
    ];
    for (const [line, eof] of contents.map((l, i, a) => [l, i >= a.length - 1] as const)) {
        await sleep(1000); // I guess it takes a second to read each line
        callback(line, eof);
    }
}

The provideData function accepts a callback and calls it once per second with successive lines of an array. And now we transform it:

const provideDataV2 = transform(provideData);
// let provideDataV2: (name: string) => AsyncIterable<string>

And let's test the transformer:

async function foo() {
    console.log(new Date().toLocaleTimeString(), "starting")
    const iter = provideDataV2("my data");
    await sleep(2500); // not ready to read yet, I guess    
    for await (let line of iter) {
        console.log(new Date().toLocaleTimeString(), line)
    }
    console.log(new Date().toLocaleTimeString(), "done")
}
foo()

/* 
[LOG]: "2:48:36 PM",  "starting" 
[LOG]: "2:48:37 PM",  "This is line 1 of my data" 
[LOG]: "2:48:38 PM",  "and this is line 2" 
[LOG]: "2:48:39 PM",  "and line 3" 
[LOG]: "2:48:40 PM",  "and 4" 
[LOG]: "2:48:41 PM",  "5" 
[LOG]: "2:48:42 PM",  "and that's the end of my data." 
[LOG]: "2:48:42 PM",  "done" 
*/

Looks good.
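To tie this back to the question, here is a minimal sketch that applies the same queue idea to a stand-in for readFile. Everything named below is an assumption for illustration: fakeReadFile is a hypothetical substitute for the real function, toAsyncIterable is a buffer-plus-waiter variant rather than the transform above, and the yieldLast flag exists because the question's Promise wrapper discards the line passed along with eof, whereas provideData delivers a real line with it.

```typescript
type LineCallback = (line: string, eof: boolean) => void;

// Hypothetical stand-in for the question's readFile. ASSUMPTION: it reports
// each line with eof = false, then makes one final call with eof = true
// whose line argument carries no data.
function fakeReadFile(_path: string, callback: LineCallback): void {
    const lines = ["first", "second", "third"];
    lines.forEach((line, i) => setTimeout(() => callback(line, false), 10 * (i + 1)));
    setTimeout(() => callback("", true), 10 * (lines.length + 1));
}

// Buffer-plus-waiter variant of the queue: values pile up in `buffer` until
// the consumer asks for them; if the consumer gets ahead, it parks in `wake`.
function toAsyncIterable<A extends unknown[], T>(
    fn: (...args: [...A, (val: T, done: boolean) => void]) => void,
    yieldLast: boolean // whether the value delivered together with done is real data
): (...args: A) => AsyncIterable<T> {
    return (...args: A) => {
        const buffer: [T, boolean][] = [];
        let wake: (() => void) | undefined;
        fn(...args, (val: T, done: boolean) => {
            buffer.push([val, done]);
            wake?.(); // release a waiting consumer, if there is one
            wake = undefined;
        });
        return (async function* () {
            while (true) {
                if (buffer.length === 0) {
                    await new Promise<void>(r => { wake = r; });
                }
                const [val, done] = buffer.shift()!;
                if (!done || yieldLast) yield val;
                if (done) return;
            }
        })();
    };
}

const readFileV2 = toAsyncIterable<[string], string>(fakeReadFile, false);

// Mirrors the loop the question wants to write.
async function collect(path: string): Promise<string> {
    let file = "";
    for await (const line of readFileV2(path)) {
        file += line + "\n";
    }
    return file;
}
```

With yieldLast set to true, the final value is yielded as well, which matches what transform above does for provideData.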

Is it perfect? Does it have weird side effects in response to weird situations (e.g., are you going to iterate it multiple times)? Should it handle errors in a particular way? Are there recommended solutions elsewhere? Not sure. This is just a possible implementation of transform that adheres to the contract laid out in the question as asked.
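On the question of iterating multiple times: transform returns an async generator object, and such an object is single-use. The sketch below shows this with a hypothetical numbers generator (not part of the original code); the second loop sees no values because the generator has already finished.

```typescript
// Hypothetical generator standing in for the object that transform returns.
async function* numbers(): AsyncGenerator<number> {
    yield 1;
    yield 2;
}

async function iterateTwice(): Promise<[number[], number[]]> {
    const iter: AsyncIterable<number> = numbers();
    const first: number[] = [];
    const second: number[] = [];
    for await (const n of iter) first.push(n);
    // Same generator object again: it is already exhausted, so nothing is pushed.
    for await (const n of iter) second.push(n);
    return [first, second];
}
```

If repeated iteration matters, one option is to call the transformed function again for each pass, which invokes fn again and starts a fresh queue.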


CodePudding user response:

This has been a native Node.js API since v10, so there is no need to reinvent it:

const {createReadStream} = require('fs');
const {createInterface} = require('readline');

function readFileLines(fileName: string): AsyncIterable<string> {
    const input = createReadStream(fileName);
    return createInterface({input, crlfDelay: Infinity});
}

Testing it:

const lines = readFileLines('./test1.js');
for await(const l of lines) {
    console.log(l);
}
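For comparison with the question's desired usage, here is a sketch that collects the lines into one string. readAllLines is a hypothetical helper name; the loop is wrapped in an async function so it does not rely on top-level await, and iteration starts immediately after the interface is created, since the Node.js documentation warns that asynchronous work between the two may cause missed lines.

```typescript
import { createReadStream } from "fs";
import { createInterface } from "readline";

// Hypothetical helper: reads a file line by line and joins the lines with "\n".
async function readAllLines(fileName: string): Promise<string> {
    const input = createReadStream(fileName);
    // crlfDelay: Infinity treats "\r\n" as a single line break.
    const lines = createInterface({ input, crlfDelay: Infinity });
    let file = "";
    for await (const line of lines) {
        file += line + "\n";
    }
    return file;
}
```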