UNPKG

10.7 kBJavaScriptView Raw
1'use strict'
2
3const TelegramApi = require('./api/TelegramApi')
4const TelegramRouter = require('./routing/TelegramRouter')
5const ConsoleLogger = require('./logger/ConsoleLogger')
6const TelegramDataSource = require('./TelegramDataSource')
7const UpdateProcessorsManager = require('./updateProcessors/UpdateProcessorsManager')
8const InMemoryStorage = require('./storage/session/InMemoryStorage')
9const TelegramSessionStorage = require('./storage/session/TelegramSessionStorage')
10const TelegramBaseController = require('./mvc/TelegramBaseController')
11const TelegramBaseCallbackQueryController = require('./mvc/TelegramBaseCallbackQueryController')
12const TelegramBaseInlineQueryController = require('./mvc/TelegramBaseInlineQueryController')
13const Models = require('./models/Models')
14const Update = require('./models/Update')
15const Ivan = require('./localization/Ivan')
16const Scope = require('./mvc/Scope')
17const InputFile = require('./api/InputFile')
18const InlineScope = require('./mvc/InlineScope')
19const BaseStorage = require('./storage/BaseStorage')
20const BaseLogger = require('./logger/BaseLogger')
21const BaseScopeExtension = require('./mvc/BaseScopeExtension')
22const BaseUpdateProcessor = require('./updateProcessors/BaseUpdateProcessor')
23const BaseUpdateFetcher = require('./updateFetchers/BaseUpdateFetcher')
24
25const cluster = require('cluster')
26const os = require('os')
27const SharedStorage = require('./storage/sharedStorage/SharedStorage')
28const TelegramIPC = require('./ipc/TelegramIPC')
29
30const WebAdmin = require('./webAdmin/server/WebAdmin')
31
32const WebhookUpdateFetcher = require('./updateFetchers/WebhookUpdateFetcher')
33const LongPoolingUpdateFetcher = require('./updateFetchers/LongPoolingUpdateFetcher')
34
35const WebAdminLogger = require('./logger/WebAdminLogger')
36const Statistics = require('./statistics/Statistics')
37
38const BaseCommand = require('./routing/commands/BaseCommand')
39const TextCommand = require('./routing/commands/TextCommand')
40const RegexpCommand = require('./routing/commands/RegexpCommand')
41
42class Telegram {
43 /**
44 *
45 * @param {string} token
46 * @param {{
47 * logger: BaseLogger,
48 * storage: BaseStorage,
49 * localization: Object[],
50 * workers: number,
51 * webhook: {url: string, port: number, host: string }
52 * updateFetcher: BaseUpdateFetcher
53 * webAdmin: {port: number, host: string}
54 * }} options
55 */
56 constructor(token, options) {
57 options = options || {}
58
59 this._token = token
60 this._logger = options.logger || new WebAdminLogger()
61 this._storage = options.storage || new InMemoryStorage()
62 this._sharedStorage = new SharedStorage(this._storage)
63 this._localization = new Ivan(this._sharedStorage, (options.localization || []))
64 this._webAdminPort = options.webAdmin ? options.webAdmin.port : 7777
65 this._webAdminHost = options.webAdmin ? options.webAdmin.host : 'localhost'
66
67 this._cpus = os.cpus()
68 this._workersCount = options.workers || this._cpus.length
69
70 this._ipc = new TelegramIPC()
71
72 this._telegramDataSource = new TelegramDataSource(
73 new TelegramApi(token, this._logger),
74 new TelegramRouter(),
75 this._logger,
76 new TelegramSessionStorage(this._sharedStorage),
77 this._localization,
78 this._ipc
79 )
80
81 this._beforeUpdateFunction = null
82
83 this._checkNodeVersion()
84
85 this._updatesFetcher = null
86
87 if (options.updateFetcher)
88 this._updatesFetcher = options.updateFetcher
89 else if (options.webhook) {
90 this._updatesFetcher = new WebhookUpdateFetcher(
91 this._telegramDataSource.api,
92 this._logger,
93 options.webhook.url,
94 options.webhook.host,
95 options.webhook.port,
96 token
97 )
98 }
99 else {
100 this._updatesFetcher = new LongPoolingUpdateFetcher(
101 this._telegramDataSource.api,
102 this._logger
103 )
104 }
105
106 this._setup()
107 }
108
109 _checkNodeVersion() {
110 if (process.version.replace('v', '').split('.')[0] < 6) {
111 this._logger.error({
112 'Fatal error': 'Node version must be 6 or greater, please update your Node.js'
113 })
114
115 process.exit()
116 }
117 }
118
119 _setup() {
120 if (cluster.isMaster)
121 this._master()
122
123 if (cluster.isWorker)
124 this._worker()
125 }
126
127 _master() {
128 this._logger.log({
129 'Telegram': `Master started, ${this._cpus.length} CPUs found, ${this._workersCount} workers will start`
130 })
131
132 this._waitingUpdates = {} // each worker can ask master to send him next update from specific chat
133 this._waitingCallbacks = {}
134 this._workers = {}
135 this.statistics = new Statistics()
136
137 new WebAdmin(
138 this._webAdminHost,
139 this._webAdminPort,
140 __dirname + '/webAdmin/client',
141 this._logger,
142 this
143 )
144
145 this._runWorkers()
146
147 this._updatesFetcher.fetch(updates => {
148 this._processUpdates(updates)
149 })
150 }
151
152 _worker() {
153 this._updateProcessor = new UpdateProcessorsManager(this._telegramDataSource)
154
155 process.on('message', msg => {
156 if (msg.type == 'update') {
157 this._processUpdates([Update.deserialize(msg.update)])
158 return
159 }
160
161 this._sharedStorage.handleMessageFromMaster(msg)
162 })
163 }
164
165 _fork() {
166 return cluster.fork()
167 }
168
169 restartWorkers() {
170 this._logger.log({ 'Telegram': 'restarting workers' })
171
172 for (const pid in this._workers) {
173 if (this._workers[pid])
174 this._workers[pid].kill()
175 }
176 }
177
178 /**
179 * This callback will be called from master process
180 *
181 * @param {Function} callback
182 */
183 onMaster(callback) {
184 if (cluster.isMaster)
185 callback()
186 }
187
188 _runWorkers() {
189 for(var i = 0; i < this._workersCount; i++) {
190 this._runWorker()
191 }
192
193 cluster.on('online', w => this._logger.log({ 'Telegram': `Worker started at ${w.process.pid} PID`}))
194
195 cluster.on('exit', (worker, code, signal) => {
196 this._workers[worker.process.pid] = null
197
198 this._logger.log({
199 'Telegram': `Worker ${worker.process.pid} died with code: ${code}, and signal: ${signal}, Starting a new worker`
200 })
201 this._runWorker()
202 this.statistics.workerDied(worker.process.pid)
203 })
204 }
205
206 _runWorker() {
207 let worker = this._fork()
208 this._workers[worker.process.pid] = worker
209 this.statistics.addWorker(worker.process.pid)
210
211 let self = this
212
213 worker.on('message', function(msg) {
214 if (msg.type == 'waitForUpdate') {
215 self._waitingUpdates[msg.chatId] = worker
216 return
217 }
218
219 if (msg.type == 'waitForCallbackQuery') {
220 self._waitingCallbacks[msg.data] = worker
221 }
222
223 self._sharedStorage.handleMessageFromWorkers(msg, this)
224 })
225 }
226
227 /**
228 * Pass child of BaseScopeExtension or array of children to use that extensions
229 *
230 * @param {BaseScopeExtension|BaseScopeExtension[]} extension
231 */
232 addScopeExtension(extension) {
233 this._telegramDataSource.addScopeExtension(extension)
234 }
235
236 /**
237 * @param {Update} update
238 */
239 emulateUpdate(update) {
240 this._updateProcessor.process(update)
241 }
242
243 /**
244 *
245 * @returns {TelegramApi}
246 */
247 get api() {
248 return this._telegramDataSource.api
249 }
250
251 /**
252 *
253 * @returns {TelegramRouter}
254 */
255 get router() {
256 return this._telegramDataSource.router
257 }
258
259 /**
260 *
261 * @returns {BaseLogger}
262 */
263 get logger() {
264 return this._telegramDataSource.logger
265 }
266
267 /**
268 *
269 * @returns {TelegramSessionStorage}
270 */
271 get sessionStorage() {
272 return this._telegramDataSource.sessionStorage
273 }
274
275 /**
276 * @callback continueCallback
277 * @param {boolean} handle
278 */
279
280 /**
281 * @callback beforeHandler
282 * @param {Update} update
283 * @param {continueCallback} callback
284 */
285
286 /**
287 * Your handler function passed to this method will be called after getting
288 * any update, but before it's processing.
289 *
290 * Also to your function will be passed callback function,
291 * if you call that function with 'true' argument, then update handling will be continued,
292 * else the update will not be handled.
293 *
294 * @param {beforeHandler} handler
295 */
296 before(handler) {
297 this._beforeUpdateFunction = handler
298 }
299
300 /**
301 * @param {Update[]} updates
302 * @private
303 */
304 _processUpdates(updates) {
305 if (cluster.isMaster) {
306 updates.forEach(u => {
307 let worker
308
309
310 if (u.message && this._waitingUpdates[u.message.chat.id] != null)
311 worker = this._waitingUpdates[u.message.chat.id]
312 else if (u.callbackQuery && this._waitingCallbacks[u.callbackQuery.data] != null)
313 worker = this._waitingCallbacks[u.callbackQuery.data]
314 else
315 worker = this._pickRandomWorker() //pick random worker for update
316
317 this.statistics.registrateRequest(worker.process.pid)
318 worker.send({ type: 'update', update: u.serialize() })
319
320 if (u.message)
321 this._waitingUpdates[u.message.chat.id] = null
322
323 })
324
325 return
326 }
327
328 updates.forEach(update => {
329 if (!this._beforeUpdateFunction) {
330 this._updateProcessor.process(update)
331 return
332 }
333
334 this._beforeUpdateFunction(update, handle => {
335 if (handle === true) {
336 this._updateProcessor.process(update)
337 }
338 })
339 })
340 }
341
342 _pickRandomWorker() {
343 const pids = Object.keys(this._workers).filter(pid => this._workers[pid] != null)
344 return this._workers[pids[Math.floor(Math.random() * pids.length)]]
345 }
346}
347
348module.exports = {
349 TelegramApi,
350 Telegram,
351 TelegramBaseController,
352 TelegramBaseCallbackQueryController,
353 TelegramBaseInlineQueryController,
354 Scope,
355 BaseLogger,
356 BaseScopeExtension,
357 InputFile,
358 InlineScope,
359 BaseStorage,
360 BaseUpdateFetcher,
361 BaseCommand,
362 TextCommand,
363 RegexpCommand,
364 Models
365}