UNPKG

4.48 kBtext/coffeescriptView Raw
1_ = require 'lodash'
2colors = require 'colors'
3Redis = require 'ioredis'
4octobluExpress = require 'express-octoblu'
5RedisNS = require '@octoblu/redis-ns'
6RateLimitChecker = require 'meshblu-core-rate-limit-checker'
7JobLogger = require 'job-logger'
8debug = require('debug')('meshblu-core-protocol-adapter-http:server')
9{ JobManagerRequester } = require 'meshblu-core-job-manager'
10enableDestroy = require 'server-destroy'
11
12Router = require './router'
13JobToHttp = require './helpers/job-to-http'
14MeshbluAuthParser = require './helpers/meshblu-auth-parser'
15
16class Server
17 constructor: (options)->
18 {
19 @disableLogging
20 @port
21 @aliasServerUri
22 @redisUri
23 @cacheRedisUri
24 @namespace
25 @maxConnections
26 @jobTimeoutSeconds
27 @jobLogSampleRate
28 @jobLogSampleRateOverrideUuids
29 @jobLogRedisUri
30 @jobLogQueue
31 @requestQueueName
32 @responseQueueName
33 } = options
34 @panic 'missing @jobLogQueue', 2 unless @jobLogQueue?
35 @panic 'missing @jobLogRedisUri', 2 unless @jobLogRedisUri?
36 @panic 'missing @redisUri', 2 unless @redisUri?
37 @panic 'missing @cacheRedisUri', 2 unless @cacheRedisUri?
38 @panic 'missing @jobLogSampleRate', 2 unless @jobLogSampleRate?
39 @panic 'missing @requestQueueName', 2 if _.isEmpty @requestQueueName
40 @panic 'missing @responseQueueName', 2 if _.isEmpty @responseQueueName
41
42 @cacheClient = new Redis @cacheRedisUri, dropBufferSupport: true
43 rateLimitCheckerClient = new RedisNS 'meshblu-count', @cacheClient
44 @rateLimitChecker = new RateLimitChecker client: rateLimitCheckerClient
45 @authParser = new MeshbluAuthParser
46
47 address: =>
48 @server.address()
49
50 panic: (message, exitCode, error) =>
51 error ?= new Error('generic error')
52 console.error colors.red message
53 console.error error?.stack
54 process.exit exitCode
55
56 run: (callback) =>
57 app = octobluExpress({ @disableLogging, bodyLimit: '10mb' })
58
59 app.use '/proofoflife', (req, res) =>
60 @jobManager.healthcheck (error, healthy) =>
61 return res.sendError error if error?
62 return res.sendError new Error("Job Manager Unhealthy") unless healthy
63 @cacheClient.set 'test:write', Date.now(), (error) =>
64 return res.sendError error if error?
65 res.send online: true
66
67 rateLimit = (req, res, next) =>
68 as = req.get 'x-meshblu-as'
69 auth = @authParser.parse req
70 uuid = as ? auth?.uuid
71 return next() unless uuid?
72 @rateLimitChecker.isLimited {uuid}, (error, result) =>
73 return res.sendError error if error?
74 return res.status(429).send(message: 'Too Many Requests') if result
75 next()
76 app.use rateLimit
77
78 jobLogger = new JobLogger
79 client: new Redis @jobLogRedisUri, dropBufferSupport: true
80 indexPrefix: 'metric:meshblu-core-protocol-adapter-http'
81 type: 'meshblu-core-protocol-adapter-http:request'
82 jobLogQueue: @jobLogQueue
83
84 @jobManager = new JobManagerRequester {
85 @namespace
86 @redisUri
87 @jobTimeoutSeconds
88 @jobLogSampleRate
89 @jobLogSampleRateOverrideUuids
90 @requestQueueName
91 @responseQueueName
92 queueTimeoutSeconds: @jobTimeoutSeconds
93 maxConnections: 2
94 }
95
96 @jobManager.once 'error', (error) =>
97 @stop =>
98 @panic 'fatal job manager error', 1, error
99
100 @jobManager.once 'factoryCreateError', (error) =>
101 @stop =>
102 @panic 'fatal job manager factoryCreateError', 1, error
103
104 @jobManager._do = @jobManager.do
105 @jobManager.do = (request, callback) =>
106 @jobManager._do request, (error, response) =>
107 jobLogger.log { error, request, response }, (jobLoggerError) =>
108 return callback jobLoggerError if jobLoggerError?
109 callback error, response
110
111 @jobManager.start (error) =>
112 return callback error if error?
113
114 jobToHttp = new JobToHttp
115 router = new Router { @jobManager, jobToHttp }
116 router.route app
117
118 @server = app.listen @port, (error) =>
119 return callback error if error?
120 debug 'meshblu-http listening on port', @port
121 callback null
122 enableDestroy @server
123
124 destroy: (callback) =>
125 @server.destroy =>
126 @jobManager.stop callback
127
128 stop: (callback) =>
129 @server.close =>
130 @jobManager.stop callback
131
132module.exports = Server