/**
* @fileoverview Queue based sender worker for TelegramBot class
* @author Vlad Vnukovskiy <vvlad1973@gmail.com>
*
* @package
*/
/**
* @module sender
*/
'use strict'
//TODO: It's need to refactoring sender, change doing iterations from while
//loop to setInterval initiation
import { parentPort, workerData } from 'worker_threads';
import got from 'got';
import Queue from 'vvlad1973-queue';
import { createFormData } from '../../helpers/utils.js';
import { StatusCodes, TelegramBotApiMethods } from '../enums.js';
const queue = new Queue(workerData.queue.size);
const instance = got.extend({
handlers: [
(options, next) => {
Error.captureStackTrace(options.context);
return next(options);
}
],
hooks: {
beforeError: [
error => {
error.source = error.options.context.stack.split('\n');
return error;
}
]
}
});
let
capacity = workerData.capacity,
error429 = workerData.error429 ?? { descriptionPattern: '.*retry after (\\d+)', maxWaitingTime: 5000, maxResendAttepmts: 10 },
apiUrl = workerData.url,
counter = {
dateTime: 0,
count: 0
};
function doTask(task) {
return new Promise(async (resolve, reject) => {
let
options = {},
result,
response;
try {
if (task.data.multipart) {
let form = createFormData(task.data.payload);
options.headers = form.getHeaders();
options.body = form;
} else {
options.json = task.data.payload;
}
options.responseType = 'json';
try {
result = await instance.post(apiUrl, options);
response = {
result: {
ok: true,
code: StatusCodes.OK
},
response: result?.body,
request: task,
queue_size: queue.size
};
} catch (error) {
if (
error.response?.statusCode === 429 &&
error.response?.body?.description
) {
/* TODO Добавить обработку ошибки 429.
*
* При получении ошибки:
* - проверяем, что количество попыток переотправки не
* достигло максимума
* - если достигло, отправляем в родительский
* поток сообщение с ошибкой и завершаем обработку задачи
* - проверяем, не превышает ли время, обозначенное в ответе,
* максимальный интервал ожидания
* - если не превышает, запускаем таймер на время, указанное в ответе
* с ошибкой
* - по завершению таймаута, ставим сообщение в очередь на
* отправку с наивысшим приоритетом
* - если превышает, отправляем репорт о неудачной отправке с ошибкой
*
* Описание ошибки в response:
* 'Too Many Requests: retry after 6326' - в секундах
* response.response.description
*/
let timeout;
if (error.response?.body?.description)
timeout =
error.response?.body?.description.match(error429.descriptionPattern);
if (timeout && timeout[1] && timeout[1] * 1000 <= error429.maxWaitingTime) {
logWarn(`Pause: ${+timeout[1] * 1000} ms`);
logTrace(`Task: ${JSON.stringify(task, '', '\t')}`);
// setInterval(() => {queue.enqueue(task)}, 1000);
} else {
logError(`Too long waiting time: ${timeout[1]} seconds`);
}
}
response = {
result: {
ok: false,
code: error.response?.statusCode
},
response: error.response?.body,
request: task,
queue_size: queue.size
};
}
resolve(response);
return response;
} catch (error) {
reject(error);
}
});
}
parentPort.on('message', request => {
if (typeof request === 'object' && typeof request.data.payload.method === 'string') {
if (Object.values(TelegramBotApiMethods).includes(request.data.payload.method)) {
let priority;
switch (request.data.payload.method) {
case TelegramBotApiMethods.ANSWER_CALLBACK_QUERY:
priority = 1;
break;
default:
priority = 2;
break;
}
queue.enqueue(request, priority)
.catch(error => {
logError({
result: {
ok: false,
code: StatusCodes.INTERNAL_SERVER_ERROR
},
error: error,
request: request,
queue: queue.contents
})
})
} else {
logError({
result: {
ok: false,
code: StatusCodes.NOT_IMPLEMENTED
},
error: {
message: `Method ${request.data.payload.method} is not implemented`
},
request: request
});
}
} else {
logError({
result: {
ok: false,
code: StatusCodes.BAD_REQUEST
},
error: {
message: 'Invalid format of data'
},
request: request
});
}
});
setInterval(() => {
counter.dateTime = Date.now();
counter.count = 0;
}, capacity.interval);
while (true) {
let
now = Date.now(),
delay = (counter.count === capacity.limit) ? (capacity.interval - (now - counter.dateTime)) : 0;
await (() => {
return new Promise((resolve) => {
setTimeout(() => {
resolve()
}, delay)
})
})();
let task = await queue.dequeue();
if (task) {
doTask(task).then(async result => {
parentPort.postMessage(await result);
}).catch(error => {
logError({
result: {
ok: false,
code: error.code
},
error: error,
request: task
})
});
counter.count++;
}
}
function logTrace(message) {
if (message)
parentPort.postMessage({ trace: message });
}
function logDebug(message) {
if (message)
parentPort.postMessage({ debug: message });
}
function logError(error) {
if (error)
parentPort.postMessage({ error: error });
}
function logWarn(message) {
if (message)
parentPort.postMessage({ warn: message });
}
function logInfo(message) {
if (message)
parentPort.postMessage({ info: message });
}