require 'socket'
require 'json'

# Ruby side of the FaaS IX polyglot bridge.
#
# The server opens a listening socket (TCP or UNIX, selected through the
# FAAS_IX_SOCKET environment variable), spawns the polyglot relay executable
# that lives next to this file, tells the relay which methods the required
# Ruby files export, and then serves inbound method calls sent as JSON.
class FaasIxServer
  # Around filters in registration order. This is a class variable, so the
  # list is shared by every server instance (and by subclasses).
  @@around_filters = []

  # Builds a server for +require_paths+ and runs it.
  #
  # @param require_paths [Array<String>] directories scanned recursively for
  #   *.rb files whose public instance methods are exported
  def self.start(require_paths)
    FaasIxServer.new(require_paths).run
  end

  # Registers a block that wraps every API call. The block receives the
  # APICall plus a continuation block; it must call the continuation (with
  # the same or a replacement APICall) to proceed, and its return value
  # becomes the result of the call. The first registered filter is outermost.
  def self.register_around_filter(&block)
    @@around_filters.push(block)
  end

  # Opens the listening socket described by FAAS_IX_SOCKET and serves calls.
  #
  # The variable is split on ':'. A first field of "tcp" selects a TCP
  # socket; anything else (including an unset variable) selects a UNIX
  # socket, with the remaining fields passed on as directory and file name.
  def run
    socket_opts = ENV['FAAS_IX_SOCKET'].to_s.split(':')
    type = socket_opts.shift
    if type == 'tcp'
      run_tcp_socket
    else
      run_unix_socket(*socket_opts)
    end
  end

  private

  # @param require_paths [Array<String>] directories to scan for *.rb files
  def initialize(require_paths)
    @require_paths = require_paths
  end

  # Serves on a TCP socket bound to port 0 (the OS picks a free port).
  def run_tcp_socket
    TCPServer.open(0) do |server|
      run_server(server)
    end
  end

  # Serves on a UNIX socket.
  #
  # @param socket_dir [String, nil] directory for a relative socket file;
  #   Dir.tmpdir is used when nil or blank
  # @param socket_file [String, nil] socket file name or absolute path; a
  #   name built from the process id and a random number is used when nil or
  #   blank
  def run_unix_socket(socket_dir = nil, socket_file = nil)
    if socket_file.nil? || socket_file.strip == ''
      socket_file = "faas.sock.#{Process.pid}.#{rand(10000)}"
    end

    socket_path = socket_file
    if socket_path[0] != '/'
      if socket_dir.nil? || socket_dir.strip == ''
        require 'tmpdir' # loaded lazily: only needed for the default directory
        socket_dir = Dir.tmpdir
      end
      socket_dir = socket_dir.sub(/\/$/, '')
      socket_path = File.join(socket_dir, socket_path)
    end

    # Socket.unix_server_socket is used rather than UNIXServer.open because
    # the latter doesn't delete socket_path.
    Socket.unix_server_socket(socket_path) do |server|
      run_server(server)
    end
  end

  # Runs the whole session on an already-listening +server+ socket:
  #
  # - Start polyglot nodejs socket.
  # - Get config settings from socket (incl. where to look for methods).
  # - Find & gather exported methods info.
  # - Send method info to nodejs socket.
  # - Run loop to wait for inbound method calls.
  #
  # Neither the relay thread nor the listener thread is expected to finish,
  # so this method always ends by raising: either a RuntimeError carrying the
  # finished thread's value, or the exception that killed that thread
  # (re-raised by Thread#value).
  def run_server(server)
    @server = server
    start_relay # Run the nodejs process.

    # Listen for the relay crash in parallel.
    listener_thread = Thread.new do
      setup_relay
      listen_for_calls
    end

    finished = wait_for_first_exit(@relay_thread, listener_thread)
    raise "Thread exited with value: #{finished.value.inspect}"
  end

  # Blocks until one of +threads+ terminates and returns that thread.
  #
  # Implemented with core Thread/Queue only: one watcher thread per watched
  # thread joins it and then announces it on a queue.
  def wait_for_first_exit(*threads)
    finished = Queue.new
    threads.each do |thread|
      Thread.new do
        begin
          thread.join
        rescue Exception # rubocop:disable Lint/RescueException
          # Swallowed on purpose: the watcher only observes termination. The
          # failure resurfaces when the caller reads Thread#value.
          nil
        ensure
          finished << thread
        end
      end
    end
    finished.pop
  end

  # Spawns the relay executable in a background thread stored in
  # @relay_thread. The thread's value is the relay's exit status.
  #
  # The executable is "faas-ix-polyglot-relay" in this file's directory,
  # prefixed with "dev-" when FAAS_IX_DEV is "TRUE". It receives the address
  # of @server as its only argument ("tcp:HOST:PORT" or "unix:PATH").
  def start_relay
    @relay_thread = Thread.new do
      prefix = ENV['FAAS_IX_DEV'] == 'TRUE' ? 'dev-' : ''
      cmd_path = File.join(__dir__, prefix + 'faas-ix-polyglot-relay')
      child_cmd = [cmd_path]

      addr = @server.connect_address
      if addr.ip?
        child_cmd << ['tcp', addr.ip_address, addr.ip_port.to_s].join(':')
      else
        child_cmd << ['unix', addr.unix_path].join(':')
      end

      # Kill the relay process when the ruby process is killed.
      trap('TERM') do
        if @relay_pid
          Process.kill('TERM', @relay_pid)
        else
          exit
        end
      end

      @relay_pid = spawn(*child_cmd)
      Thread.pass
      Process.wait(@relay_pid)
      $? # return the process status.
    end
  end

  # Performs the initial handshake: accepts one connection, reads a JSON
  # config document from it (kept in @config), replies with the exports as
  # JSON and closes the connection.
  def setup_relay
    socket, _addrinfo = @server.accept
    begin
      @config = JSON.parse(socket.read)
      exports = load_exports
      puts "EXPORTS: "+exports.inspect
      socket.write(exports.to_json)
    ensure
      # Close even when parsing or loading fails so the descriptor is not
      # leaked.
      socket.close
    end
  end

  # @return [Hash] one entry per discovered top-level constant, each wrapped
  #   as { exports: <metadata from find_methods_in_paths> }
  def load_exports
    mods = find_methods_in_paths
    mods.each_with_object({}) do |(name, info), exports|
      exports[name] = { exports: info }
    end
  end

  # Requires every *.rb file below the configured paths (in sorted order per
  # path) and returns metadata for the top-level constants that appeared as
  # a result. Load errors are printed and otherwise ignored so that one
  # broken file does not prevent the rest from being exported.
  def find_methods_in_paths
    old_constants = Module.constants
    # Absolute file name => name relative to its require path. The keys are
    # what meta_for_atts uses to recognise methods defined by these files.
    @files = {}

    @require_paths.each do |path|
      Dir.glob(File.join(path, '**', '*.rb')).sort.each do |filename|
        absname = File.absolute_path(filename)
        @files[absname] = filename.sub(path, '').sub(/^\//, '')
        begin
          puts "loading #{filename}"
          require absname
        rescue ScriptError, StandardError => e
          puts e.inspect
        end
      end
    end

    new_constants = Module.constants - old_constants
    meta_for_atts(Object, new_constants)
  end

  # Recursively describes the modules/classes named by +constants+ inside
  # +parent+.
  #
  # For classes, each public instance method defined directly on the class
  # in one of the loaded files is reported as { isFunc: true }. Nested
  # constants are described recursively. Modules and classes that end up
  # with nothing to report are left out.
  #
  # @param parent [Module] namespace the constants are looked up in
  # @param constants [Array<Symbol>, nil] names to inspect; defaults to
  #   parent.constants
  # @return [Hash] constant name => { isFunc: false, atts: {...} }
  def meta_for_atts(parent, constants = nil)
    constants ||= parent.constants
    @found_constants ||= {}
    atts = {}

    constants.each do |c|
      mod = parent.const_get(c)
      next if @found_constants[mod] # careful about circular references
      @found_constants[mod] = true
      next unless mod.is_a?(Module)

      sub_atts = {}
      if mod.is_a?(Class)
        mod.public_instance_methods(false).each do |meth_name|
          file, _line = mod.public_instance_method(meth_name).source_location
          sub_atts[meth_name] = { isFunc: true } if @files.key?(file)
        end
      end
      sub_atts.merge!(meta_for_atts(mod))
      atts[c] = { isFunc: false, atts: sub_atts } unless sub_atts.empty?
    end

    atts
  end

  # Variant of meta_for_atts that reports the public singleton (module or
  # class level) methods of each module instead of instance methods.
  #
  # @param parent [Module] namespace the constants are looked up in
  # @param constants [Array<Symbol>, nil] names to inspect; defaults to
  #   parent.constants
  # @return [Hash] constant name => { isFunc: false, atts: {...} }
  def meta_for_atts_static(parent, constants = nil)
    constants ||= parent.constants
    @found_constants ||= {}
    atts = {}

    constants.each do |c|
      mod = parent.const_get(c)
      next if @found_constants[mod] # careful about circular references
      @found_constants[mod] = true
      next unless mod.is_a?(Module)

      sub_atts = {}
      mod.public_methods(false).each do |meth_name|
        file, _line = mod.public_method(meth_name).source_location
        sub_atts[meth_name] = { isFunc: true } if @files.key?(file)
      end
      sub_atts.merge!(meta_for_atts_static(mod))
      atts[c] = { isFunc: false, atts: sub_atts } unless sub_atts.empty?
    end

    atts
  end

  # Accepts connections forever, handling each call in its own thread.
  def listen_for_calls
    Socket.accept_loop(@server) do |socket, addrinfo|
      Thread.new(socket, addrinfo) do |s, _a| # new thread for each call
        handle_call(s)
      end
    end
  end

  # Serves one call: reads a JSON request from +socket+, runs it and writes
  # back {"err": <message or null>, "data": <result or null>} as JSON.
  # The socket is always closed, even if writing the response fails.
  def handle_call(socket)
    err = nil
    data = nil
    begin
      ix_call = JSON.parse(socket.read)
      data = call_with_callbacks(ix_call)
    rescue => e
      puts "ERROR: #{e.message} - "+e.inspect
      puts e.backtrace
      err = e
    end
    resp = { err: err && err.to_s, data: data }
    socket.write(resp.to_json)
  ensure
    socket.close
  end

  # Runs one API call through the registered around filters.
  #
  # The target method is invoked with public_send so that a request cannot
  # reach private methods (which would include Kernel#system, #exec, #eval
  # on any object); only public methods are advertised by the export scan.
  #
  # NOTE(review): the class and method names still come straight from the
  # request, so inherited public methods (e.g. instance_eval) and any
  # constant resolvable from Object remain reachable. Consider validating
  # the target against the exported method list.
  #
  # @param ix_call [Hash] parsed request, see APICall
  # @return [Object] whatever the outermost filter (or the method) returns
  def call_with_callbacks(ix_call)
    invoke = lambda do |call|
      call.call_instance.public_send(call.call_method, *call.args)
    end

    # Fold from the last registered filter inwards so that the first
    # registered filter ends up outermost.
    chain = @@around_filters.reverse_each.inject(invoke) do |inner, filter|
      lambda do |call|
        filter.call(call) { |next_call| inner.call(next_call) }
      end
    end

    chain.call(APICall.new(ix_call))
  end

  # Runs a module-level method through the around filters. Unlike
  # call_with_callbacks, filters are called without arguments here and only
  # receive the continuation block.
  def call_with_callbacks_static(mod, method_name, args)
    invoke = -> { mod.public_send(method_name, *args) }

    chain = @@around_filters.reverse_each.inject(invoke) do |inner, filter|
      -> { filter.call { inner.call } }
    end

    chain.call
  end

  # One inbound call, built from the parsed JSON request. The "api" entry
  # has the form "Namespace.Class.method"; the class is resolved from Object
  # and instantiated with no arguments.
  class APICall
    attr_reader :api
    attr_reader :args
    attr_reader :custom
    attr_reader :token
    attr_reader :req
    attr_reader :call_instance
    attr_reader :call_class
    attr_reader :call_method

    # @param ix_call [Hash] request with the string keys "api", "args",
    #   "custom", "token" and "req"
    # @raise [ArgumentError] when "api" is not a dotted "Class.method" string
    # @raise [NameError] when the class part cannot be resolved
    def initialize(ix_call)
      @api = ix_call['api']
      @args = ix_call['args']
      @custom = ix_call['custom']
      @token = ix_call['token']
      @req = ix_call['req']
      @call_class, @call_method = find_method(@api)
      @call_instance = @call_class.new
    end

    private

    # Splits "A.B.meth" into the constant A::B and the symbol :meth.
    #
    # @return [Array(Module, Symbol)]
    def find_method(name)
      unless name.is_a?(String) && name.include?('.')
        raise ArgumentError, "invalid api name: #{name.inspect}"
      end

      names = name.split('.')
      meth = names.pop.to_sym
      mod = Object.const_get(names.join('::'))
      [mod, meth]
    end
  end # class APICall
end