import _ from 'lodash';
import { SOCKET_SERVER_EVENTS } from 'habitual-analytics/constants/habitual-configs';
import { getDynamicAppConfigs } from 'habitual-analytics/constants/dynamicAppConfigs';
import { io } from 'socket.io-client';
import pRetry from 'p-retry';
import { DummyEmit } from '../dummyEmit';

/**
 * Converts a raw tick payload into the per-symbol shape emitted to listeners.
 *
 * For every symbol found in `rawTicks.marketData` the result holds:
 *  - `ltp` / `oi`: the incoming `price` / `openInterest`; when those are falsy
 *    (missing or 0) the value from `previousStateTicks` is carried forward,
 *    or 0 when no previous value exists;
 *  - `firstOpen`, `prevDayLtp`, `prevDayOi`, `firstOi`: taken from
 *    `additionalData[symbol]`, defaulting to 0.
 *
 * @param {Object} rawTicks - Payload whose `marketData` maps symbol -> raw tick.
 * @param {Object} [additionalData] - Map of symbol -> reference values. May be
 *   absent (e.g. before any reference data has been stored).
 * @param {Object} [previousStateTicks] - Previously processed payload, used as
 *   the fallback source for `ltp` and `oi`.
 * @returns {Object} Shallow copy of `rawTicks` with `marketData` replaced by
 *   the normalised map. Inputs are not mutated.
 */
const tickProcessor = (rawTicks, additionalData, previousStateTicks) => {
  const incomingMarketData = rawTicks?.marketData ?? {};
  const previousMarketData = previousStateTicks?.marketData ?? {};
  const newMarketData = {};

  for (const [symbol, tick] of Object.entries(incomingMarketData)) {
    // Direct keyed access (not a dotted path string) so symbols that contain
    // '.' or '[' still resolve to their own previous entry.
    const previous = previousMarketData[symbol];
    // Optional chaining: additionalData may not have been populated yet.
    const reference = additionalData?.[symbol];

    newMarketData[symbol] = {
      ltp: tick?.price || (previous?.ltp ?? 0),
      oi: tick?.openInterest || (previous?.oi ?? 0),
      firstOpen: reference?.firstOpen || 0,
      prevDayLtp: reference?.prevDayLtp || 0,
      prevDayOi: reference?.prevDayOi || 0,
      firstOi: reference?.firstOi || 0,
    };
  }

  return { ...rawTicks, marketData: newMarketData };
};

/**
 * Creates a socket.io-backed ticker adapter.
 *
 * The returned object is bound to a "generic ticker" through `init()`. The
 * adapter reads `instruments`, `additionalData`, `state` and `debugLog` from
 * it, writes `socket`, `state` and `additionalData` onto it, and delivers
 * processed payloads through its `onState` / `onTick` callbacks.
 *
 * @returns {{
 *   init: Function,
 *   start: Function,
 *   subscribe: Function,
 *   unsubscribe: Function,
 *   destroy: Function
 * }} Adapter with lifecycle and subscription methods.
 */
export const ticker = () => {
  // Sent as the `clientName` query value and with every (un)subscribe request.
  const CLIENT_NAME = 100;
  // Quiet period before a burst of tick events is processed.
  const TICK_DEBOUNCE_MS = 100;
  const SUBSCRIBE_ACK_TIMEOUT_MS = 4 * 1000;
  const UNSUBSCRIBE_ACK_TIMEOUT_MS = 10000;

  let genericTicker;

  // Handles a full state snapshot: stores it and emits it via onState.
  const onState = (rawTicks) => {
    const ticks = tickProcessor(
      _.omit(rawTicks, 'marketDataExtras'),
      genericTicker.additionalData,
      genericTicker.state
    );
    genericTicker.state = ticks;

    DummyEmit.runWhenRequired(genericTicker);
    genericTicker.onState([ticks]);
  };

  // Handles an incremental tick payload: stores it and emits it via onTick.
  const processTicks = (rawTicks) => {
    const ticks = tickProcessor(
      rawTicks,
      genericTicker.additionalData,
      genericTicker.state
    );
    genericTicker.state = ticks;

    genericTicker.onTick([ticks]);
  };

  // Debounced so that, during a burst, only the last payload received within
  // TICK_DEBOUNCE_MS is processed and emitted (adjust the delay as needed).
  const onData = _.debounce(processTicks, TICK_DEBOUNCE_MS);

  return {
    /**
     * Binds this adapter to the generic ticker it drives.
     * Must be called before any other method.
     *
     * @param {Object} argGenericTicker - The generic ticker instance.
     */
    init(argGenericTicker) {
      genericTicker = argGenericTicker;
    },

    /**
     * Opens the socket and registers all event handlers.
     *
     * @returns {Promise<void>} Resolves on the first `connect`; rejects on the
     *   first `connect_error`, on `connect_timeout`, or if socket creation
     *   throws synchronously.
     */
    async start() {
      // NOTE: raising the listener limit (setMaxListeners(250)) was considered
      // here to avoid MaxListenersExceededWarning, since up to ~100 jobs may
      // listen to a single live ticker client. It is currently not applied.
      return new Promise((resolve, reject) => {
        try {
          const { envs } = getDynamicAppConfigs();

          genericTicker.socket = io(envs.ANALYTICS_TICKER_URI, {
            query: {
              instruments: genericTicker.instruments.join(','),
              clientName: CLIENT_NAME,
            },
            transports: ['websocket'],
            reconnection: true,
            reconnectionDelay: 5000,
            // NOTE(review): `Authorization` is passed as a top-level option.
            // socket.io-client documents `auth` / `extraHeaders` for
            // credentials - confirm the server actually receives this value.
            Authorization: `Bearer ${envs.ANALYTICS_TICKER_AUTH}`,
            path: `/${envs.ANALYTICS_TICKER_PATH}/socket.io`,
          });

          genericTicker.socket
            .on(SOCKET_SERVER_EVENTS.STATE, onState)
            .on(SOCKET_SERVER_EVENTS.TICKS, onData)
            .on(SOCKET_SERVER_EVENTS.ERROR, (error) => {
              genericTicker.debugLog('[LT] Socket error:', error);
            })
            .once('connect', () => {
              genericTicker.debugLog('[LT] Connected to ticker server.');
              resolve();
            })
            .once('connect_error', (err) => {
              genericTicker.debugLog('[LT] Connect error:', err);
              reject(new Error('connect_error'));
            })
            .on('disconnect', (status) => {
              genericTicker.debugLog(
                `[LT] Socket disconnected. Status: ${status}`
              );
            });

          genericTicker.socket.io
            .on('reconnect_attempt', () => {
              // Refresh the query so a reconnect carries the current
              // instrument list rather than the one used at start().
              genericTicker.socket.io.opts.query.instruments =
                genericTicker.instruments.join(',');
            })
            .on('reconnect', () => {
              genericTicker.debugLog('[LT] Reconnect successful.');
            })
            // NOTE(review): `connect_timeout` is presumably only emitted by
            // older socket.io-client versions - confirm it can still fire.
            .once('connect_timeout', () => {
              genericTicker.debugLog('[LT] Connection timeout.');
              reject(new Error('connect_timeout'));
            });
        } catch (error) {
          reject(error);
        }
      });
    },

    /**
     * Subscribes to additional instruments and folds the server's
     * acknowledgement into the ticker state.
     *
     * Each attempt waits up to SUBSCRIBE_ACK_TIMEOUT_MS for the ack; failed
     * attempts are retried through p-retry (5 retries).
     * See https://socket.io/docs/v4/client-offline-behavior/ -> Volatile Events
     *
     * @param {*} newInstruments - Instruments to subscribe to; forwarded to
     *   the server unchanged.
     * @returns {Promise<Array>} `[ticks]` with the updated state, or `[]` when
     *   the ack carries no `additionalData`.
     */
    async subscribe(newInstruments) {
      // Normalises the server's per-symbol reference values to numbers.
      // Missing "first" values fall back to the previous-day value, then to 0
      // so NaN is never stored.
      const toReferenceData = (additionalData) =>
        _.reduce(
          additionalData,
          (result, obj, symbol) => {
            result[symbol] = {
              firstOpen:
                Number(obj?.firstOpen) || Number(obj?.prevDayLtp) || 0,
              prevDayLtp: Number(obj?.prevDayLtp) || 0,
              firstOi: Number(obj?.firstOi) || Number(obj?.prevDayOi) || 0,
              prevDayOi: Number(obj?.prevDayOi) || 0,
            };
            return result;
          },
          {}
        );

      const subscribeToServer = () =>
        new Promise((resolve, reject) => {
          try {
            genericTicker.socket
              .timeout(SUBSCRIBE_ACK_TIMEOUT_MS)
              .volatile.emit(
                'subscribe',
                newInstruments,
                CLIENT_NAME,
                (
                  err,
                  { message, state: additionalState, additionalData } = {}
                ) => {
                  if (err || message === 'error') {
                    const errMsg = `[LT-${CLIENT_NAME}] server subscribe failed. ${err} Server Error: ${message}`;
                    return reject(new Error(errMsg));
                  }

                  if (_.isEmpty(_.keys(additionalData))) {
                    return resolve([]);
                  }

                  genericTicker.additionalData = _.merge(
                    genericTicker.additionalData,
                    toReferenceData(additionalData)
                  );

                  // NOTE(review): _.merge writes `additionalState` into the
                  // existing state's marketData object in place before it is
                  // re-processed - confirm listeners holding the previous
                  // payload tolerate that.
                  const ticks = tickProcessor(
                    {
                      marketData: _.merge(
                        _.get(genericTicker.state, 'marketData', {}),
                        additionalState
                      ),
                    },
                    genericTicker.additionalData,
                    genericTicker.state
                  );
                  genericTicker.state = ticks;
                  genericTicker.onState([ticks]);
                  resolve([ticks]);
                }
              );
          } catch (e) {
            genericTicker.debugLog(e, 'Rejected by Error');
            reject(e);
          }
        });

      return pRetry(subscribeToServer, {
        retries: 5,
        onFailedAttempt: (error) => {
          genericTicker.debugLog(
            `[LT] server subscribe failed attempt: ${error}.`
          );
        },
      });
    },

    /**
     * Asks the server to stop streaming the given instruments.
     *
     * @param {*} staleInstruments - Instruments to drop; forwarded unchanged.
     * @returns {Promise<void>} Resolves on a successful ack; rejects when the
     *   ack times out (UNSUBSCRIBE_ACK_TIMEOUT_MS) or reports an error.
     */
    async unsubscribe(staleInstruments) {
      return new Promise((resolve, reject) => {
        genericTicker.socket
          .timeout(UNSUBSCRIBE_ACK_TIMEOUT_MS)
          .volatile.emit(
            'unsubscribe',
            staleInstruments,
            CLIENT_NAME,
            (err, { message } = {}) => {
              if (err || message === 'error') {
                return reject(
                  new Error(`[LT] Unsubscribe failed: ${message || err}`)
                );
              }
              genericTicker.debugLog(
                '[LT] Successfully unsubscribed:',
                staleInstruments
              );
              resolve();
            }
          );
      });
    },

    /**
     * Tears down the adapter: drops any pending debounced tick, removes the
     * socket's listeners and disconnects it. Safe to call when no socket was
     * ever created.
     */
    destroy() {
      // Without this, a trailing debounced call could still invoke onTick
      // after the socket has been torn down.
      onData.cancel();

      if (genericTicker?.socket) {
        genericTicker.socket.removeAllListeners();
        genericTicker.socket.disconnect();
        genericTicker.debugLog('[LT] Socket destroyed.');
      }
    },
  };
};
