UNPKG

7.96 kBPlain TextView Raw
1require 'socket'
2require 'json'
3require 'thwait'
4
# Hosts Ruby API classes behind a local socket on behalf of the
# `faas-ix-polyglot-relay` child process.
#
# Lifecycle (see #run_server): open a listening socket, spawn the relay with
# the socket address, answer the relay's first connection with the list of
# exported methods, then serve one JSON-encoded method call per connection.
class FaasIxServer
  # Around filters shared by every server instance, in registration order.
  @@around_filters = []

  # Builds a server for the given API directories and runs it.
  #
  # @param require_paths [Array<String>, String, nil] directories to scan for
  #   `*.rb` files; falls back to ENV['FAAS_IX_API_DIR'] when empty.
  def self.start(require_paths)
    FaasIxServer.new(require_paths).run
  end

  # Registers a filter wrapped around every API call.
  #
  # The block is called with the current APICall and a continuation block; it
  # must call the continuation with the (possibly replaced) APICall to proceed.
  # Filters registered first end up outermost.
  def self.register_around_filter(&block)
    @@around_filters.push(block)
  end

  # @param require_paths [Array<String>, String, nil] API directories. When
  #   nil or empty, the colon-separated ENV['FAAS_IX_API_DIR'] is used instead.
  # @raise [RuntimeError] if no directory is available from either source.
  def initialize(require_paths)
    # Array() tolerates nil and a single String path.
    @require_paths = Array(require_paths)
    if @require_paths.empty?
      # Drop empty entries ("a::b"): an empty path would glob "/**/*.rb".
      @require_paths = ENV['FAAS_IX_API_DIR'].to_s.split(':').reject(&:empty?)
    end
    raise "No api directories specified" if @require_paths.empty?
  end

  # Starts the server on the socket described by ENV['FAAS_IX_SOCKET'].
  #
  # The variable has the form `type[:dir[:file]]`. Type `tcp` selects a TCP
  # socket; anything else (including unset) selects a UNIX socket, with the
  # remaining fields passed to #run_unix_socket.
  def run
    type, *socket_opts = ENV['FAAS_IX_SOCKET'].to_s.split(':')
    if type == 'tcp'
      run_tcp_socket
    else
      run_unix_socket(*socket_opts)
    end
  end

  private

  # Serves on an ephemeral TCP port.
  #
  # Bound to the loopback interface only: the relay is spawned locally and is
  # handed `connect_address`, and this socket executes whatever call it
  # receives, so it must not be reachable from other hosts.
  def run_tcp_socket
    TCPServer.open('127.0.0.1', 0) do |server|
      run_server(server)
    end
  end

  # Serves on a UNIX socket.
  #
  # @param socket_dir [String, nil] directory for a relative socket file;
  #   defaults to Dir.tmpdir when blank.
  # @param socket_file [String, nil] socket file name or absolute path; a
  #   name containing the pid and a random number is generated when blank.
  def run_unix_socket(socket_dir=nil, socket_file=nil)
    socket_path = unix_socket_path(socket_dir, socket_file)
    # UNIXServer.open(socket_path) would leave the socket file behind, hence
    # Socket.unix_server_socket.
    Socket.unix_server_socket(socket_path) do |server|
      run_server(server)
    end
  end

  # Resolves the socket path used by #run_unix_socket.
  def unix_socket_path(socket_dir, socket_file)
    if !socket_file || socket_file.strip == ''
      socket_file = "faas.sock.#{Process.pid}.#{rand(10000)}"
    end
    # An absolute file path wins over socket_dir.
    return socket_file if socket_file[0] == '/'

    if !socket_dir || socket_dir.strip == ''
      require 'tmpdir' # loaded lazily; only needed for the default directory
      socket_dir = Dir.tmpdir
    end
    File.join(socket_dir.sub(/\/$/,''), socket_file)
  end

  # Runs the relay and the call listener side by side on `server`.
  #
  # - Start the polyglot nodejs relay process.
  # - Get config settings from the relay (incl. where to look for methods).
  # - Find & gather exported methods info.
  # - Send method info to the relay.
  # - Run loop to wait for inbound method calls.
  #
  # @raise [RuntimeError] as soon as either thread finishes; `t.value` also
  #   re-raises the exception that ended the thread, if any.
  def run_server(server)
    @server = server

    start_relay # Run the nodejs process.

    listener_thread = Thread.new do # Listen for the relay crash in parallel.
      setup_relay
      listen_for_calls
    end

    # NOTE(review): ThreadsWait comes from `thwait`, which Ruby 2.7 dropped
    # from the standard library - confirm the gem is installed on newer Rubies.
    ThreadsWait.all_waits(@relay_thread, listener_thread) do |t|
      raise "Thread exited with value: #{t.value.inspect}"
    end
  end

  # Spawns the relay executable in a background thread stored in
  # @relay_thread; the thread's value is the relay's exit status.
  #
  # The executable is `faas-ix-polyglot-relay` next to this file (prefixed
  # with `dev-` when ENV['FAAS_IX_DEV'] == 'TRUE'). Its single argument is the
  # server address, `tcp:<ip>:<port>` or `unix:<path>`.
  def start_relay
    @relay_thread = Thread.new do
      prefix = ENV['FAAS_IX_DEV'] == 'TRUE' ? 'dev-' : ''
      addr = @server.connect_address
      target =
        if addr.ip?
          ['tcp', addr.ip_address, addr.ip_port.to_s].join(':')
        else
          ['unix', addr.unix_path].join(':')
        end
      child_cmd = [File.join(__dir__, prefix+'faas-ix-polyglot-relay'), target]
      # Kill the relay process when the ruby process is killed.
      trap('TERM') do
        if @relay_pid
          Process.kill('TERM', @relay_pid)
        else
          exit
        end
      end
      @relay_pid = spawn(*child_cmd)
      Thread.pass
      _pid, status = Process.wait2(@relay_pid)
      status # return the process status.
    end
  end

  # Handshake with the relay: reads its JSON config from the first
  # connection, loads the API files and writes the export table back.
  def setup_relay
    socket, _addrinfo = @server.accept
    begin
      @config = JSON.parse(socket.read)
      exports = load_exports
      puts "EXPORTS: "+exports.inspect
      socket.write(exports.to_json)
    ensure
      # Close even when parsing or loading fails so the relay is not left
      # waiting on a half-open connection.
      socket.close
    end
  end

  # @return [Hash] `{ ConstName => { exports: <method metadata> } }` for every
  #   top-level constant that exposes at least one API method.
  def load_exports
    find_methods_in_paths.each_with_object({}) do |(name, info), exports|
      exports[name] = { exports: info }
    end
  end

  # Requires every `*.rb` file under the configured directories and collects
  # metadata for the constants those files introduced.
  #
  # Files that fail to load are reported on stdout and skipped. @files maps
  # each file's absolute path to its path relative to the API directory.
  def find_methods_in_paths
    known_constants = Module.constants
    @files = {}
    @require_paths.each do |path|
      Dir.glob(File.join(path, '**', '*.rb')).sort.each do |filename|
        absname = File.absolute_path(filename)
        @files[absname] = filename.sub(path,'').sub(/^\//,'')
        begin
          puts "loading #{filename}"
          require absname
        rescue ScriptError, StandardError => e
          puts e.inspect
        end
      end
    end
    meta_for_atts(Object, Module.constants - known_constants)
  end

  # Describes the public instance methods of the classes found under
  # `parent` whose source lives in one of the loaded API files.
  #
  # @param parent [Module] namespace to inspect.
  # @param constants [Array<Symbol>, nil] constant names to visit; defaults
  #   to all of `parent.constants`.
  # @return [Hash] nested `{ Const => { isFunc: false, atts: {...} } }`, with
  #   `{ method => { isFunc: true } }` leaves. Empty namespaces are omitted.
  def meta_for_atts(parent, constants=nil)
    each_new_module_meta(parent, constants) do |mod|
      own = {}
      if mod.is_a?(Class)
        own = exported_methods(mod.public_instance_methods(false)) do |meth_name|
          mod.public_instance_method(meth_name)
        end
      end
      own.merge!(meta_for_atts(mod))
    end
  end

  # Same traversal as #meta_for_atts, but describes the public singleton
  # methods of each module instead of instance methods.
  def meta_for_atts_static(parent, constants=nil)
    each_new_module_meta(parent, constants) do |mod|
      own = exported_methods(mod.public_methods(false)) do |meth_name|
        mod.public_method(meth_name)
      end
      own.merge!(meta_for_atts_static(mod))
    end
  end

  # Shared traversal: yields each not-yet-visited module under `parent` and
  # records the metadata the block returns for it, unless that is empty.
  def each_new_module_meta(parent, constants)
    @found_constants ||= {}
    (constants || parent.constants).each_with_object({}) do |const_name, atts|
      mod = parent.const_get(const_name)
      next unless mod.is_a?(Module)
      next if @found_constants[mod] # careful about circular references
      @found_constants[mod] = true
      sub_atts = yield(mod)
      atts[const_name] = { isFunc: false, atts: sub_atts } unless sub_atts.empty?
    end
  end

  # Keeps the method names whose definition (resolved by the block) is
  # located in one of the loaded API files.
  def exported_methods(meth_names)
    meth_names.each_with_object({}) do |meth_name, found|
      file, _line = yield(meth_name).source_location # nil for C methods
      found[meth_name] = { isFunc: true } if @files.has_key?(file)
    end
  end

  # Accepts connections forever, serving each call on its own thread.
  def listen_for_calls
    Socket.accept_loop(@server) do |socket, _addrinfo|
      Thread.new(socket) { |sock| handle_call(sock) } # new thread for each call
    end
  end

  # Serves a single call: reads the JSON request from `sock`, runs it and
  # writes back `{"err": <message or null>, "data": <result or null>}`.
  #
  # Failures of the call itself, including ScriptError (NotImplementedError)
  # and results that cannot be serialized, are logged and reported in `err`.
  # The socket is always closed.
  def handle_call(sock)
    payload =
      begin
        ix_call = JSON.parse(sock.read)
        { err: nil, data: call_with_callbacks(ix_call) }.to_json
      rescue StandardError, ScriptError => e
        puts "ERROR: #{e.message} - "+e.inspect
        puts e.backtrace
        { err: e.to_s, data: nil }.to_json
      end
    sock.write(payload)
  ensure
    sock.close
  end

  # Runs an API call through all registered around filters.
  #
  # @param ix_call [Hash] decoded request (see APICall#initialize).
  # @return [Object] whatever the outermost filter returns; without filters,
  #   the API method's return value.
  def call_with_callbacks(ix_call)
    api_call = api_call_class.new(ix_call)
    innermost = proc { |call, &block| block.call(call) }
    # Wrap from the last filter inwards so the first registered is outermost.
    chain = @@around_filters.reverse_each.inject(innermost) do |inner, filter|
      proc do |call, &block|
        filter.call(call) do |next_call|
          inner.call(next_call, &block)
        end
      end
    end
    chain.call(api_call) do |final_call|
      # public_send, not send: the method name comes from the request, and
      # send would also reach private methods such as Kernel#system.
      # NOTE(review): inherited public methods (e.g. instance_eval) are still
      # reachable; consider checking the call against the export table.
      final_call.call_instance.public_send(final_call.call_method, *final_call.args)
    end
  end

  # Variant of #call_with_callbacks for module-level methods. Filters are
  # invoked without an APICall argument.
  def call_with_callbacks_static(mod, method_name, args)
    innermost = proc { |&block| block.call }
    chain = @@around_filters.reverse_each.inject(innermost) do |inner, filter|
      proc do |&block|
        filter.call do
          inner.call(&block)
        end
      end
    end
    chain.call do
      mod.public_send(method_name, *args) # see #call_with_callbacks
    end
  end

  # @return [Class] class used to wrap incoming calls.
  def api_call_class
    @@api_call_class
  end

  # One inbound call: the decoded request plus the resolved target.
  class APICall
    attr_reader :api, :args, :custom, :token, :req
    attr_reader :call_instance, :call_class, :call_method

    # @param ix_call [Hash] decoded request with string keys 'api' (dotted
    #   `Namespace.Class.method`), 'args', 'custom', 'token' and 'req'.
    # @raise [ArgumentError] if 'api' is missing or not dotted.
    # @raise [NameError] if the class named by 'api' does not exist.
    def initialize(ix_call)
      @ix_call = ix_call
      @api = ix_call['api']
      @args = ix_call['args']
      @custom = ix_call['custom']
      @token = ix_call['token']
      @req = ix_call['req']
      @call_class, @call_method = find_method(@api)
      @call_instance = @call_class.new
    end

    def to_s
      to_json
    end

    # Accepts the generator state that JSON passes when this object is
    # nested inside another structure being serialized.
    def to_json(*generator_args)
      to_h.to_json(*generator_args)
    end

    # @return [Hash] the original request.
    def to_h
      @ix_call
    end

    private

    # Splits `Namespace.Class.method` into the class constant and the
    # method name.
    #
    # @return [Array(Module, Symbol)]
    def find_method(name)
      names = name.to_s.split('.')
      if names.size < 2 || names.any?(&:empty?)
        raise ArgumentError, "api must look like 'Namespace.Class.method', got #{name.inspect}"
      end
      meth = names.pop.to_sym
      [Object.const_get(names.join('::')), meth]
    end
  end # class APICall
  @@api_call_class = APICall
end