export default [
    '_',
    '$log',
    'signalStream',
    'chartDisplayUtils',
    'timepickerUtils',
    'kubeDataService',
    function (_, $log, signalStream, chartDisplayUtils, timepickerUtils, kubeDataService) {
        // Default JOB resolution of a k8s job. Passed to chartDisplayUtils.getResolution and
        // doubled to form the default history range when no time range is configured.
        const DEFAULT_JOB_RESOLUTION = 1000;
        // Third argument handed to chartDisplayUtils.getResolution.
        const JOB_POINT_DENSITY = 1;
        // Sent to signalStream as the `fallbackResolutionMs` stream option.
        const FALLBACK_RESOLUTION = 300000; // 5m

        // Wait used when debouncing the actual stream start (presumably milliseconds).
        const DEBOUNCE_TIMEOUT = 500;
        // Wait used when throttling invocations of the registered data callback.
        const THROTTLE_PERIOD = 1000;
        // `maxWait` cap of the data-emit debouncer.
        const MAX_DATA_UPDATE_BACKOFF = 5000;
        // Wait of the data-emit debouncer.
        const STREAM_UPDATE_DEBOUNCE = 50;

        return function () {
            // Per-instance mutable state shared by the functions below.
            let tsidToMetadata = {}; // tsid -> metadata received from the stream
            let metricData = {}; // metric label -> id -> accumulated data point
            let dataCallback = null;
            let aggregationCallback = null;
            let streamObject;
            let streamOptions = null;
            let idFunction = null;

            // Starting the stream is debounced so that rapid successive start() calls
            // only open a single stream with the most recent options.
            const startStreams = _.debounce(function beginStream() {
                streamObject = signalStream.stream(streamOptions);
                return streamObject;
            }, DEBOUNCE_TIMEOUT);

            // Hands the accumulated data to the registered data callback, at most once per
            // THROTTLE_PERIOD.
            const throttledDataEmit = _.throttle(function deliverData(
                latestMetricData,
                latestTimestampSeen
            ) {
                if (!dataCallback) {
                    return;
                }
                dataCallback(latestMetricData, latestTimestampSeen);
            },
            THROTTLE_PERIOD);

            /**
             * Serves as a capped back-off for rapid stream updates.
             *
             * Together with throttledDataEmit this debouncer will back-off from UI updates
             * for rapidly updating stream for a reasonable amount of time (MAX_DATA_UPDATE_BACKOFF).
             * When the time-series starts to stream at a slower pace (mostly in live data mode),
             * throttledDataEmit takes over most control for per second UI update.
             *
             * The two are required in combination because for k8s, there are usually multiple
             * streams powering the table/entity-detail component and might run at varying resolutions.
             */
            const emitData = _.debounce(throttledDataEmit, STREAM_UPDATE_DEBOUNCE, {
                maxWait: MAX_DATA_UPDATE_BACKOFF,
            });

            // Public API of a stream instance. The functions are declared further down in
            // this closure; function declarations are hoisted, so they can be referenced
            // here before their definitions.
            return {
                setJobParams,
                start,
                stop,
                reset,
                addDataCallback,
                addAggregationCallback,
            };

            /**
             * Configures the job that the next start() call will stream.
             *
             * Any previous configuration is discarded first. When either `idFunc` or
             * `config` is missing the instance is left unconfigured, so start() returns null.
             *
             * @param {Function} idFunc - Maps a time series' metadata to the id its data is stored under.
             * @param {Object} config - Handed to kubeDataService.getProcessedProgramText.
             * @param {Array} [filter] - Filters for the program text; defaults to an empty array.
             * @param {Object} [time] - Optional `{ startTime, endTime }`; ignored without a startTime.
             */
            function setJobParams(idFunc, config, filter, time) {
                // Translates the optional time range into a chart time config (null when absent).
                const toTimeConfig = (timeRange) => {
                    if (!timeRange || !timeRange.startTime) {
                        return null;
                    }

                    const { startTime, endTime } = timeRange;
                    return timepickerUtils.getChartConfigParametersFromURLTimeObject({
                        start: startTime,
                        end: endTime || 'Now',
                        // A string start beginning with '-' is treated as a relative range.
                        relative: _.isString(startTime) && startTime.startsWith('-'),
                    });
                };

                idFunction = null;
                streamOptions = null;

                if (!(idFunc && config)) {
                    return;
                }

                idFunction = idFunc;

                const signalFlowText = kubeDataService.getProcessedProgramText(
                    config,
                    filter || []
                );
                const chartTimeConfig = toTimeConfig(time);

                streamOptions = createStreamOptions(signalFlowText, chartTimeConfig);
            }

            /**
             * Builds the options object handed to signalStream.stream().
             *
             * The history range and stop time come from `timeConfig` when one is given.
             * Without a usable range the stream falls back to twice DEFAULT_JOB_RESOLUTION,
             * and the resolution is derived from that same effective range (previously it
             * was derived from the raw, possibly undefined, range, i.e. from NaN).
             *
             * @param {string} program - SignalFlow program text to run.
             * @param {Object|null} timeConfig - Chart time configuration, or null/undefined for the default range.
             * @returns {Object} Stream options wired to this instance's callbacks.
             */
            function createStreamOptions(program, timeConfig) {
                let historyRange;
                let endAt;

                if (timeConfig) {
                    historyRange = chartDisplayUtils.getFetchDurationFromConfig(false, timeConfig);
                    endAt = chartDisplayUtils.getEndDurationFromConfig(false, timeConfig);
                }

                // Resolve the fallback once so that the resolution is computed for the
                // range that is actually requested from the stream.
                const effectiveHistoryRange = historyRange || -1 * (DEFAULT_JOB_RESOLUTION * 2);

                const resolution = chartDisplayUtils.getResolution(
                    Math.abs(effectiveHistoryRange),
                    DEFAULT_JOB_RESOLUTION,
                    JOB_POINT_DENSITY
                );

                return {
                    resolution,
                    fallbackResolutionMs: FALLBACK_RESOLUTION,
                    bulk: true,
                    historyrange: effectiveHistoryRange,
                    stopTime: endAt,
                    signalFlowText: program,
                    ephemeral: true,
                    withDerivedMetadata: true,
                    offsetByMaxDelay: true,
                    resolutionAdjustable: false,
                    streamStartCallback: (id) => streamStartCallback(id),
                    metaDataUpdated: (metadata, tsId) => metaDataUpdated(metadata, tsId),
                    callback: (tsIdsData) => dataUpdated(tsIdsData),
                    onFeedback: onStreamFeedback,
                };
            }

            /**
             * Handles feedback messages from the stream.
             *
             * When the feedback contains a JOB_RUNNING_RESOLUTION message, the resolution
             * it reports replaces `streamOptions.resolution`, which dataUpdated uses to
             * decide when a missing value is stale enough to be nulled.
             *
             * Feedback is ignored when the options have been cleared in the meantime
             * (setJobParams resets them without stopping the stream), when it is not an
             * array, or when the message carries no resolution.
             *
             * @param {Array<Object>} feedback - Feedback messages, each with a `messageCode`.
             */
            function onStreamFeedback(feedback) {
                if (!streamOptions || !Array.isArray(feedback)) {
                    return;
                }

                const jobResolutionMsg = feedback.find(
                    (feedbackItem) =>
                        Boolean(feedbackItem) &&
                        feedbackItem.messageCode === 'JOB_RUNNING_RESOLUTION'
                );
                if (!jobResolutionMsg || !jobResolutionMsg.contents) {
                    return;
                }

                const resolutionMs = jobResolutionMsg.contents.resolutionMs;
                // Never overwrite a known resolution with a missing one.
                if (resolutionMs !== undefined && resolutionMs !== null) {
                    streamOptions.resolution = resolutionMs;
                }
            }

            /**
             * Restarts streaming with the currently configured options.
             *
             * Any running or pending stream is stopped first. The actual stream start is
             * debounced, so it happens some time after this call returns.
             *
             * @returns {Object|null} The stream options in use, or null when no job is configured.
             */
            function start() {
                stop();

                if (streamOptions) {
                    startStreams();
                    return streamOptions;
                }

                return null;
            }

            /**
             * Remembers the metadata of a time series so that dataUpdated can look it up
             * by tsid. Wired into the stream options as the `metaDataUpdated` callback.
             *
             * @param {Object} metadata - Metadata of the time series.
             * @param {string} tsid - Id of the time series the metadata belongs to.
             */
            function metaDataUpdated(metadata, tsid) {
                tsidToMetadata[tsid] = metadata;
            }

            /**
             * Sets the callback that receives the accumulated data. Only one callback is
             * kept: a later call replaces the previously registered one.
             *
             * @param {Function} callback - Invoked as `callback(metricData, latestTimestampSeen)`.
             */
            function addDataCallback(callback) {
                dataCallback = callback;
            }

            /**
             * Sets the callback dataUpdated uses to transform each incoming value. Only one
             * callback is kept: a later call replaces the previously registered one.
             *
             * @param {Function} callback - Invoked as `callback(data, metadata, latestValue)`;
             *     its return value is used as the data point's latest value.
             */
            function addAggregationCallback(callback) {
                aggregationCallback = callback;
            }

            /**
             * Merges a batch of stream data points into `metricData` and schedules an emit.
             *
             * Each point is stored under `metricData[metricLabel][id]`, where the label is
             * the metadata's `sf_streamLabel` and the id comes from the configured id
             * function. Points older than what is already stored are ignored. A null value
             * only clears the stored value once the stored timestamp lags the incoming one
             * by more than the stream resolution.
             *
             * Batches arriving while no id function is configured are dropped, and time
             * series without known metadata (e.g. after reset()) are skipped rather than
             * aborting the whole batch with a TypeError.
             *
             * @param {Object} tsidData - Map of tsid to `{ timestamp, value }`.
             */
            function dataUpdated(tsidData) {
                // setJobParams can clear the id function while a stream is still
                // delivering data; there is nothing to key the data by in that case.
                if (!tsidData || typeof idFunction !== 'function') {
                    return;
                }

                let latestTimestampSeen = 0;

                Object.keys(tsidData).forEach((tsid) => {
                    const metadata = tsidToMetadata[tsid];
                    const point = tsidData[tsid];

                    // Metadata is wiped by reset()/stop(); skip what cannot be attributed.
                    if (!metadata || !point) {
                        return;
                    }

                    const metricLabel = metadata.sf_streamLabel;
                    const id = idFunction(metadata);

                    const dataTimestamp = point.timestamp;
                    let latestValue = point.value;

                    if (!metricData[metricLabel]) {
                        metricData[metricLabel] = {};
                    }

                    if (!metricData[metricLabel][id]) {
                        metricData[metricLabel][id] = {};
                    }

                    const data = metricData[metricLabel][id];

                    // Ignore old data
                    if (data.timestamp && data.timestamp > dataTimestamp) {
                        return;
                    }

                    data.id = id;
                    if (aggregationCallback) {
                        latestValue = aggregationCallback(data, metadata, latestValue);
                    }

                    latestTimestampSeen = Math.max(latestTimestampSeen, dataTimestamp);

                    if (latestValue !== null) {
                        data.value = latestValue;
                    }

                    // signalStream's option bulk = true will send null for all TSIDs not published by the
                    // time it dispatches its batch update.
                    // It is very likely that these time series will publish data out-of-sync
                    // from each other and thus data points should not be immediately set to null.
                    // Without stream options there is no resolution to compare against, so
                    // the stored value is kept.
                    if (
                        latestValue === null &&
                        data.value !== null &&
                        streamOptions &&
                        dataTimestamp - data.timestamp > streamOptions.resolution
                    ) {
                        data.value = null;
                    }

                    angular.extend(data, metadata);
                    // only update the timestamp if the latest data wasn't null, this will allow stale data to drop out over time
                    if (latestValue !== null) {
                        data.timestamp = dataTimestamp;
                    }
                    data[metricLabel] = data.value;
                });

                emitData(metricData, latestTimestampSeen);
            }

            /**
             * Logs, at info level, that a stream has started. Wired into the stream
             * options as the `streamStartCallback` callback.
             *
             * @param {*} id - Id reported for the started stream.
             */
            function streamStartCallback(id) {
                $log.info(`stream has started with id: ${id}`);
            }

            /**
             * Stops streaming and discards everything that was collected.
             *
             * Cancels a pending (debounced) stream start, stops the running stream if there
             * is one, clears the accumulated metadata and data, and cancels any data emit
             * that is still queued.
             */
            function stop() {
                // A start that has not fired yet must not open a stream after this point.
                startStreams.cancel();

                if (streamObject) {
                    const runningStream = streamObject;
                    runningStream.stopStream();
                    streamObject = null;
                }

                reset();

                // Drop queued emits so that callbacks are not invoked after stopping.
                [emitData, throttledDataEmit].forEach((pendingEmit) => pendingEmit.cancel());
            }

            /**
             * Discards all accumulated state: the data collected per metric label and the
             * metadata known per time series. Both are replaced by fresh empty objects, so
             * references handed out earlier are left untouched.
             */
            function reset() {
                metricData = {};
                tsidToMetadata = {};
            }
        };
    },
];
