// Source: UNPKG file view (6.36 kB, JavaScript).
// Node core
var Stream = require('stream')

// Packages
var io = require('socket.io-client')
var SocketIoStream = require('socket.io-stream')
var Request = require('request')

// Project modules
var Base = require('../base')
var Crypto = require('./crypto')
var Logger = require('./logger')
var PolyglotRelay = require('./polyglot_relay')
var Publisher = require('./publisher')
12
// Event names that get a debug listener attached by setDebugHandlers.
var EVENTS = [
  'connect',
  'connect_error',
  'connect_timeout',
  'reconnect',
  'reconnect_error',
  'reconnect_failed',
  'error',
  'disconnect',
  'open',
  'close',
  'reconnect_attempt'
]

// Sentinel value of the IX_PROXY_HTTP_HOST setting checked by onHttpProxy.
var PROXY_HTTP_HOST_USE_PROXY = 'USE_PROXY'

var debug = Base.logger('ix-connector')

// Connection state shared by the functions of this module.
var socket
var proxyData
var upstreamUrl
var isUpstreamHttps
var isRelay = false
var isHttpProxy = false
var deviceData = {}
22
23exports.connectToProxy = function(data, cb_err_data){
24 debug('connectToProxy', data)
25 Logger.loadFilter(function(){
26 Publisher.withDeviceData(function(err, devData){
27 deviceData = devData
28 proxyData = data
29 if (Base.env.IX_PROXY_HOST_OVERRIDE) { proxyData.proxyUrl = "https://"+Base.env.IX_PROXY_HOST_OVERRIDE }
30 console.log('Connecting to '+proxyData.proxyUrl+'...')
31 socket = io(proxyData.proxyUrl)
32 setDebugHandlers(socket, EVENTS)
33 setDebugHandlers(socket.io, EVENTS, 'manager:')
34 socket.on('connect', function(){ onConnect(cb_err_data) })
35 if (isHttpProxy) {
36 var streamSocket = SocketIoStream(socket)
37 setDebugHandlers(streamSocket, EVENTS, 'stream:')
38 streamSocket.on('http-proxy', onHttpProxy)
39 }
40 else {
41 socket.on('ix-call', onIxCall)
42 socket.on('ix-ping', onIxPing)
43 socket.on('ix-x', onIxX)
44 }
45 })
46 })
47}
48
49exports.connectRelayToProxy = function(data, cb_err_data){
50 debug('connectRelayToProxy', data)
51 isRelay = true
52 exports.connectToProxy(data, cb_err_data)
53}
54
55exports.connectHttpToProxy = function(upstream, data, cb_err_data){
56 upstreamUrl = upstream || Base.env.IX_HTTP_PROXY
57 if (upstreamUrl) {
58 upstreamUrl = upstreamUrl.trim().replace(/\/$/, '')
59 }
60 if (!upstreamUrl) { cb_err_data("No upstream server defined."); return }
61 debug('connectHttpToProxy', upstreamUrl, data)
62 isHttpProxy = true
63 isUpstreamHttps = upstreamUrl.slice(0,6) === 'https:'
64 exports.connectToProxy(data, cb_err_data)
65}
66
/**
 * Attach a debug listener to `socket` for every name in `events`.
 *
 * @param {Object} socket - emitter exposing `on(event, handler)`.
 * @param {string[]} events - event names to listen for.
 * @param {string} [prefix] - text put in front of the event name in the
 *   debug output; defaults to the empty string.
 */
var setDebugHandlers = function(socket, events, prefix){
  var label = prefix || ''
  events.forEach(function(name){
    socket.on(name, debugHandlerFor(label + name))
  })
}
75
/**
 * Build an event listener that forwards its arguments to `debug`, preceded
 * by the tag 'event:' + type.
 *
 * @param {string} type - label identifying the event in the debug output.
 * @returns {Function} listener that calls `debug` with the listener's own
 *   `this` and the tagged argument list.
 */
var debugHandlerFor = function(type){
  var tag = 'event:'+type
  return function(){
    var forwarded = [tag]
    forwarded.push.apply(forwarded, arguments)
    debug.apply(this, forwarded)
  }
}
83
/**
 * Socket 'connect' handler: register this client with the proxy through a
 * 'set-client' message and report the outcome.
 *
 * @param {Function} cb_err_data - called with the (err, data) pair that the
 *   proxy acknowledges 'set-client' with.
 */
var onConnect = function(cb_err_data){
  debug('connect', arguments)

  var registration = {
    apiConfig: Base.apiConfig(),
    proxyUrl: proxyData.proxyUrl,
    proxyExport: proxyData.proxyExport,
    device: deviceData
  }

  var onRegistered = function(err, data){
    var outcome = err
      ? 'Connect error: Unable to complete connection to '+proxyData.proxyUrl+': '+JSON.stringify(err)
      : 'Connected to '+proxyData.proxyUrl+'.'
    console.log(outcome)
    // TODO: if (err) { retryConnection }
    debug('connect result', err, data)
    cb_err_data(err, data)
  }

  socket.emit('set-client', registration, onRegistered)
}
98
/**
 * Handle an 'ix-call' message by dispatching it to the Publisher.
 *
 * Request logging and responder wrapping are best-effort: if they throw, the
 * failure is recorded through `debug` and the call continues with whatever
 * responder is in place at that point. In relay mode (isRelay) the call goes
 * through Publisher.ixCall with the PolyglotRelay invoker, otherwise through
 * Publisher.ixCallNodejs. A synchronous exception from the Publisher is
 * passed to the responder as the error.
 *
 * @param {*} data - call payload, passed to the Logger and the Publisher.
 * @param {Function} responder - reply callback, invoked as responder(err)
 *   when dispatch throws.
 */
var onIxCall = function(data, responder){
  debug('ix-call', arguments)
  try {
    Logger.logCallRequest(data)
    responder = Logger.wrapResponder(data, debug.wrap(responder))
  }
  catch (e) {
    // Logging must never block the call itself, but leave a trace of the
    // failure instead of swallowing it silently.
    debug('ix-call logging failed', e)
  }
  try {
    if (isRelay) {
      Publisher.ixCall(data, responder, PolyglotRelay.invoker)
    }
    else {
      Publisher.ixCallNodejs(data, responder)
    }
  }
  catch (e) {
    responder(e)
  }
}
118
/**
 * Handle an 'ix-ping' message by echoing the payload back.
 *
 * @param {*} replyData - payload to return to the sender.
 * @param {Function} responder - reply callback, invoked as
 *   responder(null, replyData).
 */
var onIxPing = function(replyData, responder){
  debug('ix-ping', replyData)

  var noError = null
  responder(noError, replyData)
}
123
/**
 * Handle an 'ix-x' (encrypted call) message.
 *
 * The payload is decrypted with Crypto.decrypt; on success the plain data is
 * dispatched through onIxCall with a responder wrapped by
 * Crypto.encryptingResponder. On failure the decrypt error is passed to the
 * responder.
 *
 * @param {*} cipherParams - encrypted payload, passed to Crypto.decrypt.
 * @param {Function} responder - reply callback; receives the decrypt error
 *   as its first argument when decryption fails.
 */
var onIxX = function(cipherParams, responder){
  debug('ix-x', cipherParams)
  Crypto.decrypt(cipherParams, function(err, data){
    // Pass the error as the first argument. `responder.call(err)` would only
    // bind it as `this` and invoke the responder with no arguments.
    if (err) { responder(err); return }
    onIxCall(data, Crypto.encryptingResponder(responder))
  })
}
131
/**
 * Serve one 'http-proxy' stream event by replaying the request against the
 * upstream server (upstreamUrl) and streaming the upstream response back.
 *
 * The request body arriving on sioStream is piped into the upstream request.
 * When upstream answers, status and headers are handed to cb_proxyResponse
 * and the response body is piped back into sioStream through a byte counter
 * that accumulates faasReq.responseBytes. An upstream error before any
 * response is reported through cb_proxyResponse as { error }.
 *
 * @param {Object} faasReq - request description; url, method, headers and ts
 *   are read here and responseBytes is written. The headers object itself is
 *   not modified.
 *   NOTE(review): faasReq.ts is not set in this function; presumably the
 *   Logger calls populate it — confirm.
 * @param {Object} sioStream - stream carrying the request body in and the
 *   response body out.
 * @param {Function} cb_proxyResponse - called at most once with either
 *   { error } or { lag, headers, statusCode, statusMessage }.
 */
var onHttpProxy = function(faasReq, sioStream, cb_proxyResponse){
  Logger.logHttpRequest(faasReq)
  sioStream.allowHalfOpen = true // Let reading end before writing closes.
  cb_proxyResponse = debug.wrap(cb_proxyResponse)
  var url = upstreamUrl + faasReq.url

  // Forward a copy of the headers so the host rewrite below does not alter
  // faasReq.headers (faasReq is still passed to the Logger afterwards), and
  // so a request without headers does not throw.
  var sourceHeaders = faasReq.headers || {}
  var fwdHeaders = {}
  Object.keys(sourceHeaders).forEach(function(name){
    fwdHeaders[name] = sourceHeaders[name]
  })

  var fwdOptions = {
    url: url
  , headers: fwdHeaders
  , method: faasReq.method
  , followRedirect: false
  }

  // Host header: dropped for https upstreams or when IX_PROXY_HTTP_HOST is
  // the USE_PROXY sentinel; replaced when IX_PROXY_HTTP_HOST is anything else.
  var httpHost = Base.env.IX_PROXY_HTTP_HOST
  if (isUpstreamHttps || httpHost === PROXY_HTTP_HOST_USE_PROXY) {
    delete fwdOptions.headers.host
  }
  if (httpHost && httpHost !== PROXY_HTTP_HOST_USE_PROXY) {
    fwdOptions.headers.host = httpHost
  }
  debug('http-proxy request', fwdOptions)
  var fwdReq = Request(fwdOptions)
  var proxyResponse = {}
  // Guards cb_proxyResponse so it is answered only once (error or response).
  var responseStarted = false
  faasReq.responseBytes = 0

  // Pass-through transform that counts the response bytes.
  var counter = new Stream.Transform()
  counter._transform = function(chunk, encoding, callback){
    debug('http-proxy response chunk', chunk)
    faasReq.responseBytes += chunk.length
    callback(null, chunk)
  }

  fwdReq.on('error', function(err){
    debug('fwdReq.error', err, proxyResponse)
    if (responseStarted) {
      // The callback was already used; surface the error on the stream.
      console.error('duplicate ERROR response attempted for '+url)
      sioStream.emit('error', err)
    }
    else {
      responseStarted = true
      proxyResponse.error = err.toString()
      cb_proxyResponse(proxyResponse)
    }
    // Best-effort teardown of both sides; failures are only logged.
    try {
      fwdReq.abort()
      fwdReq.destroy()
      sioStream.unpipe()
      sioStream.on('data', function(data){ debug('fwdReq.error data', data) })
      sioStream.end()
    }
    catch (e) { console.error(e) }
    Logger.logHttpResponseError(faasReq, proxyResponse)
  })

  fwdReq.on('response', function(response){
    Logger.logHttpResponseStart(faasReq, response)
    debug('fwdReq.response', response.headers, response.statusCode, response.statusMessage, proxyResponse)
    if (responseStarted) {
      console.error('duplicate SUCCESS response attempted for '+url)
      return
    }
    responseStarted = true
    proxyResponse.lag = faasReq.ts.responseStart - faasReq.ts.requestStart
    proxyResponse.headers = response.headers
    proxyResponse.statusCode = response.statusCode
    proxyResponse.statusMessage = response.statusMessage
    cb_proxyResponse(proxyResponse)
    fwdReq.pipe(counter).pipe(sioStream)
  })

  fwdReq.on('end', function(){
    Logger.logHttpResponseEnd(faasReq)
  })

  sioStream.pipe(fwdReq)
}