export default [
    'signalStream',
    '$log',
    '_',
    'timepickerUtils',
    'chartDisplayUtils',
    'clusterMapStateFactory',
    'clusterMapFilterUtils',
    function (
        signalStream,
        $log,
        _,
        timepickerUtils,
        chartDisplayUtils,
        clusterMapStateFactory,
        clusterMapFilterUtils
    ) {
        // Resolution (ms) used to size the default query window; also handed to the
        // stream as `fallbackResolutionMs`.
        const FALLBACK_RESOLUTION = 60 * 1000; // A minute
        // Lower bound on the number of consecutive missing data points after which a
        // time series counts as dead.
        const MIN_MISSING_DATA_POINTS_TO_DEAD = 2;
        // Missing-data-point limit used when no dead-host period is configured; also
        // sizes the default query window (see getMaxTimeRangeToQuery).
        const MAX_MISSING_DATA_POINTS_TO_DEAD = 5;
        // Wait (ms) given to the throttled state emit.
        const THROTTLE_PERIOD = 1000;
        // `maxWait` (ms) of the emit debouncer: the longest an emit may be deferred.
        const MAX_DATA_UPDATE_BACKOFF = 5000;
        // Wait (ms) of the emit debouncer.
        const STREAM_UPDATE_DEBOUNCE = 50;

        return function createStream(dataConfig) {
            // State Management
            // State container built for this data config; its state root is what the
            // callback receives.
            const kubeState = clusterMapStateFactory(dataConfig);

            // Stream Management
            // resourceType -> true once its streams were stopped, false while streaming.
            const stoppedResources = {};
            // resourceType -> list of stream objects created by createStreamObject.
            const streamObjectsByResource = {};
            // Meta objects waiting to be pushed into kubeState by flushMetadataUpdates.
            const metadataUpdates = new Set();
            // tsIds that received metadata and are waiting for a live data point before
            // their meta object is queued in metadataUpdates.
            const tsIdsRequireMetadataUpdates = new Set();
            // Listener registered through setCallback; streaming and data handling are
            // skipped while it is unset.
            let callback;

            // Data Management
            // tsId -> number of consecutive null data points (capped at the stream's limit).
            let missingDatapointCounter = {};
            // tsId -> meta object returned by processMetaObjectForTsId.
            let tsIdToMetaObject = {};
            // Metadata id (resourceConfig.getId) -> meta object.
            let idToMetadataObject = {};
            // Filters last returned by updateHierarchyWithNewFilters, or set directly
            // through passivelySetStateFilters.
            let filters;
            // Passed to the next filter pass and then reset; raised whenever resources
            // are removed or the hierarchy is updated.
            let forceFilterUpdates = false;
            let firstDataEmitComplete = false; // Allow callback to determine presence of some data

            // Emit data every second with a back-off for large streams
            const throttledDataEmit = _.throttle(applyFiltersAndEmitState, THROTTLE_PERIOD);

            /**
             * Serves as a capped back-off for rapid stream updates.
             *
             * Together with throttledDataEmit, this debouncer backs off from UI updates
             * for rapidly updating streams for a reasonable amount of time
             * (MAX_DATA_UPDATE_BACKOFF). When the time series start to stream at a slower
             * pace (mostly in live data mode), throttledDataEmit takes over most of the
             * control for per-second UI updates.
             *
             * The two are required in combination because the k8s cluster map is powered
             * by multiple streams, which might run at varying resolutions.
             */
            const emitData = _.debounce(throttledDataEmit, STREAM_UPDATE_DEBOUNCE, {
                maxWait: MAX_DATA_UPDATE_BACKOFF,
            });

            // Public API of a stream instance.
            return {
                // Debounced restart: resumes the streams of one resource, or of every
                // stopped resource when called without arguments.
                init: _.debounce(restart, 1000),
                startStreaming, // Start Required jobs
                stopStreaming, // Stop all jobs
                reset: resetInternal, // Reset State, as if starting fresh
                getConfig,
                cleanup,
                setCallback,
                // Immediate (non-throttled) filter pass followed by a callback invocation.
                updateFilterState: applyFiltersAndEmitState,
                findChildMatchingAnalyzerResult,
                getStateFilters,
                passivelySetStateFilters,
            };

            /**
             * Registers the listener that is invoked with the state root whenever state
             * is emitted. Streaming cannot be started until a listener is set.
             *
             * @param {Function} callbackFunc - receives the current state root
             */
            function setCallback(callbackFunc) {
                callback = callbackFunc;
            }

            /**
             * Exposes the data configuration this stream was created with.
             *
             * @returns {Object} the `dataConfig` passed to `createStream`
             */
            function getConfig() {
                return dataConfig;
            }

            /**
             * Returns the filters currently associated with the state tree.
             *
             * @returns {*} the filters stored by the last filter pass or passive update;
             *     `undefined` until one of them has happened
             */
            function getStateFilters() {
                return filters;
            }

            /**
             * Stores filters without re-processing the state tree.
             *
             * Use this when the given filters were already applied to the state but other
             * parts of the app still need to be brought in sync; it avoids processing the
             * state tree twice.
             *
             * @param {*} newFilters - filters to remember as the current ones
             */
            function passivelySetStateFilters(newFilters) {
                filters = newFilters;
            }

            /**
             * Starts a stream for one resource type and registers the resulting stream
             * object under `streamObjectsByResource[resourceType]`.
             *
             * By default the queried window covers the last
             * `getMaxTimeRangeToQuery(FALLBACK_RESOLUTION)` milliseconds and ends now.
             * When `time.startTime` is given, the window is derived from the time picker
             * configuration instead; its length is still capped at that maximum.
             *
             * @param {string} resourceType - key of the resource the job belongs to
             * @param {*} job - handed to the stream as `signalFlowText`
             * @param {Object} [time] - optional selection with `startTime` and `endTime`
             */
            function createStreamObject(resourceType, job, time) {
                const resolution = dataConfig.getResolution();
                const maxTimeRangeToQuery = getMaxTimeRangeToQuery(FALLBACK_RESOLUTION);

                // Only an explicit start time overrides the default window
                const hasExplicitStart = Boolean(time && time.startTime);
                const timeConfig = hasExplicitStart
                    ? timepickerUtils.getChartConfigParametersFromURLTimeObject({
                          start: time.startTime,
                          end: time.endTime || 'Now',
                          relative: _.isString(time.startTime) && time.startTime.startsWith('-'),
                      })
                    : null;

                const startAtRelativeToNow = timeConfig
                    ? chartDisplayUtils.getFetchDurationFromConfig(false, timeConfig)
                    : -maxTimeRangeToQuery;
                const endAtRelativeToNow = timeConfig
                    ? chartDisplayUtils.getEndDurationFromConfig(false, timeConfig)
                    : 0;

                // Never query more than the maximum window, whatever was selected
                const requestedSpan = Math.abs(endAtRelativeToNow - startAtRelativeToNow);
                const range = Math.min(requestedSpan, maxTimeRangeToQuery);

                const streamObject = {
                    range,
                    resourceType,
                    missingDataPointLimit: getMissingDataPointLimit(resolution, resourceType),
                };

                // Every handler is bound to this stream object so the shared handlers
                // know which stream an event belongs to
                streamObject.streamer = signalStream.stream({
                    bulk: true,
                    resolution,
                    historyrange: endAtRelativeToNow - range,
                    stopTime: endAtRelativeToNow,
                    fallbackResolutionMs: FALLBACK_RESOLUTION,
                    signalFlowText: job,
                    ephemeral: true,
                    withDerivedMetadata: true,
                    offsetByMaxDelay: true,
                    resolutionAdjustable: false,
                    useCache: true,
                    streamStartCallback: (jobId) => streamStartCallback(jobId, streamObject),
                    metaDataUpdated: (meta, timeSeriesId) =>
                        metaDataUpdated(meta, timeSeriesId, streamObject),
                    callback: (dataByTsId) => onNewData(dataByTsId, streamObject),
                    onFeedback: (messages) => onStreamFeedback(messages, streamObject),
                });

                streamObjectsByResource[resourceType].push(streamObject);
            }

            /**
             * (Re)starts streaming for a resource type: any streams currently running for
             * it are stopped and a fresh stream is created from the resource's jobs.
             * Does nothing until a callback has been registered.
             *
             * @param {string} resourceType - key of the resource to stream
             * @param {Object} [time] - optional time selection forwarded to createStreamObject
             * @param {*} [range] - forwarded to createStreamObject as a fourth argument
             */
            function startStreaming(resourceType, time, range) {
                // Without a listener there is nobody to deliver state to
                if (!callback) {
                    return;
                }

                const hasNoStreams = _.isEmpty(streamObjectsByResource[resourceType]);
                if (hasNoStreams) {
                    streamObjectsByResource[resourceType] = [];
                }

                const resourceDataConfig = dataConfig.get(resourceType);

                // Tear down whatever is running for this resource, then mark it live again
                stopStreaming(resourceType);
                stoppedResources[resourceType] = false;

                createStreamObject(
                    resourceType,
                    resourceDataConfig.getResourceJobs(),
                    time,
                    range
                );
            }

            /**
             * Handles a metadata message for one time series: builds (or extends) the meta
             * object for the resource it describes and indexes it by metadata id and by
             * tsId. Ignored when the stream is not streaming, has no resource type, was
             * stopped, or no callback is registered.
             *
             * @param {Object} metadata - metadata delivered by the stream
             * @param {string} tsId - time series the metadata belongs to
             * @param {Object} streamObject - stream object the message arrived on; its
             *     `streaming`, `resourceType` and `missingDataPointLimit` are read
             */
            function metaDataUpdated(
                metadata,
                tsId,
                { streaming, resourceType, missingDataPointLimit }
            ) {
                const isActive =
                    streaming && resourceType && !stoppedResources[resourceType] && callback;
                if (!isActive) {
                    return;
                }

                const resourceConfig = dataConfig.get(resourceType);
                const metadataId = resourceConfig.getId(metadata);
                const previousMetaObject = idToMetadataObject[metadataId];
                const metaObject = resourceConfig.processMetaObjectForTsId(
                    metadata,
                    tsId,
                    previousMetaObject
                );

                idToMetadataObject[metadataId] = metaObject;
                tsIdToMetaObject[tsId] = metaObject;
                tsIdsRequireMetadataUpdates.add(tsId);

                // Start at the limit so the series counts as missing until at least one
                // data point arrives
                missingDatapointCounter[tsId] = missingDataPointLimit;

                // Once the first emit has happened, metadata arriving in between other
                // streams triggers a debounced emit of its own
                if (!firstDataEmitComplete) {
                    return;
                }
                emitData();
            }

            /**
             * Runs a filter pass over the current state root, remembers the resulting
             * filters, clears the force-update flag, and hands the state root to the
             * callback (if one is registered).
             *
             * @param {*} [newFilters] - filters to apply; forwarded as-is to
             *     `clusterMapFilterUtils.updateHierarchyWithNewFilters`
             */
            function applyFiltersAndEmitState(newFilters) {
                const stateRoot = kubeState.getState();

                const updatedFilters = clusterMapFilterUtils.updateHierarchyWithNewFilters(
                    stateRoot,
                    dataConfig,
                    newFilters,
                    filters,
                    forceFilterUpdates
                );
                filters = updatedFilters;
                forceFilterUpdates = false;

                if (!callback) {
                    return;
                }
                callback(stateRoot);
            }

            /**
             * Delegates to `kubeState.findChildMatchingAnalyzerResult`.
             *
             * @param {*} cluster - forwarded unchanged
             * @param {*} targetKeyValuePair - forwarded unchanged
             * @returns {*} whatever the state object returns for the lookup
             */
            function findChildMatchingAnalyzerResult(cluster, targetKeyValuePair) {
                const match = kubeState.findChildMatchingAnalyzerResult(
                    cluster,
                    targetKeyValuePair
                );
                return match;
            }

            /**
             * Processes one batch of data points from a stream.
             *
             * For every time series it maintains the missing-data-point streak, decides
             * whether the owning resource is alive or should be flagged dead, queues
             * metadata updates for new or revived resources, and collects value updates
             * for series flagged as coloring jobs. Afterwards dead resources are removed
             * from the state, pending metadata is flushed, values are applied, and a
             * (debounced) emit is requested when something changed or nothing has been
             * emitted yet.
             *
             * Ignored when the stream is not streaming, was stopped, or no callback is
             * registered.
             *
             * @param {Object} tsIdsToData - map of tsId to an object carrying `value`
             * @param {Object} streamObject - stream object the data arrived on; its
             *     `streaming`, `resourceType` and `missingDataPointLimit` are read
             */
            function onNewData(tsIdsToData, { streaming, resourceType, missingDataPointLimit }) {
                const isInactive = !streaming || stoppedResources[resourceType] || !callback;
                if (isInactive) {
                    return;
                }

                const deadCandidates = new Set();
                const aliveKeys = new Set();
                const coloringUpdates = [];

                for (const tsId in tsIdsToData) {
                    const datum = tsIdsToData[tsId];
                    const { value } = datum;

                    // A null value extends the missing streak (capped at the limit);
                    // any other value resets it
                    missingDatapointCounter[tsId] =
                        value === null
                            ? Math.min(missingDataPointLimit, missingDatapointCounter[tsId] + 1)
                            : 0;

                    const metaObject = tsIdToMetaObject[tsId];
                    if (!metaObject) {
                        $log.error(
                            'Unable to find Metadata associated with tsId',
                            tsId,
                            ' with value ',
                            datum
                        );
                        continue;
                    }

                    const key = metaObject.id;
                    const needsMetadataUpdate = tsIdsRequireMetadataUpdates.has(tsId);
                    const exceededLimit = missingDatapointCounter[tsId] >= missingDataPointLimit;

                    if (exceededLimit) {
                        // Only flag resources that are not already marked dead
                        if (!metaObject.isMarkedDead) {
                            deadCandidates.add(key);
                        }
                        continue;
                    }

                    aliveKeys.add(key);

                    if (needsMetadataUpdate || metaObject.isMarkedDead) {
                        metadataUpdates.add(metaObject);
                        tsIdsRequireMetadataUpdates.delete(tsId);

                        metaObject.isMarkedDead = false; // Revive Resource
                    }

                    const seriesInfo = metaObject.streamData[tsId];
                    if (seriesInfo.isColoringJob) {
                        coloringUpdates.push(
                            Object.assign({ key, value, resourceType }, seriesInfo)
                        );
                    }
                }

                // A resource is dead only if none of its time series reported it alive
                const latestDeadHosts = new Set();
                deadCandidates.forEach((hostKey) => {
                    if (!aliveKeys.has(hostKey)) {
                        latestDeadHosts.add(hostKey);
                    }
                });

                // Evaluated before the flush below empties metadataUpdates
                const shouldEmitData =
                    latestDeadHosts.size > 0 ||
                    metadataUpdates.size > 0 ||
                    coloringUpdates.length > 0;

                if (latestDeadHosts.size > 0) {
                    for (const deadKey of latestDeadHosts) {
                        const deadMetaObject = idToMetadataObject[deadKey];
                        if (deadMetaObject) {
                            deadMetaObject.isMarkedDead = true;
                        }
                    }

                    kubeState.removeDeadResources(latestDeadHosts);
                    forceFilterUpdates = true;
                }

                // Pending meta objects must reach the state before the data does
                flushMetadataUpdates();

                if (coloringUpdates.length > 0) {
                    kubeState.updateData(coloringUpdates);
                }

                if (shouldEmitData || !firstDataEmitComplete) {
                    firstDataEmitComplete = true;
                    emitData();
                }
            }

            /**
             * Marks a stream object as streaming once its job has started and logs the
             * job id together with the resource type.
             *
             * @param {*} id - job id reported by the stream
             * @param {Object} streamObject - stream object whose job started
             */
            function streamStartCallback(id, streamObject) {
                // Metadata and data handlers ignore stream objects until this flag is set
                streamObject.streaming = true;

                const { resourceType } = streamObject;
                $log.debug('Kubernetes Navigator Job Id:', id, resourceType);
            }

            /**
             * Scans stream feedback for `JOB_RUNNING_RESOLUTION` messages and applies the
             * reported resolution to the stream object. Other messages are ignored.
             *
             * @param {Array} [feedback] - feedback items with `messageCode` and `contents`
             * @param {Object} streamObject - stream object the feedback belongs to
             */
            function onStreamFeedback(feedback, streamObject) {
                if (!feedback || !feedback.length) {
                    return;
                }

                feedback.forEach((feedbackItem) => {
                    if (feedbackItem.messageCode !== 'JOB_RUNNING_RESOLUTION') {
                        return;
                    }
                    onUpdateResolution(feedbackItem.contents.resolutionMs, streamObject);
                });
            }

            /**
             * Applies the resolution a job actually runs at: records it on the stream
             * object (with a warning) when it differs from the expected one, and always
             * recomputes the stream's missing-data-point limit from it.
             *
             * @param {number} updatedResolution - resolution reported by the job
             * @param {Object} streamObject - stream object to update
             */
            function onUpdateResolution(updatedResolution, streamObject) {
                // Expected resolution: the one recorded earlier, else the configured one
                const jobResolution = streamObject.resolution || dataConfig.getResolution();
                const resolutionChanged = jobResolution !== updatedResolution;

                if (resolutionChanged) {
                    streamObject.resolution = updatedResolution;
                    $log.warn(
                        'Got different job resolution for job. Expecting',
                        jobResolution,
                        ', Got:',
                        updatedResolution
                    );
                }

                const { resourceType } = streamObject;
                streamObject.missingDataPointLimit = getMissingDataPointLimit(
                    updatedResolution,
                    resourceType
                );
            }

            /**
             * Computes how many consecutive missing data points make a time series count
             * as dead.
             *
             * With a configured dead-host period the limit is the number of data points
             * that fit into that period at the given resolution; otherwise
             * MAX_MISSING_DATA_POINTS_TO_DEAD is used. The result is never lower than
             * MIN_MISSING_DATA_POINTS_TO_DEAD.
             *
             * @param {number} [resolution] - stream resolution in milliseconds; when it is
             *     missing, zero, negative or not finite, FALLBACK_RESOLUTION is used so
             *     the limit never becomes NaN or Infinity (a NaN limit can never be
             *     reached by the `counter >= limit` check, so nothing would be marked dead)
             * @returns {number} the missing-data-point limit
             */
            function getMissingDataPointLimit(resolution) {
                const numericResolution = Number(resolution);
                const effectiveResolution =
                    Number.isFinite(numericResolution) && numericResolution > 0
                        ? numericResolution
                        : FALLBACK_RESOLUTION;

                const deadHostPeriod = dataConfig.getDeadHostPeriod();
                const calculatedMissingDataPointLimit = deadHostPeriod
                    ? Math.ceil(deadHostPeriod / effectiveResolution)
                    : MAX_MISSING_DATA_POINTS_TO_DEAD;

                return Math.max(MIN_MISSING_DATA_POINTS_TO_DEAD, calculatedMissingDataPointLimit);
            }

            /**
             * Stops streaming and cancels any pending (throttled or debounced) emit.
             *
             * With a resource type, the streams of that resource are stopped and its
             * stream list is emptied; a resource that is already stopped is left alone.
             * Without a resource type, every known resource is stopped and the internal
             * data state is reset.
             *
             * A resource that never had a stream list created (for example because
             * startStreaming returned early without a callback) is simply marked as
             * stopped instead of raising a TypeError.
             *
             * @param {string} [resourceType] - resource to stop; omit to stop everything
             */
            function stopStreaming(resourceType) {
                throttledDataEmit.cancel();
                emitData.cancel();

                if (!resourceType) {
                    for (const resource in stoppedResources) {
                        stopStreaming(resource);
                    }
                    resetInternal();
                    return;
                }

                if (stoppedResources[resourceType]) {
                    return;
                }

                $log.debug(
                    'Stopping Kubernetes Navigator ' + resourceType + ' data service streaming!'
                );
                stoppedResources[resourceType] = true;

                // The list may be missing if the resource was never started
                const streamObjects = streamObjectsByResource[resourceType] || [];
                streamObjects.forEach((streamObject) => {
                    streamObject.streamer.stopStream();
                    streamObject.streaming = false;
                });
                streamObjectsByResource[resourceType] = [];
            }

            /**
             * Resumes streaming on the stream objects currently registered for a
             * resource, or, without an argument, for every resource flagged as stopped.
             *
             * A resource without a registered stream list is skipped instead of raising
             * a TypeError. The former fallback to `startStreaming()` was removed: it was
             * guarded by `streamObjectsByResource` being falsy, which cannot happen
             * because that variable is a constant object.
             *
             * NOTE(review): stopStreaming empties a resource's stream list, so resources
             * stopped through it have nothing left to resume here — confirm this is the
             * intended behavior.
             *
             * @param {string} [resourceType] - resource to resume; omit to resume all
             *     stopped resources
             */
            function restart(resourceType) {
                $log.debug('Resuming kubernetes cluster map data service streaming.');

                if (resourceType) {
                    const streamObjects = streamObjectsByResource[resourceType] || [];
                    streamObjects.forEach((streamObject) =>
                        streamObject.streamer.resumeStreaming()
                    );
                    return;
                }

                for (const resource in stoppedResources) {
                    if (stoppedResources[resource]) {
                        restart(resource);
                    }
                }
            }

            /**
             * Tears the stream down completely: stops every resource, resets the internal
             * data state and the state tree, then hands the (now reset) state root to the
             * callback if one is registered.
             */
            function cleanup() {
                stopStreaming();
                resetInternal();
                kubeState.resetState();

                if (!callback) {
                    return;
                }

                const stateRoot = kubeState.getState();
                callback(stateRoot);
            }

            /**
             * Pushes all queued meta objects into the state hierarchy, empties the queue
             * and requests a forced filter pass. Does nothing when the queue is empty.
             */
            function flushMetadataUpdates() {
                const hasPendingUpdates = metadataUpdates.size !== 0;
                if (hasPendingUpdates) {
                    kubeState.updateHierarchy(metadataUpdates);
                    metadataUpdates.clear();

                    // The hierarchy changed, so the next filter pass must re-evaluate
                    forceFilterUpdates = true;
                }
            }

            /**
             * Drops all per-time-series bookkeeping so the stream behaves as if it had
             * just been created. Filters, the callback and the stream objects are left
             * untouched.
             */
            function resetInternal() {
                // Lookup tables are replaced with fresh objects
                missingDatapointCounter = {};
                tsIdToMetaObject = {};
                idToMetadataObject = {};

                // Shared sets are emptied in place
                metadataUpdates.clear();
                tsIdsRequireMetadataUpdates.clear();

                // The next data batch counts as the first one again
                firstDataEmitComplete = false;
            }
        };

        /********* Utilities ********/
        /**
         * Returns the longest time range to query for a given resolution: the span of
         * MAX_MISSING_DATA_POINTS_TO_DEAD data points plus two more.
         *
         * @param {number} resolution - resolution the range is sized for
         * @returns {number} `resolution` multiplied by the number of data points covered
         */
        function getMaxTimeRangeToQuery(resolution) {
            const dataPointsToCover = MAX_MISSING_DATA_POINTS_TO_DEAD + 2;
            return resolution * dataPointsToCover;
        }
    },
];
