// Copyright (c) 2020, Compiler Explorer Authors
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
//     * Redistributions of source code must retain the above copyright notice,
//       this list of conditions and the following disclaimer.
//     * Redistributions in binary form must reproduce the above copyright
//       notice, this list of conditions and the following disclaimer in the
//       documentation and/or other materials provided with the distribution.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

import {executionAsyncId} from 'async_hooks';

import {default as Queue} from 'p-queue';

/**
 * Thin wrapper around a p-queue `Queue` that additionally remembers the async
 * execution ids of jobs it is currently invoking, so that an `enqueue` call
 * made from inside such a job is run inline instead of being queued again.
 */
export class CompilationQueue {
    /**
     * @param concurrency - passed to p-queue as its `concurrency` option.
     * @param timeout - passed to p-queue as its `timeout` option; the queue is
     *     created with `throwOnTimeout: true`.
     */
    constructor(concurrency, timeout) {
        // Async execution ids of the queued jobs that are being invoked right now.
        this._running = new Set();
        this._queue = new Queue({concurrency, timeout, throwOnTimeout: true});
    }

    /**
     * Builds a queue from a property lookup function.
     *
     * @param ceProps - called as `ceProps(name, defaultValue)`; asked for
     *     `maxConcurrentCompiles` (default 1) and `compilationEnvTimeoutMs`
     *     (no default supplied).
     * @returns {CompilationQueue} the configured queue.
     */
    static fromProps(ceProps) {
        const maxConcurrent = ceProps('maxConcurrentCompiles', 1);
        const timeoutMs = ceProps('compilationEnvTimeoutMs');
        return new CompilationQueue(maxConcurrent, timeoutMs);
    }

    /**
     * Schedules `job` on the underlying queue, or runs it immediately when the
     * caller is itself executing inside a job this queue is invoking.
     *
     * @param {Function} job - zero-argument function to run.
     * @returns whatever `job()` returns on the inline path, otherwise whatever
     *     the underlying queue's `add` returns for the wrapped job.
     * @throws {Error} from the queued wrapper if its async execution id is
     *     already registered as running.
     */
    enqueue(job) {
        // Nested call from within a job we are already running: waiting for a
        // queue slot here could deadlock, so invoke the job directly.
        if (this._running.has(executionAsyncId())) {
            return job();
        }

        const runTracked = () => {
            const asyncId = executionAsyncId();
            if (this._running.has(asyncId)) {
                throw new Error('somehow we entered the context twice');
            }
            try {
                this._running.add(asyncId);
                return job();
            } finally {
                // Runs as soon as job() has returned (or thrown); if job()
                // returned a promise, the id is dropped before it settles.
                this._running.delete(asyncId);
            }
        };

        return this._queue.add(runTracked);
    }

    /**
     * Snapshot of the underlying queue's counters.
     *
     * @returns {{busy: boolean, pending: *, size: *}} `pending` and `size` as
     *     reported by the underlying queue; `busy` is true when either one is
     *     greater than zero.
     */
    status() {
        const {pending, size} = this._queue;
        return {busy: pending > 0 || size > 0, pending, size};
    }
}