Source: base_telegram_bot/sender.js

/**
 * @fileoverview Queue based sender worker for TelegramBot class
 * @author Vlad Vnukovskiy <vvlad1973@gmail.com>
 * 
 * @package
 */

/**
 * @module sender
 */
'use strict'

//TODO: It's need to refactoring sender, change doing iterations from while 
//loop to setInterval initiation

import { parentPort, workerData } from 'worker_threads';
import got from 'got';

import Queue from 'vvlad1973-queue';

import { createFormData } from '../../helpers/utils.js';
import { StatusCodes, TelegramBotApiMethods } from '../enums.js';

const queue = new Queue(workerData.queue.size);

const instance = got.extend({
  handlers: [
    (options, next) => {
      Error.captureStackTrace(options.context);
      return next(options);
    }
  ],
  hooks: {
    beforeError: [
      error => {
        error.source = error.options.context.stack.split('\n');
        return error;
      }
    ]
  }
});

let
  capacity = workerData.capacity,
  error429 = workerData.error429 ?? { descriptionPattern: '.*retry after (\\d+)', maxWaitingTime: 5000, maxResendAttepmts: 10 },
  apiUrl = workerData.url,
  counter = {
    dateTime: 0,
    count: 0
  };


function doTask(task) {
  return new Promise(async (resolve, reject) => {
    let
      options = {},
      result,
      response;

    try {

      if (task.data.multipart) {

        let form = createFormData(task.data.payload);

        options.headers = form.getHeaders();
        options.body = form;

      } else {

        options.json = task.data.payload;
      }

      options.responseType = 'json';

      try {

        result = await instance.post(apiUrl, options);

        response = {
          result: {
            ok: true,
            code: StatusCodes.OK
          },
          response: result?.body,
          request: task,
          queue_size: queue.size
        };
      } catch (error) {
        if (
          error.response?.statusCode === 429 &&
          error.response?.body?.description
        ) {
          /* TODO Добавить обработку ошибки 429. 
           * 
           * При получении ошибки:
           * - проверяем, что количество попыток переотправки не 
           *   достигло максимума
           *     - если достигло, отправляем в родительский
           *       поток сообщение с ошибкой и завершаем обработку задачи
           * - проверяем, не превышает ли время, обозначенное в ответе, 
           *   максимальный интервал ожидания
           * - если не превышает, запускаем таймер на время, указанное в ответе 
           *   с ошибкой
           *     - по завершению таймаута, ставим сообщение в очередь на 
           *       отправку с наивысшим приоритетом
           * - если превышает, отправляем репорт о неудачной отправке с ошибкой
           *
           * Описание ошибки в response:
           * 'Too Many Requests: retry after 6326' - в секундах
           *  response.response.description
           */

          let timeout;

          if (error.response?.body?.description)
            timeout =
              error.response?.body?.description.match(error429.descriptionPattern);

          if (timeout && timeout[1] && timeout[1] * 1000 <= error429.maxWaitingTime) {
            logWarn(`Pause: ${+timeout[1] * 1000} ms`);
            logTrace(`Task: ${JSON.stringify(task, '', '\t')}`);

            // setInterval(() => {queue.enqueue(task)}, 1000);

          } else {
            logError(`Too long waiting time: ${timeout[1]} seconds`);
          }
        }
        response = {
          result: {
            ok: false,
            code: error.response?.statusCode
          },
          response: error.response?.body,
          request: task,
          queue_size: queue.size
        };
      }
      resolve(response);
      return response;
    } catch (error) {
      reject(error);
    }
  });
}


parentPort.on('message', request => {
  if (typeof request === 'object' && typeof request.data.payload.method === 'string') {

    if (Object.values(TelegramBotApiMethods).includes(request.data.payload.method)) {
      let priority;

      switch (request.data.payload.method) {
        case TelegramBotApiMethods.ANSWER_CALLBACK_QUERY:
          priority = 1;
          break;

        default:
          priority = 2;
          break;
      }

      queue.enqueue(request, priority)
        .catch(error => {
          logError({
            result: {
              ok: false,
              code: StatusCodes.INTERNAL_SERVER_ERROR
            },
            error: error,
            request: request,
            queue: queue.contents
          })
        })

    } else {

      logError({
        result: {
          ok: false,
          code: StatusCodes.NOT_IMPLEMENTED
        },
        error: {
          message: `Method ${request.data.payload.method} is not implemented`
        },
        request: request
      });
    }
  } else {

    logError({
      result: {
        ok: false,
        code: StatusCodes.BAD_REQUEST
      },
      error: {
        message: 'Invalid format of data'
      },
      request: request
    });
  }
});


setInterval(() => {
  counter.dateTime = Date.now();
  counter.count = 0;
}, capacity.interval);

while (true) {
  let
    now = Date.now(),
    delay = (counter.count === capacity.limit) ? (capacity.interval - (now - counter.dateTime)) : 0;

  await (() => {
    return new Promise((resolve) => {
      setTimeout(() => {
        resolve()
      }, delay)
    })
  })();

  let task = await queue.dequeue();

  if (task) {
    doTask(task).then(async result => {
      parentPort.postMessage(await result);
    }).catch(error => {

      logError({

        result: {
          ok: false,
          code: error.code
        },
        error: error,
        request: task
      })
    });
    counter.count++;
  }
}


function logTrace(message) {

  if (message)
    parentPort.postMessage({ trace: message });
}


function logDebug(message) {

  if (message)
    parentPort.postMessage({ debug: message });
}


function logError(error) {

  if (error)
    parentPort.postMessage({ error: error });
}


function logWarn(message) {

  if (message)
    parentPort.postMessage({ warn: message });
}


function logInfo(message) {

  if (message)
    parentPort.postMessage({ info: message });
}