import {
  RSocketClient,
  JsonSerializer,
  IdentitySerializer,
  Encodable,
  MAX_STREAM_ID,
  MESSAGE_RSOCKET_ROUTING,
  APPLICATION_JSON,
} from 'rsocket-core';
import { ReactiveSocket } from 'rsocket-types';
import RSocketWebSocketClient from 'rsocket-websocket-client';
import { Observable } from 'rxjs';
import { Websocket } from 'store/Websocket/Websocket.types';
import { SYSTEM_ENV, Environment } from 'src/constants';
import createDebug from 'debug';

const debug = createDebug('App:Websocket');

let client: RSocketClient<Websocket.CommunicationData, Encodable>;
let transport: RSocketWebSocketClient;
let rSocket: ReactiveSocket<unknown, Encodable>;

// TODO: move to utils / webpack
const domainMap: { [key in Environment]: string } = {
  test: 'test-dmp.ruptela.com',
  prep: 'prep-dmp.ruptela.com',
  prod: 'dmp.ruptela.com',
};

const protocol = SYSTEM_ENV === 'prod' ? 'wss' : 'ws';

// const reconnectIntervalMillis = 15000;
const keepAlive = 60000;
const lifetime = 180000;
const dataMimeType = APPLICATION_JSON.string;
const metadataMimeType = MESSAGE_RSOCKET_ROUTING.string;
const MAX_RECONNECT_COUNT = 3;

// Create an instance of a client
const getTransport = () =>
  new RSocketWebSocketClient({
    url: `${protocol}://${domainMap[SYSTEM_ENV]}/dmp-ws`,
    debug: true,
  });

const getClient = () => {
  transport = getTransport();

  // TODO: Retry connect count
  let reconnectCount = 0;
  transport.connectionStatus().subscribe({
    onNext: (status) => {
      debug(`Resumable transport status changed: ${status.kind}`);

      if (status.kind === 'ERROR' && reconnectCount <= MAX_RECONNECT_COUNT) {
        debug('Resumable transport disconnected, retrying...');
        transport.close();
        transport.connect();
        reconnectCount += 1;
      }
    },
    onSubscribe: (subscription) => {
      subscription.request(MAX_STREAM_ID);
    },
  });
  return new RSocketClient({
    serializers: {
      data: JsonSerializer,
      metadata: IdentitySerializer,
    },
    setup: {
      keepAlive,
      lifetime,
      dataMimeType,
      metadataMimeType,
    },
    transport,
  });
};
export const closeConnection = () => {
  debug('Closing socket connection');

  transport?.close();
  rSocket?.close();
  client?.close();
};

// Open the connection
export const createConnection = (data: { clientId: number }) =>
  new Observable<Websocket.Message>((observer) => {
    client = getClient();

    client.connect().subscribe({
      onComplete: (socket) => {
        rSocket = socket;
        socket
          .requestStream({
            data,
            metadata: `${String.fromCharCode('events'.length)}events`,
          })
          .subscribe({
            onComplete: () => observer.complete(),
            onError: (err) => observer.error(err),
            onNext: (val) => observer.next(val as Websocket.Message),
            onSubscribe: (sub) => sub.request(MAX_STREAM_ID),
          });
      },
      onError: (err) => observer.error(err),
    });
  });

export default {
  createConnection,
  closeConnection,
};
