angular.module('signalboost').factory('signalStreamPreRunner', [
    'signalStream',
    '$interval',
    '$log',
    'signalviewMetrics',
    function (signalStream, $interval, $log, signalviewMetrics) {
        const computingForToStreamObject = {};
        const computingForTimings = {};
        const jobReapTime = 30000;
        const metricPreRunnerPrefix = 'ui.dashboard.prerunner';

        // this function fixes comparisons between null and undefined to make things easier for prerunner equality checks
        function compareJobOptionInequality(a, b) {
            return coerceNullUndefined(a) !== coerceNullUndefined(b);
        }

        function coerceNullUndefined(a) {
            if (a === undefined || a === null) {
                return null;
            } else {
                return a;
            }
        }

        function clearJob(computingFor) {
            if (computingForToStreamObject[computingFor]) {
                computingForToStreamObject[computingFor].stopStream();
                delete computingForToStreamObject[computingFor];
                delete computingForTimings[computingFor];
            }
        }

        function reapUnclaimedJobs() {
            const now = Date.now();
            angular.forEach(computingForTimings, function (startTime, computingFor) {
                if (computingForToStreamObject[computingFor] && now - startTime > jobReapTime) {
                    clearJob(computingFor);
                    $log.warn('A job for ' + computingFor + ' was reaped due to cache expiration!');
                }
            });
        }

        $interval(reapUnclaimedJobs, jobReapTime, 0, false);

        function prerun(jobOpts) {
            clearJob(jobOpts);
            computingForToStreamObject[jobOpts.computingFor] = signalStream.stream(jobOpts);
            computingForToStreamObject[jobOpts.computingFor].suspendStreaming();
            computingForTimings[jobOpts.computingFor] = Date.now();
        }

        return {
            prerun: prerun,
            fetch: function fetch(jobOpts) {
                const computingFor = jobOpts.computingFor;
                const streamObj = computingForToStreamObject[computingFor];
                if (streamObj) {
                    let jobMatch = true;
                    const originalJobOpts = streamObj.getJobOpts();
                    if (originalJobOpts.signalFlowText !== jobOpts.signalFlowText) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching signalflow!'
                        );
                    }
                    if (
                        compareJobOptionInequality(originalJobOpts.resolution, jobOpts.resolution)
                    ) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching resolution!'
                        );
                    }
                    const durationMatch =
                        Math.abs(jobOpts.stopTime - jobOpts.historyrange) ===
                        Math.abs(originalJobOpts.stopTime - originalJobOpts.historyrange);
                    const livelinessMatch =
                        (jobOpts.stopTime === 0 && originalJobOpts.stopTime === 0) ||
                        (jobOpts.stopTime < 0 && originalJobOpts.stopTime < 0);
                    if (!durationMatch || !livelinessMatch) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching stop time!'
                        );
                    }
                    if (
                        compareJobOptionInequality(originalJobOpts.sampleSize, jobOpts.sampleSize)
                    ) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching sample size!'
                        );
                    }

                    if (compareJobOptionInequality(originalJobOpts.filter, jobOpts.filter)) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching filter!'
                        );
                    }

                    if (
                        compareJobOptionInequality(
                            originalJobOpts.replaceOnlyFilter,
                            jobOpts.replaceOnlyFilter
                        )
                    ) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching replaceOnlyFilter!'
                        );
                    }

                    if (compareJobOptionInequality(originalJobOpts.maxDelay, jobOpts.maxDelay)) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching maxDelay!'
                        );
                    }

                    if (compareJobOptionInequality(originalJobOpts.timezone, jobOpts.timezone)) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching timezone!'
                        );
                    }

                    if (
                        compareJobOptionInequality(
                            originalJobOpts.fallbackResolutionMs,
                            jobOpts.fallbackResolutionMs
                        )
                    ) {
                        jobMatch = false;
                        $log.error(
                            'Attempted to connect to a pre-run job that did not have matching fallbackResolutionMs!'
                        );
                    }

                    if (!jobMatch) {
                        computingForToStreamObject[computingFor].stopStream();
                        delete computingForToStreamObject[computingFor];
                        signalviewMetrics.incr(metricPreRunnerPrefix + '.miss');
                        return null;
                    }
                    signalviewMetrics.incr(metricPreRunnerPrefix + '.hit');
                    delete computingForToStreamObject[computingFor];
                    delete computingForTimings[computingFor];
                }
                return streamObj || null;
            },
        };
    },
]);
