import WebSocket from 'ws';
import { fromEvent, merge, empty, Subscription } from 'rxjs';
import {
  actionStreamFromSocket,
  dataStreamFromSocket,
  pipeStreamIntoSocket,
} from './shared/sockets';
import Joi from 'joi';
import {
  concatMap,
  map,
  finalize,
  ignoreElements,
  endWith,
  tap,
} from 'rxjs/operators';
import { defaultBasicLogger } from './shared';

const url = 'http://localhost:4010/events';

async function start() {
  const logger = defaultBasicLogger();

  const ws = new WebSocket(url);
  const socket = Object.assign(ws, { id: 'test-socket', closingByKit: false });

  process.stdin.setEncoding('utf8');

  const data = fromEvent<string>(process.stdin, 'data');

  const allData = dataStreamFromSocket(socket);

  const actions = actionStreamFromSocket(allData, () => Joi.object());

  const jobs = merge(
    actions.pipe(
      concatMap((action) => {
        logger.log('  -> Received', action);
        return empty();
      })
    )
  );

  const subscriptions = new Subscription();

  subscriptions.add(
    pipeStreamIntoSocket(
      data.pipe(
        map((text) => ({
          type: 'TEST',
          text: text.replace('\n', ''),
        })),
        tap((action) => {
          logger.log('  <- Sending', action);
        })
      ),
      socket
    )
  );

  return jobs
    .pipe(
      finalize(() => {
        subscriptions.unsubscribe();
      }),
      ignoreElements(),
      endWith()
    )
    .toPromise();
}

start().catch((err) => {
  defaultBasicLogger().error('', err);
});
