Background worker processing

UI lag during texture loading can be reduced by moving the work into Web Workers.

Is there community interest in a similar feature in BABYLON?

I’ve developed a JavaScript WorkerPool that makes processing and texture preparation in the background very easy for my use case. Should I expend (more) effort toward making it generically useful (and TypeScriptifying)? The "TODO"s listed below indicate some of the current limitations.

Features: 
Manages several workers executing the same background task simultaneously.
A single method, postPromise(), executes a task on the next available worker.
Based around Promise and easily integrated.
The current method of tracking busy/free workers keeps the pool array from changing frequently, at the expense of not shrinking the number of workers efficiently.
Relies on an ECMAScript 2019 feature: function.toString() returns the function's source text.

Usage:
1. Create payloadFunction(argObj), which uses the argObj properties and returns a value, Promise, or TypedArray.
2. Retain the WorkerPool returned from WorkerPool.FromWorkerFunction(payloadFunction).
3. Call pool.postPromise(argObject).then((value)=>{[use payloadFunction return value]})

Example:
const pool = WorkerPool.FromWorkerFunction(payloadFunction);

const result = await pool.postPromise(argObj,transferableArray);
          or 
pool.postPromise(argObj,transferableArray).then((value)=>{[use value]}, rejectFn)
// for use of transferableArray, see Web Worker documentation.

TODO TypeScriptify
TODO ? Need help testing other browsers and environments.
TODO ? works when return value itself is transferable, but not object *containing* transferable 
TODO ? develop example transferring pixel data as TypedArray instead of ImageBitmap
TODO ? handle worker onerror and onmessageerror
TODO ? allow abort and terminate
TODO ? add worker Timeout?
TODO ? onAllFreeObservable? other observables?
TODO ? Add optional resource removal on maxworkers reduction
TODO ? Allow more than 32 workers
TODO ? Verify Blob revokeURL handling. Improve?
TODO ? Verify Promise rejectFn handling.
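
One of the TODOs above concerns results that are objects containing transferables rather than being transferable themselves. A possible approach, sketched here under assumptions, is to scan the result's top-level properties and build the transfer list from them. The helper names isTransferable and collectTransferables are hypothetical and not part of the WorkerPool code, and the scan only looks one level deep.

```javascript
// Hypothetical helpers (not part of WorkerPool): build a transfer list for a
// worker result that is either transferable itself or a plain object whose
// top-level properties hold transferables.
function isTransferable(v) {
    return v instanceof ArrayBuffer ||
        (typeof ImageBitmap !== 'undefined' && v instanceof ImageBitmap);
}

function collectTransferables(value) {
    const found = new Set(); // a Set avoids listing a shared buffer twice
    const visit = (v) => {
        if (ArrayBuffer.isView(v)) {
            // TypedArray or DataView: transfer the underlying buffer
            // (a SharedArrayBuffer cannot be transferred, so it is skipped)
            if (v.buffer instanceof ArrayBuffer) found.add(v.buffer);
        } else if (isTransferable(v)) {
            found.add(v);
        }
    };
    visit(value);
    const isContainer = value !== null && typeof value === 'object' &&
        !ArrayBuffer.isView(value) && !isTransferable(value);
    if (isContainer) Object.values(value).forEach(visit);
    return [...found];
}
```

Inside the worker, the reply could then be sent as self.postMessage({id, index, data: value}, collectTransferables(value)), which would also cover the TypedArray case without the try/catch fallback.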

I wonder how general this is versus more niche to your need.

The core WorkerPool class is very generic, while ImageDownloadPool is specific to my need. Its primary function is to track which workers are busy and free, push tasks to a free worker, and resolve the user’s promise when the worker is done. Here is the current JavaScript code:

class WorkerPool {
    // Features: 
    // Manages several workers executing the same background task simultaneously.
    // A single method, postPromise(), to execute task on the next available worker
    // Based around Promise() and easily integrated.
    // The current method of tracking worker busy/free keeps pool array from
    // changing frequently, at the expense of efficiently reducing number of workers.
    // Uses ECMA 2019 feature: function.toString()
    
    // Usage:
    // 1. create payloadFunction(argObj) to use argObj properties and return value, Promise, or TypedArray.
    // 2. retain the WorkerPool returned from FromWorkerFunction(payloadFunction)
    // 3. pool.postPromise(argObject).then((value)=>{[use payloadFunction return value]}) 
    
    // Example:
    // const pool = FromWorkerFunction(payloadFunction);
    
    // const result = await pool.postPromise(argObj,transferableArray);
    //           or 
    // postPromise(argObj,transferableArray).then((value)=>{[use value]}, rejectFn)
    // // for use of transferableArray, see Web Worker documentation.
    
    // TODO TypeScriptify
    // TODO ? Need help testing other browsers and environments.
    // TODO ? works when return value itself is transferable, but not object *containing* transferable 
    // TODO ? develop example transferring pixel data as TypedArray instead of ImageBitmap
    // TODO ? handle worker onerror and onmessageerror
    // TODO ? allow abort and terminate
    // TODO ? add worker Timeout?
    // TODO ? onAllFreeObservable? other observables?
    // TODO ? Add optional resource removal on maxworkers reduction
    // TODO ? Allow more than 32 workers
    // TODO ? Verify Blob revokeURL handling. Improve?
    // TODO ? Verify Promise rejectFn handling.
    constructor(url,maxworkers=3,lazy=true) {
        // url must be set before any worker is created (needed when lazy is false)
        this.url = url;
        this.revokeWhenDone = url;
        this.lazy = lazy;
        this.terminateOverMax = false; // untested
        this.pendingResolvers = new Map();
        this.pendingRequests = [];
        this.messageId = 0;
        this.maxWorkers = maxworkers;
        this.pool = [];
        this.verbose = false;
        this.initAllFree();
        if (!this.lazy)
            for (let i=0; i<this.maxWorkers; i++)
                this.pool.push(this.getNewWorker())
    }
    getStats() {
        return {
            working: this.pendingResolvers.size, // Map exposes size, not length
            pending: this.pendingRequests.length,
            standby: this.pool.length-this.pendingResolvers.size,
            maximum: this.maxWorkers,
            created: this.pool.length,
            started: this.messageId,
            statusField: this.busyBitfield.toString(2),
        }
    }
    // Bit tricks used to find a free worker (x is the 32-bit status field):
    // lowest set bit of x: x&-x
    // lowest zero bit of x: ~x&(x+1)
    // index of an isolated bit b: 31-Math.clz32(b) (faster than Math.log2(b))
    // so the index of the lowest set bit is 31-Math.clz32(x&-x), or -1 when x is 0
    // 
    // bitfield: track worker status with bit location = pool index 
    // bit is 0 if busy, 1 if free (or not created yet) 
    // initBitField = () => this.busyBitfield = ~0;
    // setBusy = (index)=>(~(1<<index))&this.busyBitfield
    // setFree = (index)=>(1<<index)|this.busyBitfield
    // isBusy = (index)=>0==((1<<index)&this.busyBitfield)
    // isFree = (index)=>0!=((1<<index)&this.busyBitfield)
    // 
    
    // // for Busy = 1
    // initAllFree = ()=>this.busyBitfield = 0; // all zeros
    // setFree = index=>this.busyBitfield &= ~(1 << index);
    // setBusy = index=>this.busyBitfield |= (1 << index);
    // getFree = ()=>31-Math.clz32(~this.busyBitfield&(this.busyBitfield+1));
    
    // for Busy = 0
    initAllFree = ()=>this.busyBitfield = ~0; // all ones
    setBusy = index=>this.busyBitfield &= ~(1 << index);
    setFree = index=>this.busyBitfield |= (1 << index);
    getFree = ()=>31-Math.clz32(this.busyBitfield&(-this.busyBitfield));
    
    // untested code to remove 32 worker limit:
    // if busy is array (lsbyte first), find first not-all-ones in busyBF array,
    // getFree() {
    // const bfindex = busyBF.findIndex(e=>v!=~0); 
    // if (bfindex==-1) {bfindex=busyBF.length; busyBF.push(~0};}
    // return (bfindex+1)*(1<indexBitsPerField)-Math.clz32(this.busyBitfield[bfindex]&(-this.busyBitfield[bfindex]))-1;
    // }

    // indexBitsPerField = 5; // how many bits used in clz32?
    // indexBitsPerField = Math.clz32(0) - (Math.clz32(Math.clz32(0))
    // initAllFree = ()=>this.busyBitfield = [~0]; // all ones
    // setBusy = index=>this.busyBitfield[index>>5] &= ~(1 << index&(1<<5-1);
    // setFree = index=>this.busyBitfield[index>>5] |= (1 << index&(1<<5-1);
    // getFree() {
    // let freeIndex = 0, i;
    // for (let i=0;i<this.busyBitfield.length;i++) {
    //   const current = 1<<5 - Math.clz32(this.busyBitfield&(-this.busyBitfield))
    //   freeIndex+=current;
    //   if (current) < 1<<5)break;}
    // if (i==this.busyBitfield.length) {
    //     this.busyBitfield.push(-0);
    //     return freeIndex;}
    // return freeIndex-1;}

    startNextRequest() {
        // this occurs on final worker message to determine if pendingRequests and worker available
        if (this.verbose) console.log('check pending requests: found',this.pendingRequests.length)
        if (!this.pendingRequests.length) return
        
        const index = this.getNextWorkerIndex();
        if (index < 0) {
            if (this.verbose) console.log('no available workers')
            return index;
        }
        // take the oldest request first so queued work is served first-come, first-served
        const {args:[payload,arr],resolve,reject} = this.pendingRequests.shift();
        
        const id = this.messageId++;
        if (this.verbose) console.log('request to resolve id', id, payload,arr)
        this.pendingResolvers.set(id,{index,resolve,reject})
        this.pool[index].postMessage({id,index,payload},arr);
        return index;
    }
    
    onMessageFn(event) {
        const {id,index,data,error} = event.data;
        if (this.verbose) console.log('worker',index,'done, id',id,event)
        const resolver = this.pendingResolvers.get(id);
        //console.log('resolver',resolver,error,resolver.error)
        this.pendingResolvers.delete(id);
        this.setFree(index);
        // Note: workers with index below maxWorkers are not removed and continue to process requests.
        if (index >= this.maxWorkers) { // untested
            this.pool[index].terminate();
            this.pool[index] = null;
        }
        this.startNextRequest();
        error ? resolver.reject(error) : resolver.resolve(data);
    }
    
    getNewWorker() {
        const worker = new Worker(this.url);
        worker.onmessage = this.onMessageFn.bind(this);
        return worker;
    }
    getNextWorkerIndex() {
        const index = this.getFree();
        if (index >= this.maxWorkers  || index < 0) return -1;
        if (!this.pool[index]) {
            // create the worker lazily (also covers a slot that was terminated earlier)
            this.pool[index] = this.getNewWorker();
        } 
        this.setBusy(index);
        return index;
    }
    postPromise(payload,arr) {
        const index = this.getNextWorkerIndex();
        if (index >= 0) {
            const worker = this.pool[index];
            const id = this.messageId++
            if (this.verbose) console.log('posting id',id,'to index',index,payload)//,arr)
            return new Promise((resolve,reject)=>{
                this.pendingResolvers.set(id,{index,resolve,reject})
                worker.postMessage({id,index,payload},arr);
            })
        } else {
            if (this.verbose) console.log('posting to requests',payload,arr)
            return new Promise((resolve,reject)=>{
                this.pendingRequests.push({args:[payload,arr],resolve,reject});
            })
        }
    }
    
    // removeWorker() {
    //     if (noMoreWorkers) {
    //         //If the objectURL argument passed is not a currently-active object URL — for example if it is an invalid URL, non-object URL, or is already revoked — then calling this method does nothing.
    //         // URL.createObjectURL(blob);
    //         URL.revokeObjectURL(objectURL);
    //     }
    // }
    static FromJavaScript(javascript,maxworkers,lazy) {
        const blob = new Blob([javascript], {type: 'text/javascript'})
        const url = URL.createObjectURL(blob); // should also URL.revokeObjectURL(this.url)
        return new this(url,maxworkers,lazy);
    }
    
    static FromEventListenerFunction(workerFunction,maxworkers,lazy){
        // ECMA 2019 requires source code from fn.toString() 
        return this.FromJavaScript(`self.addEventListener('message',${workerFunction.toString()});`,maxworkers,lazy);
    }
    
    // 1. create a payloadFunction(argObject) that uses argObject properties and returns value, Promise, or TypedArray.
    // 2. retain the WorkerPool returned from FromWorkerFunction(payloadFunction)
    // 3. pool.postPromise(argObject).then((argObject)=>{[function using argObject properties]})
    static FromWorkerFunction(payloadFunction,maxworkers,lazy){
        // ECMA 2019 requires source code from fn.toString() 
        //const fnString = payloadFunction.toString();
        
        const eventFn = `async function workerFunction(event) {
            const {id, index, payload} = event.data;
            const payloadFn = ${payloadFunction.toString()};
            try {
                // await accepts plain values and Promises alike; a synchronous throw
                // in payloadFn is also caught here and reported to the main thread
                const value = await payloadFn(payload);
                try {
                    // transfer the result when possible (TypedArray buffer, ImageBitmap, ...)
                    self.postMessage({id, index, data: value}, [value.BYTES_PER_ELEMENT ? value.buffer : value]);
                } catch (e) {
                    // not transferable: fall back to a structured-clone copy
                    self.postMessage({id, index, data: value});
                }
            } catch (error) {
                self.postMessage({id, index, error: (error && error.message) || String(error)});
            }
        }`
                console.log('function',eventFn)
        return this.FromJavaScript(`self.addEventListener('message',${eventFn});`,maxworkers,lazy);
    }
    
}
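
The free-worker lookup in the class above relies on two bit tricks: x & -x isolates the lowest set bit, and 31 - Math.clz32(...) converts that bit into its index. The standalone sketch below mirrors the "Busy = 0" variant so it can be tried outside the class. The factory name makeBusyBitfield is only illustrative and does not appear in the original code.

```javascript
// Standalone sketch of the "free = 1, busy = 0" bitfield used by WorkerPool.
// makeBusyBitfield is an illustrative name, not part of the class.
function makeBusyBitfield() {
    let bits = ~0; // all 32 slots start free
    return {
        setBusy(index) { bits &= ~(1 << index); },
        setFree(index) { bits |= (1 << index); },
        // bits & -bits keeps only the lowest set bit; clz32 turns it into an index.
        // When every slot is busy, bits is 0, clz32(0) is 32, and the result is -1.
        getFree() { return 31 - Math.clz32(bits & -bits); },
    };
}

const slots = makeBusyBitfield();
slots.setBusy(0);
slots.setBusy(1);
const next = slots.getFree();         // lowest free slot while 0 and 1 are busy
slots.setFree(0);
const afterRelease = slots.getFree(); // slot 0 is available again
```

The -1 result for a full bitfield is what lets getNextWorkerIndex() treat "no free slot" and "index beyond maxWorkers" with the same check.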

Specific to image bitmap:

class ImageDownloadPool extends WorkerPool {
    // Blob URLs should not be reused, they should be recreated if needed.

    requestImageBitmap(imageUrl,maxsize) {
        const nonrelative = URL.canParse(imageUrl)?imageUrl:new URL(imageUrl, document.location).toString()
        const argObject = {url:nonrelative,maxsize,}
        return this.postPromise(argObject)
    }
    static GetImagePool(maxworkers,lazy) {
        async function workerFunction(payload) {
            const response = await fetch(payload.url);
            const blob = await response.blob();
            let bitmap = await createImageBitmap(blob);
            const maxsize = payload.maxsize;
            // downscale only when a positive maxsize is given and the bitmap exceeds it
            if (maxsize > 0 && (bitmap.width > maxsize || bitmap.height > maxsize)) {
                const scale = Math.min(maxsize/bitmap.width , maxsize/bitmap.height)
                bitmap = await createImageBitmap(bitmap, { 
                    resizeWidth: Math.max(1, Math.round(bitmap.width*scale)),
                    resizeHeight: Math.max(1, Math.round(bitmap.height*scale)),
                    resizeQuality: 'high'});
            }
            // Transfer back to main thread
            return bitmap;
        }
        return this.FromWorkerFunction(workerFunction,maxworkers,lazy);
    }
}
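
The downscaling step in the worker function above comes down to one uniform scale factor, chosen so that neither side exceeds maxsize. The helper below isolates that arithmetic so it can be checked without a browser. fitWithin is a hypothetical name, and rounding to whole pixels with a floor of 1 is an assumption added here because the resize options are integer pixel counts.

```javascript
// Hypothetical helper: compute the target size for an image so that it fits
// inside a maxsize x maxsize square while keeping its aspect ratio.
function fitWithin(width, height, maxsize) {
    // No limit given (0 or undefined), or the image already fits: leave it alone.
    if (!(maxsize > 0) || (width <= maxsize && height <= maxsize)) {
        return { width, height, scaled: false };
    }
    // The smaller ratio guarantees both sides end up within the limit.
    const scale = Math.min(maxsize / width, maxsize / height);
    return {
        width: Math.max(1, Math.round(width * scale)),
        height: Math.max(1, Math.round(height * scale)),
        scaled: true,
    };
}

const target = fitWithin(4096, 2048, 1024); // { width: 1024, height: 512, scaled: true }
```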

Example Usage:

const imagepool = ImageDownloadPool.GetImagePool(2)
imagepool.requestImageBitmap( imageUrl, this.mesh.getEngine().getCaps().maxTextureSize).then(function(imageBitmap){
            const texture = BABYLON.RawTexture.CreateRGBATexture(
                imageBitmap, 
                imageBitmap.width, 
                imageBitmap.height, 
                scene,
                true,false // generateMipMaps, invertY
            );
...

Edit to add: a single WorkerPool handles a single function, replicated across the workers in the pool, and is totally agnostic to what that function does. If another “function” is needed, another WorkerPool can be created.

Alternatively, several functions can be built into the user-provided workerFunction from the start, with decision logic that chooses between them based on the incoming argument object. Work is then completed on a first-come, first-served basis within the maxworkers limit.
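
The second option, several tasks behind one worker function, could look like the sketch below. The task property on the argument object and the task names are assumptions for illustration only. Because FromWorkerFunction serializes the function with toString(), everything it needs has to be defined inside its own body.

```javascript
// Hypothetical payload function that multiplexes several tasks in one pool.
// The `task` field and the task names are illustrative only.
function multiTaskFunction(argObj) {
    // Helpers live inside the function so they survive toString() serialization.
    const tasks = {
        sum: ({ values }) => values.reduce((a, b) => a + b, 0),
        invert: ({ pixels }) => pixels.map((v) => 255 - v), // returns a new TypedArray
    };
    const task = Object.prototype.hasOwnProperty.call(tasks, argObj.task)
        ? tasks[argObj.task]
        : null;
    // A rejected Promise travels back through the pool's reject path.
    if (!task) return Promise.reject(new Error('unknown task: ' + argObj.task));
    return task(argObj);
}

// With the pool from the post, this would be used roughly as:
// const pool = WorkerPool.FromWorkerFunction(multiTaskFunction, 2);
// const total = await pool.postPromise({ task: 'sum', values: [1, 2, 3] });
```

Whether this is preferable to one pool per function probably depends on whether the tasks should share the same maxworkers budget.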

The input, output, and work performed are completely defined by the user-provided matching pair: the code that calls postPromise() and the workerFunction. In the code above, ImageDownloadPool supplies both: “requestImageBitmap()” wraps postPromise(), and “workerFunction()” inside the GetImagePool static method does the work. The underlying WorkerPool manages busy and free workers, sends each request to the next available worker, and tracks the resolve/reject functions associated with each submitted argument object. When a worker completes and returns its result, WorkerPool “agnostically” resolves the associated promise with the return value of the user-provided workerFunction.
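
The bookkeeping described above (an id per request, a Map from id to the promise's resolve/reject pair, and a queue for requests that arrive while every worker is busy) can be shown without real workers. The sketch below is a simplified stand-in, not the WorkerPool class itself: MiniPool and the stub worker objects are hypothetical, and free workers are tracked with a plain array instead of the bitfield.

```javascript
// Simplified, hypothetical stand-in for the request tracking in WorkerPool.
class MiniPool {
    constructor(workers) {
        this.workers = workers;               // objects exposing postMessage()
        this.free = workers.map((_, i) => i); // indices of idle workers
        this.resolvers = new Map();           // id -> {resolve, reject}
        this.queue = [];                      // waiting requests, oldest first
        this.nextId = 0;
    }
    post(payload) {
        return new Promise((resolve, reject) => {
            this.queue.push({ payload, resolve, reject });
            this.dispatch();
        });
    }
    dispatch() {
        // Hand out queued requests while both a request and a worker are available.
        while (this.queue.length && this.free.length) {
            const { payload, resolve, reject } = this.queue.shift();
            const index = this.free.shift();
            const id = this.nextId++;
            this.resolvers.set(id, { resolve, reject });
            this.workers[index].postMessage({ id, index, payload });
        }
    }
    // Call with the worker's reply; in a real pool this would be worker.onmessage.
    onMessage({ data: { id, index, data, error } }) {
        const { resolve, reject } = this.resolvers.get(id);
        this.resolvers.delete(id);
        this.free.push(index);
        this.dispatch();
        if (error) reject(new Error(error));
        else resolve(data);
    }
}

// Stub workers that only record what they were sent.
const sent = [[], []];
const stubWorkers = sent.map((log) => ({ postMessage: (msg) => log.push(msg) }));
const miniPool = new MiniPool(stubWorkers);
const results = [];
['a', 'b', 'c'].forEach((p) => miniPool.post(p).then((v) => results.push(v)));
// Two requests are in flight; 'c' waits until a worker reports back.
miniPool.onMessage({ data: { id: 0, index: 0, data: 'A' } });
// Worker 0 is free again, so 'c' has now been sent to it.
```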

Further, what happens on the main thread is completely user-defined: it is whatever the caller does with the promise returned by postPromise(), or by a wrapper that calls postPromise() (e.g. the requestImageBitmap() method of class ImageDownloadPool above).