UNPKG

10.8 kBJavaScriptView Raw
1'use strict'
2
3const TelegramApi = require('./api/TelegramApi')
4const TelegramRouter = require('./routing/TelegramRouter')
5const ConsoleLogger = require('./logger/ConsoleLogger')
6const TelegramDataSource = require('./TelegramDataSource')
7const UpdateProcessorsManager = require('./updateProcessors/UpdateProcessorsManager')
8const InMemoryStorage = require('./storage/session/InMemoryStorage')
9const TelegramSessionStorage = require('./storage/session/TelegramSessionStorage')
10const TelegramBaseController = require('./mvc/TelegramBaseController')
11const TelegramBaseCallbackQueryController = require('./mvc/TelegramBaseCallbackQueryController')
12const TelegramBaseInlineQueryController = require('./mvc/TelegramBaseInlineQueryController')
13const Models = require('./models/Models')
14const Update = require('./models/Update')
15const Ivan = require('./localization/Ivan')
16const Scope = require('./mvc/Scope')
17const InputFile = require('./api/InputFile')
18const InlineScope = require('./mvc/InlineScope')
19const BaseStorage = require('./storage/BaseStorage')
20const BaseLogger = require('./logger/BaseLogger')
21const BaseScopeExtension = require('./mvc/BaseScopeExtension')
22const BaseUpdateProcessor = require('./updateProcessors/BaseUpdateProcessor')
23const BaseUpdateFetcher = require('./updateFetchers/BaseUpdateFetcher')
24
25const cluster = require('cluster')
26const os = require('os')
27const SharedStorage = require('./storage/sharedStorage/SharedStorage')
28const TelegramIPC = require('./ipc/TelegramIPC')
29
30const WebAdmin = require('./webAdmin/server/WebAdmin')
31
32const WebhookUpdateFetcher = require('./updateFetchers/WebhookUpdateFetcher')
33const LongPoolingUpdateFetcher = require('./updateFetchers/LongPoolingUpdateFetcher')
34
35const WebAdminLogger = require('./logger/WebAdminLogger')
36const Statistics = require('./statistics/Statistics')
37
38const BaseCommand = require('./routing/commands/BaseCommand')
39const TextCommand = require('./routing/commands/TextCommand')
40const RegexpCommand = require('./routing/commands/RegexpCommand')
41const CustomFilterCommand = require('./routing/commands/CustomFilterCommand')
42
/**
 * Entry point of the library. Constructing an instance wires together the API
 * client, router, storages and update fetcher, and then immediately starts
 * either the master role (fetches updates, forks workers, hands updates to
 * them over IPC) or the worker role (processes updates received from the
 * master), depending on `cluster.isMaster` / `cluster.isWorker`.
 */
class Telegram {
    /**
     * All fields of `options` are optional.
     *
     * @param {string} token
     * @param {{
     *  logger: BaseLogger,
     *  storage: BaseStorage,
     *  localization: Object[],
     *  workers: number,
     *  webhook: {url: string, port: number, host: string }
     *  updateFetcher: BaseUpdateFetcher
     *  webAdmin: {port: number, host: string}
     * }} [options]
     */
    constructor(token, options) {
        options = options || {}

        // Resolve host and port independently, so passing only one of them
        // still leaves the other one on its default instead of `undefined`.
        const webAdmin = options.webAdmin || {}

        this._token = token
        this._logger = options.logger || new WebAdminLogger()
        this._storage = options.storage || new InMemoryStorage()
        this._sharedStorage = new SharedStorage(this._storage)
        this._localization = new Ivan(this._sharedStorage, (options.localization || []))
        this._webAdminPort = webAdmin.port != null ? webAdmin.port : 7777
        this._webAdminHost = webAdmin.host || 'localhost'

        this._cpus = os.cpus()
        this._workersCount = options.workers || this._cpus.length

        this._ipc = new TelegramIPC()

        this._telegramDataSource = new TelegramDataSource(
            new TelegramApi(token, this._logger),
            new TelegramRouter(),
            this._logger,
            new TelegramSessionStorage(this._sharedStorage),
            this._localization,
            this._ipc
        )

        this._beforeUpdateFunction = null

        this._checkNodeVersion()

        this._updatesFetcher = this._createUpdatesFetcher(token, options)

        this._setup()
    }

    /**
     * Chooses the update fetcher: an explicitly supplied one wins, then a
     * webhook fetcher when `options.webhook` is given, otherwise long polling.
     *
     * @param {string} token
     * @param {Object} options
     * @returns {BaseUpdateFetcher}
     * @private
     */
    _createUpdatesFetcher(token, options) {
        if (options.updateFetcher) {
            return options.updateFetcher
        }

        if (options.webhook) {
            return new WebhookUpdateFetcher(
                this._telegramDataSource.api,
                this._logger,
                options.webhook.url,
                options.webhook.host,
                options.webhook.port,
                token
            )
        }

        return new LongPoolingUpdateFetcher(
            this._telegramDataSource.api,
            this._logger
        )
    }

    /**
     * Logs a fatal error and terminates the process when the running Node.js
     * major version is lower than 6.
     *
     * @private
     */
    _checkNodeVersion() {
        const major = Number.parseInt(process.version.replace('v', '').split('.')[0], 10)

        if (major < 6) {
            this._logger.error({
                'Fatal error': 'Node version must be 6 or greater, please update your Node.js'
            })

            // Non-zero exit code, so supervisors and shells see the failure.
            process.exit(1)
        }
    }

    /**
     * Starts the role that matches the current cluster process.
     *
     * @private
     */
    _setup() {
        if (cluster.isMaster)
            this._master()

        if (cluster.isWorker)
            this._worker()
    }

    /**
     * Master role: starts the web admin, forks the workers and begins
     * fetching updates, which are then distributed between the workers.
     *
     * @private
     */
    _master() {
        this._logger.log({
            'Telegram': `Master started, ${this._cpus.length} CPUs found, ${this._workersCount} workers will start`
        })

        this._waitingUpdates = {} // each worker can ask master to send him next update from specific chat
        this._workers = {}
        this.statistics = new Statistics()

        new WebAdmin(
            this._webAdminHost,
            this._webAdminPort,
            __dirname + '/webAdmin/client',
            this._logger,
            this
        )

        this._runWorkers()

        this._updatesFetcher.fetch(updates => {
            this._processUpdates(updates)
        })
    }

    /**
     * Worker role: creates the update processor and listens for IPC messages
     * sent by the master.
     *
     * @private
     */
    _worker() {
        this._updateProcessor = new UpdateProcessorsManager(this._telegramDataSource)

        process.on('message', msg => this._handleMasterMessage(msg))
    }

    /**
     * Handles one IPC message from the master inside a worker. Messages of
     * type 'update' are deserialized and processed, everything else is
     * passed to the shared storage.
     *
     * @param {Object} msg
     * @private
     */
    _handleMasterMessage(msg) {
        if (msg.type !== 'update') {
            this._sharedStorage.handleMessageFromMaster(msg)
            return
        }

        const callbackQuery = msg.update.callback_query

        // `message` is optional in a callback query (it is absent for buttons
        // attached to inline-mode messages), so only patch it when present.
        if (callbackQuery && callbackQuery.message) {
            // Copies the callback payload and sender onto the attached message
            // (presumably so it can be handled like an ordinary text message
            // further down the pipeline - verify against the update processors).
            callbackQuery.message.text = callbackQuery.data
            callbackQuery.message.from = callbackQuery.from
        }

        this._processUpdates([Update.deserialize(msg.update)])
    }

    /**
     * @returns {Worker} freshly forked cluster worker
     * @private
     */
    _fork() {
        return cluster.fork()
    }

    /**
     * Kills every live worker. The cluster 'exit' handler installed by
     * `_runWorkers` starts a replacement for each of them.
     */
    restartWorkers() {
        this._logger.log({ 'Telegram': 'restarting workers' })

        Object.keys(this._workers).forEach(pid => {
            if (this._workers[pid])
                this._workers[pid].kill()
        })
    }

    /**
     * This callback will be called from master process
     *
     * @param {Function} callback
     */
    onMaster(callback) {
        if (cluster.isMaster)
            callback()
    }

    /**
     * Forks the configured number of workers and keeps the pool full by
     * starting a new worker every time one exits.
     *
     * @private
     */
    _runWorkers() {
        for (let i = 0; i < this._workersCount; i++) {
            this._runWorker()
        }

        cluster.on('online', w => this._logger.log({ 'Telegram': `Worker started at ${w.process.pid} PID`}))

        cluster.on('exit', (worker, code, signal) => {
            delete this._workers[worker.process.pid]

            // Forget chats that were waiting on the dead worker, otherwise the
            // next update from such a chat would be sent into a closed channel.
            Object.keys(this._waitingUpdates).forEach(chatId => {
                if (this._waitingUpdates[chatId] === worker)
                    delete this._waitingUpdates[chatId]
            })

            this._logger.log({
                'Telegram': `Worker ${worker.process.pid} died with code: ${code}, and signal: ${signal}, Starting a new worker`
            })
            this._runWorker()
            this.statistics.workerDied(worker.process.pid)
        })
    }

    /**
     * Forks a single worker, registers it and subscribes to its IPC messages.
     *
     * @private
     */
    _runWorker() {
        const worker = this._fork()
        this._workers[worker.process.pid] = worker
        this.statistics.addWorker(worker.process.pid)

        worker.on('message', msg => {
            if (msg.type === 'waitForUpdate') {
                // This worker wants the next update from the given chat.
                this._waitingUpdates[msg.chatId] = worker
                return
            }

            this._sharedStorage.handleMessageFromWorkers(msg, worker)
        })
    }

    /**
     * Pass child of BaseScopeExtension or array of children to use that extensions
     *
     * @param {BaseScopeExtension|BaseScopeExtension[]} extension
     */
    addScopeExtension(extension) {
        this._telegramDataSource.addScopeExtension(extension)
    }

    /**
     * Feeds the update straight into the update processor, bypassing the
     * `before` handler. The processor is created in `_worker()`, so this is
     * only usable inside a worker process.
     *
     * @param {Update} update
     */
    emulateUpdate(update) {
        this._updateProcessor.process(update)
    }

    /**
     *
     * @returns {TelegramApi}
     */
    get api() {
        return this._telegramDataSource.api
    }

    /**
     *
     * @returns {TelegramRouter}
     */
    get router() {
        return this._telegramDataSource.router
    }

    /**
     *
     * @returns {BaseLogger}
     */
    get logger() {
        return this._telegramDataSource.logger
    }

    /**
     *
     * @returns {TelegramSessionStorage}
     */
    get sessionStorage() {
        return this._telegramDataSource.sessionStorage
    }

    /**
     * @callback continueCallback
     * @param {boolean} handle
     */

    /**
     * @callback beforeHandler
     * @param {Update} update
     * @param {continueCallback} callback
     */

    /**
     * Your handler function passed to this method will be called after getting
     * any update, but before it's processing.
     *
     * Also to your function will be passed callback function,
     * if you call that function with 'true' argument, then update handling will be continued,
     * else the update will not be handled.
     *
     * @param {beforeHandler} handler
     */
    before(handler) {
        this._beforeUpdateFunction = handler
    }

    /**
     * In the master every update is handed over to a worker. In a worker
     * every update goes through the optional `before` handler and is then
     * processed.
     *
     * @param {Update[]} updates
     * @private
     */
    _processUpdates(updates) {
        if (cluster.isMaster) {
            updates.forEach(update => this._dispatchToWorker(update))
            return
        }

        updates.forEach(update => {
            if (!this._beforeUpdateFunction) {
                this._updateProcessor.process(update)
                return
            }

            this._beforeUpdateFunction(update, handle => {
                if (handle === true) {
                    this._updateProcessor.process(update)
                }
            })
        })
    }

    /**
     * Sends one update to a worker: to the worker that asked for the next
     * update of this chat if there is one, otherwise to a random live worker.
     *
     * @param {Update} update
     * @private
     */
    _dispatchToWorker(update) {
        const chatId = this._getUpdateChatId(update)
        const waitingWorker = chatId != null ? this._waitingUpdates[chatId] : null
        const worker = waitingWorker != null ? waitingWorker : this._pickRandomWorker()

        if (worker == null) {
            this._logger.error({ 'Telegram': 'No live workers available, update was dropped' })
            return
        }

        this.statistics.registrateRequest(worker.process.pid)
        worker.send({ type: 'update', update: update.serialize() })

        // The request is fulfilled; remove the key instead of storing null so
        // the map does not keep one entry per chat forever.
        if (chatId != null)
            delete this._waitingUpdates[chatId]
    }

    /**
     * @param {Update} update
     * @returns {?number} id of the chat the update belongs to, or null when
     * the update carries neither a message nor a callback query with a message
     * @private
     */
    _getUpdateChatId(update) {
        const callbackQuery = update.callbackQuery

        if (callbackQuery && callbackQuery.message)
            return callbackQuery.message.chat.id

        if (update.message)
            return update.message.chat.id

        return null
    }

    /**
     * @returns {Worker|undefined} random live worker, undefined when there are none
     * @private
     */
    _pickRandomWorker() {
        const pids = Object.keys(this._workers).filter(pid => this._workers[pid] != null)
        return this._workers[pids[Math.floor(Math.random() * pids.length)]]
    }
}
348
/**
 * Public surface of the package: everything a consumer can obtain through
 * `require()`. Key order is kept stable.
 */
const publicApi = {
    // API client and the bot entry point
    TelegramApi,
    Telegram,

    // controllers to extend
    TelegramBaseController,
    TelegramBaseCallbackQueryController,
    TelegramBaseInlineQueryController,

    // scopes, extension points and helpers
    Scope,
    BaseLogger,
    BaseScopeExtension,
    InputFile,
    InlineScope,
    BaseStorage,
    BaseUpdateFetcher,

    // routing commands
    BaseCommand,
    TextCommand,
    RegexpCommand,
    CustomFilterCommand,

    // Telegram data models
    Models
}

module.exports = publicApi