1 | var Stream = require('stream')
|
2 |
|
3 | var io = require('socket.io-client')
|
4 | var SocketIoStream = require('socket.io-stream')
|
5 | var Request = require('request')
|
6 |
|
7 | var Base = require('../base')
|
8 | , Crypto = require('./crypto')
|
9 | , Logger = require('./logger')
|
10 | , PolyglotRelay = require('./polyglot_relay')
|
11 | , Publisher = require('./publisher')
|
12 |
|
// Event names that get a debug trace handler attached (see setDebugHandlers).
var EVENTS = [
  'connect'
, 'connect_error'
, 'connect_timeout'
, 'reconnect'
, 'reconnect_error'
, 'reconnect_failed'
, 'error'
, 'disconnect'
, 'open'
, 'close'
, 'reconnect_attempt'
]

// Sentinel for IX_PROXY_HTTP_HOST: when the env var equals this value,
// onHttpProxy drops the incoming host header instead of replacing it.
var PROXY_HTTP_HOST_USE_PROXY = 'USE_PROXY'

var debug = Base.logger('ix-connector')

// Module-level connection state shared by the handlers below.
var socket          // socket.io client, created in connectToProxy
var proxyData       // the `data` object passed to connectToProxy
var upstreamUrl     // upstream base URL, set by connectHttpToProxy
var isUpstreamHttps // whether upstreamUrl uses https, set by connectHttpToProxy
var isRelay = false     // switched on by connectRelayToProxy
var isHttpProxy = false // switched on by connectHttpToProxy
var deviceData = {}     // replaced with the result of Publisher.withDeviceData
|
22 |
|
23 | exports.connectToProxy = function(data, cb_err_data){
|
24 | debug('connectToProxy', data)
|
25 | Logger.loadFilter(function(){
|
26 | Publisher.withDeviceData(function(err, devData){
|
27 | deviceData = devData
|
28 | proxyData = data
|
29 | if (Base.env.IX_PROXY_HOST_OVERRIDE) { proxyData.proxyUrl = "https://"+Base.env.IX_PROXY_HOST_OVERRIDE }
|
30 | console.log('Connecting to '+proxyData.proxyUrl+'...')
|
31 | socket = io(proxyData.proxyUrl)
|
32 | setDebugHandlers(socket, EVENTS)
|
33 | setDebugHandlers(socket.io, EVENTS, 'manager:')
|
34 | socket.on('connect', function(){ onConnect(cb_err_data) })
|
35 | if (isHttpProxy) {
|
36 | var streamSocket = SocketIoStream(socket)
|
37 | setDebugHandlers(streamSocket, EVENTS, 'stream:')
|
38 | streamSocket.on('http-proxy', onHttpProxy)
|
39 | }
|
40 | else {
|
41 | socket.on('ix-call', onIxCall)
|
42 | socket.on('ix-ping', onIxPing)
|
43 | socket.on('ix-x', onIxX)
|
44 | }
|
45 | })
|
46 | })
|
47 | }
|
48 |
|
49 | exports.connectRelayToProxy = function(data, cb_err_data){
|
50 | debug('connectRelayToProxy', data)
|
51 | isRelay = true
|
52 | exports.connectToProxy(data, cb_err_data)
|
53 | }
|
54 |
|
55 | exports.connectHttpToProxy = function(upstream, data, cb_err_data){
|
56 | upstreamUrl = upstream || Base.env.IX_HTTP_PROXY
|
57 | if (upstreamUrl) {
|
58 | upstreamUrl = upstreamUrl.trim().replace(/\/$/, '')
|
59 | }
|
60 | if (!upstreamUrl) { cb_err_data("No upstream server defined."); return }
|
61 | debug('connectHttpToProxy', upstreamUrl, data)
|
62 | isHttpProxy = true
|
63 | isUpstreamHttps = upstreamUrl.slice(0,6) === 'https:'
|
64 | exports.connectToProxy(data, cb_err_data)
|
65 | }
|
66 |
|
/**
 * Attach a debug-trace listener to `socket` for every name in `events`.
 *
 * @param {Object} socket - any emitter exposing `on(event, handler)`.
 * @param {string[]} events - event names to listen for.
 * @param {string} [prefix] - label prepended to the event name in the trace
 *   output; defaults to the empty string.
 */
var setDebugHandlers = function(socket, events, prefix){
  var label = prefix || ''
  events.forEach(function(eventName){
    socket.on(eventName, debugHandlerFor(label + eventName))
  })
}
|
75 |
|
/**
 * Build an event listener that forwards its arguments to `debug`, preceded
 * by the tag 'event:<type>'.
 *
 * @param {string} type - event label to include in the tag.
 * @returns {Function} listener that logs whatever it is called with.
 */
var debugHandlerFor = function(type){
  var tag = 'event:' + type
  return function(){
    var received = Array.prototype.slice.call(arguments)
    debug.apply(this, [tag].concat(received))
  }
}
|
83 |
|
/**
 * 'connect' handler: announce this client to the proxy with a 'set-client'
 * message and report the acknowledgement.
 *
 * @param {Function} cb_err_data - called with the (err, data) pair received
 *   in the 'set-client' acknowledgement.
 */
var onConnect = function(cb_err_data){
  debug('connect', arguments)

  var clientInfo = {
    apiConfig: Base.apiConfig()
  , proxyUrl: proxyData.proxyUrl
  , proxyExport: proxyData.proxyExport
  , device: deviceData
  }

  socket.emit('set-client', clientInfo, function(err, data){
    var message = err
      ? 'Connect error: Unable to complete connection to '+proxyData.proxyUrl+': '+JSON.stringify(err)
      : 'Connected to '+proxyData.proxyUrl+'.'
    console.log(message)

    debug('connect result', err, data)
    cb_err_data(err, data)
  })
}
|
98 |
|
/**
 * 'ix-call' handler: log the request, then dispatch it to the publisher.
 *
 * In relay mode (isRelay) the call goes through Publisher.ixCall with
 * PolyglotRelay.invoker; otherwise through Publisher.ixCallNodejs. An
 * exception thrown synchronously by the publisher is passed to `responder`.
 *
 * @param {*} data - call payload, handed to the logger and the publisher.
 * @param {Function} responder - reply callback; wrapped for logging when the
 *   logger is available.
 */
var onIxCall = function(data, responder){
  debug('ix-call', arguments)
  try {
    Logger.logCallRequest(data)
    responder = Logger.wrapResponder(data, debug.wrap(responder))
  }
  catch(e) {
    // Logging is best-effort and must not block the call, but a failure is
    // now traced instead of being silently discarded.
    debug('ix-call logging error', e)
  }
  try {
    if (isRelay) {
      Publisher.ixCall(data, responder, PolyglotRelay.invoker)
    }
    else {
      Publisher.ixCallNodejs(data, responder)
    }
  }
  catch(e) {
    responder(e)
  }
}
|
118 |
|
/**
 * 'ix-ping' handler: echo the received payload back to the sender.
 *
 * @param {*} replyData - payload to return unchanged.
 * @param {Function} responder - called as responder(null, replyData).
 */
var onIxPing = function(replyData, responder){
  debug('ix-ping', replyData)

  responder(null, replyData)
}
|
123 |
|
/**
 * 'ix-x' handler: decrypt the incoming payload and process it as a regular
 * ix-call.
 *
 * @param {*} cipherParams - encrypted payload handed to Crypto.decrypt.
 * @param {Function} responder - reply callback. On decrypt failure it is
 *   called with the error; otherwise it is passed through
 *   Crypto.encryptingResponder before being given to onIxCall (presumably so
 *   the reply is encrypted; confirm in ./crypto).
 */
var onIxX = function(cipherParams, responder){
  debug('ix-x', cipherParams)
  Crypto.decrypt(cipherParams, function(err, data){
    // Was `responder.call(err)`, which used err as `this` and passed no
    // arguments, so the caller never saw the failure.
    if (err) { responder(err); return }
    onIxCall(data, Crypto.encryptingResponder(responder))
  })
}
|
131 |
|
/**
 * 'http-proxy' handler: replay an incoming HTTP request against the upstream
 * server and stream the response body back over `sioStream`.
 *
 * The response head (or an error) is reported once through
 * `cb_proxyResponse`; the body is piped through a byte counter that
 * accumulates into `faasReq.responseBytes`.
 *
 * @param {Object} faasReq - request descriptor; `url`, `headers`, `method`
 *   and `ts` are read, `responseBytes` is written. Note: `headers` is
 *   modified in place when the host header is removed or overridden.
 * @param {Object} sioStream - duplex stream carrying the request body in and
 *   the response body out.
 * @param {Function} cb_proxyResponse - called once with an object holding
 *   either `error`, or `lag`, `headers`, `statusCode` and `statusMessage`.
 */
var onHttpProxy = function(faasReq, sioStream, cb_proxyResponse){
  Logger.logHttpRequest(faasReq)
  sioStream.allowHalfOpen = true
  cb_proxyResponse = debug.wrap(cb_proxyResponse)

  var targetUrl = upstreamUrl + faasReq.url
  var requestOptions = {
    url: targetUrl
  , headers: faasReq.headers
  , method: faasReq.method
  , followRedirect: false
  }

  // Host header policy: drop it for https upstreams or when the env value is
  // the USE_PROXY sentinel; any other non-empty env value replaces it.
  var hostOverride = Base.env.IX_PROXY_HTTP_HOST
  var useProxyHost = hostOverride === PROXY_HTTP_HOST_USE_PROXY
  if (isUpstreamHttps || useProxyHost) {
    delete requestOptions.headers.host
  }
  if (hostOverride && !useProxyHost) {
    requestOptions.headers.host = hostOverride
  }

  debug('http-proxy request', requestOptions)
  var upstreamReq = Request(requestOptions)
  var result = {}
  var headSent = false
  faasReq.responseBytes = 0

  // Pass-through transform that only tallies the response body size.
  var byteCounter = new Stream.Transform()
  byteCounter._transform = function(chunk, encoding, done){
    debug('http-proxy response chunk', chunk)
    faasReq.responseBytes += chunk.length
    done(null, chunk)
  }

  // Report the failure (once) and tear down both sides of the exchange.
  function handleUpstreamError(err){
    debug('fwdReq.error', err, result)
    if (!headSent) {
      headSent = true
      result.error = err.toString()
      cb_proxyResponse(result)
    }
    else {
      console.error('duplicate ERROR response attempted for '+targetUrl)
      sioStream.emit('error', err)
    }
    try {
      upstreamReq.abort()
      upstreamReq.destroy()
      sioStream.unpipe()
      sioStream.on('data', function(data){ debug('fwdReq.error data', data) })
      sioStream.end()
    }
    catch (e) { console.error(e) }
    Logger.logHttpResponseError(faasReq, result)
  }

  // Send the response head back, then start streaming the body.
  function handleUpstreamResponse(response){
    Logger.logHttpResponseStart(faasReq, response)
    debug('fwdReq.response', response.headers, response.statusCode, response.statusMessage, result)
    if (headSent) {
      console.error('duplicate SUCCESS response attempted for '+targetUrl)
      return
    }
    headSent = true
    result.lag = faasReq.ts.responseStart - faasReq.ts.requestStart
    result.headers = response.headers
    result.statusCode = response.statusCode
    result.statusMessage = response.statusMessage
    cb_proxyResponse(result)
    upstreamReq.pipe(byteCounter).pipe(sioStream)
  }

  function handleUpstreamEnd(){
    Logger.logHttpResponseEnd(faasReq)
  }

  upstreamReq.on('error', handleUpstreamError)
  upstreamReq.on('response', handleUpstreamResponse)
  upstreamReq.on('end', handleUpstreamEnd)

  // Feed the incoming request body to the upstream request.
  sioStream.pipe(upstreamReq)
}
|