import { ActionsObservable, Epic, ofType } from 'redux-observable';
import { interval as observableInterval } from 'rxjs';
import { filter, map, switchMap, take, tap, throttle } from 'rxjs/operators';

import { Actions, ScopeTypes } from 'uf-ws/WebsocketActions';
import { SET_ACTIVE_PROJECT, SetActiveProjectAction } from 'uf/app/ActionTypes';
import { getActiveProjectId } from 'uf/app/selectors';
import { addAppMessage } from 'uf/appservices/actions';
import { combineEpics } from 'uf/base/epics';
import { rollbarError } from 'uf/rollbar';
import { UFState } from 'uf/state';
import { getUser } from 'uf/user/selectors/user';
import {
  subscribeAll,
  subscribeProject,
  unsubscribeFromProject,
  websocketsReady,
} from 'uf/websockets/actions';

import {
  WebsocketMessageAction,
  WEBSOCKETS_INITIALIZED,
  WEBSOCKETS_RECEIVE_ERROR,
  WEBSOCKETS_SENT_MESSAGE,
  WebsocketsInitializedAction,
} from './actionTypes';

export function websocketError(
  action$: ActionsObservable<WebsocketMessageAction>,
) {
  return action$.pipe(
    ofType(WEBSOCKETS_RECEIVE_ERROR),
    tap(action => {
      rollbarError(action.exception || action.message, {
        env: __CLIENT__ ? 'browser' : 'server',
        source: 'websockets',
      });
    }),
    // only tell the user every 60 seconds
    throttle(() => observableInterval(60 * 1000)),
    map(() =>
      addAppMessage(
        'We are currently experiencing a degradation in service. Please be patient while we work on the problem!',
        { level: 'danger' },
      ),
    ),
  );
}
export const subscribeOnWebsocketsInitialized: Epic<
  WebsocketsInitializedAction,
  any,
  UFState
> = action$ => {
  return action$.pipe(
    ofType(WEBSOCKETS_INITIALIZED),
    filter(() => __CLIENT__ || __TESTING__),
    map(() => subscribeAll()),
  );
};

/**
 *
 *   An epic that will emit WEBSOCKETS_READY after websockets setup happens.  We wait for websockets
 *   to get initialized, and then listen for user subscriptions to project, user and session
 *   channels.  We don't care which specific channels in each scope,
 */
export const websocketsReadyAfterSubscribe: Epic<
  WebsocketsInitializedAction,
  any, // TODO: this should be WebsocketsReadyAction,
  UFState
> = action$ => {
  return action$.pipe(
    ofType(WEBSOCKETS_INITIALIZED),
    filter(() => __CLIENT__ || __TESTING__),
    switchMap(() => {
      const channels = [];

      const subscribeAction$ = (
        action$ as ActionsObservable<WebsocketMessageAction>
      )
        .ofType(WEBSOCKETS_SENT_MESSAGE)
        .pipe(filter(isWebsocketsSubscribeMessage));

      return subscribeAction$.pipe(
        filter(action => {
          const channel = action.message.scope_type;

          if (!channels.includes(channel)) {
            channels.push(channel);
          }

          return subscribedToAllChannels(channels);
        }),

        take(1),
        map(() => websocketsReady()),
      );
    }),
  );
};

function isWebsocketsSubscribeMessage(action: WebsocketMessageAction): boolean {
  return action.message.action === Actions.SUBSCRIBE;
}

function subscribedToAllChannels(channels: string[]): boolean {
  return (
    channels.includes(ScopeTypes.PROJECT) &&
    channels.includes(ScopeTypes.SESSION) &&
    channels.includes(ScopeTypes.USER)
  );
}

export const subscribeOnProjectSwitch: Epic<
  SetActiveProjectAction,
  any,
  UFState
> = (action$, state$) => {
  let previousProjectId = getActiveProjectId(state$.value);
  return action$.pipe(
    ofType(SET_ACTIVE_PROJECT),
    filter(() => __CLIENT__ || __TESTING__),
    filter(({ value: projectId }) => {
      if (projectId === previousProjectId) {
        return false;
      }
      previousProjectId = projectId;

      if (!projectId) {
        return false;
      }
      return true;
    }),
    map(({ value: projectId }) =>
      subscribeProject(projectId, getUser(state$.value)),
    ),
  );
};

/**
 * an epic to send a websocket message to unsubscribe from a redis project channel
 */
export const unsubscribeFromPreviousProject: Epic<SetActiveProjectAction, any> =
  (action$, state$) => {
    let previousProjectId = getActiveProjectId(state$.value);
    return action$.pipe(
      ofType(SET_ACTIVE_PROJECT),
      filter(() => __CLIENT__ || __TESTING__),
      filter(({ value: projectId }) => {
        if (projectId === previousProjectId) {
          return false;
        }
        // sometimes the projectId can get set to 'null'.  i.e. the active project gets deleted.
        if (!previousProjectId) {
          previousProjectId = projectId;
          return false;
        }
        return true;
      }),
      map(({ value: projectId }) => {
        const action = unsubscribeFromProject(previousProjectId);
        previousProjectId = projectId;
        return action;
      }),
    );
  };

export default combineEpics(
  {
    websocketError,
    websocketsReadyAfterSubscribe,
    subscribeOnWebsocketsInitialized,
    subscribeOnProjectSwitch,
    unsubscribeFromPreviousProject,
  },
  'websockets',
);
