import { PayloadAction } from '@reduxjs/toolkit';
import axios from 'axios';
import { all, call, put, select, take } from 'redux-saga/effects';

import { getBaseURL } from 'common/api/config';
import { IngestedDataAction } from 'common/features/ingested-data/actions';
import { subscribeToDataStream } from 'common/features/ingested-data/helpers';
import {
  addIngestedData,
  setIngestedData,
  setIngestedDataColumns,
  setIngestedDataFooter,
  setIngestedDataStatus,
} from 'common/features/ingested-data/store';
import {
  IngestedData,
  IngestedDataColumn,
  IngestedDataFooter,
} from 'common/features/ingested-data/types';
import { selectApplicationContext } from 'common/stores/globalSlice';
import { setHardFetchNewData } from 'common/stores/ingestedDataFooterSlice';
import { PowerToolApplicationContext } from 'common/types/types';

/**
 * Saga handler that loads ingested data for the active application.
 *
 * When the application context is `'train_analysis'` the data is fetched with
 * a single REST call (`getData`) and the columns, rows and footer are written
 * to the store together. For every other application the data is streamed in
 * via `streamData`.
 *
 * @param action - Action whose payload is forwarded to `getData` or
 *   `streamData` (both read `url` and `hardFetch` from it).
 * @throws Rethrows any error raised by the REST request, after the status has
 *   been moved off `'fetching'`.
 */
export function* fetchIngestedDataHandler(
  action: PayloadAction<IngestedDataAction>
) {
  const application: PowerToolApplicationContext = yield select(
    selectApplicationContext
  );

  // every application except TAT streams its data back
  if (application !== 'train_analysis') {
    yield call(streamData, action);
    return;
  }

  // for now, TAT does not stream data
  // just a REST API call with JSON response
  yield all([
    put(setIngestedData([])),
    put(setIngestedDataStatus('fetching')),
  ]);

  try {
    // collect all data
    const { rows, columns, footer } = yield call(getData, action);

    // then dispatch its pieces all at once
    yield all([
      put(setIngestedDataColumns(columns)),
      put(setIngestedData(rows)),
      put(setIngestedDataFooter(footer)),
      put(setIngestedDataStatus('complete')),
      put(setHardFetchNewData(false)),
    ]);
  } catch (error) {
    // without this the status would stay 'fetching' forever after a failed
    // request; mirror the streaming error path, which also ends on 'complete'
    yield put(setIngestedDataStatus('complete'));
    throw error;
  }
}

/**
 * Fetches the ingested data with a single GET request and sorts the response
 * entries into columns, rows and footer.
 *
 * Entries are classified by the first matching property:
 * - `name` defined    -> column definition
 * - `OBJID` defined   -> data row
 * - `columns` defined -> footer (the last such entry wins)
 * Entries matching none of these are ignored.
 *
 * @param action - Action whose payload provides `url` and `hardFetch`. When
 *   `hardFetch` is truthy, `disableCacheHit=true` is added to the query string.
 * @returns A promise resolving to `{ columns, rows, footer }`. A response body
 *   that is not an array yields empty columns/rows and an empty footer.
 */
const getData = (action: PayloadAction<IngestedDataAction>) => {
  const { url, hardFetch } = action.payload;

  // start a query string when the url has none, otherwise extend it;
  // always appending with '&' produces a malformed url in the first case
  const separator = url.includes('?') ? '&' : '?';
  const requestUrl = hardFetch
    ? `${url}${separator}disableCacheHit=true`
    : url;

  return axios.get<any[]>(requestUrl).then((response) => {
    const columns: IngestedDataColumn[] = [];
    const rows: IngestedData[] = [];
    let footer: IngestedDataFooter = {};

    // guard against a non-array body (e.g. an empty response), which would
    // otherwise throw when iterated
    const entries = Array.isArray(response.data) ? response.data : [];

    for (const entry of entries) {
      if (entry?.name !== undefined) {
        columns.push(entry);
      } else if (entry?.OBJID !== undefined) {
        rows.push(entry);
      } else if (entry?.columns !== undefined) {
        footer = entry;
      }
    }

    return { columns, rows, footer };
  });
};

/**
 * Streams ingested data over Server-Sent Events and mirrors it into the store.
 *
 * Message protocol, as handled below:
 * - a message with a truthy `error` ends the stream;
 * - the first message (index 0) carries the columns;
 * - a later message with a truthy `columns` property is the footer and ends
 *   the stream;
 * - every other message is a chunk of rows.
 *
 * Row chunks are dispatched as they arrive until the throttle kicks in; after
 * that they are buffered locally and dispatched once, right before the status
 * is set to `'complete'`, so the store never reports completion with rows
 * still missing.
 *
 * @param action - Action whose payload provides `url` and `hardFetch`. When
 *   `hardFetch` is truthy, `disableCacheHit=true` is added to the query string.
 */
function* streamData(action: PayloadAction<IngestedDataAction>) {
  const { url, hardFetch } = action.payload;

  // messages with an index above this value switch throttling on for all
  // following messages: indices 0-6 (columns + 6 row chunks) are dispatched
  // live, everything from index 7 onwards is buffered
  const throttleAfterMessageIndex = 5;

  // we handle disable cache hits here to prevent circular effects
  // start a query string when the url has none, otherwise extend it
  const separator = url.includes('?') ? '&' : '?';
  const streamUrl = hardFetch
    ? `${url}${separator}disableCacheHit=true`
    : url;
  const eventSource = new EventSource(`${getBaseURL()}/${streamUrl}`);

  try {
    // reset data before we fetch, set status
    yield all([
      put(setIngestedData([])),
      put(setIngestedDataStatus('fetching')),
    ]);

    // establish SSE channel
    const dataChannel = yield call(subscribeToDataStream, eventSource);

    // keep track of message counts
    let messageIndex = 0;

    // rows collected while throttled, dispatched in one go at the end
    const bufferedRows: IngestedData[] = [];
    let throttled = false;

    // set once the footer message arrives
    let footer: IngestedDataFooter | undefined;

    // while channel is open
    let streaming = true;
    while (streaming) {
      // wait for event
      const event = yield take(dataChannel);

      // if error occurred, break out of the streaming loop
      if (event?.error) {
        streaming = false;
      }

      // if first, we know its columns
      else if (messageIndex === 0) {
        yield put(setIngestedDataColumns(event));
      }

      // if contains 'columns' we know its footer, so last message
      // confusing that it is named columns - but footer message sends us
      // data about which columns contain only null values
      else if (event.columns) {
        footer = event.columns;
        streaming = false;
      }

      // otherwise its a 'normal' data message:
      // if we aren't throttling down, add the new data to the grid
      else if (!throttled) {
        yield all([
          put(setIngestedDataStatus('streaming')),
          put(addIngestedData(event)),
        ]);
      }

      // otherwise we are throttled, aggregate data locally
      // (append in place rather than recopying the whole buffer per message)
      else {
        for (const row of event) {
          bufferedRows.push(row);
        }
      }

      if (messageIndex > throttleAfterMessageIndex) {
        throttled = true;
      }

      messageIndex++;
    }

    // dispatch the buffered chunk BEFORE announcing completion
    if (bufferedRows.length > 0) {
      yield put(addIngestedData(bufferedRows));
    }

    if (footer !== undefined) {
      // we also reset the 'hard fetch' state when the stream finished cleanly
      yield all([
        put(setIngestedDataStatus('complete')),
        put(setIngestedDataFooter(footer)),
        put(setHardFetchNewData(false)),
      ]);
    } else {
      // stream ended on an error message
      yield put(setIngestedDataStatus('complete'));
    }
  } finally {
    // runs on completion, on error and when the saga is cancelled;
    // closing an already closed EventSource is a no-op
    eventSource.close();
  }
}
