{"version":3,"sources":["../../src/nodes/NodeManager.ts"],"sourcesContent":["import Cluster from '../cluster/index.js';\nimport Network, { Connection } from '../network/index.js';\nimport Node from './Node.js';\nimport { ServiceAddress, Stash } from '../service/index.js';\nimport { Pool, await_interval, log } from '../utils/index.js';\n\n/* this class is created to manage the nodes socket conenctions on a service,\n * it will handle new conenction fron node, and will remove them when they are disconnected\n * this class provide an interface for other classes to interact with all the nodes\n * if a class wants to run a function on each difrent node,\n * if a class wants to broadcast a message to all the nodes\n * if a class want to get the next idle node,\n * or the number of node connected to the network\n * or the number of node that have been disconnected\n * or which are currently busy, or idle, etc\n */\n\ntype Options = {\n    // name of the service\n    name: string,\n    // the host and port of the service\n    host: string,\n    port: number,\n    // the number of processes that will be started on each node\n    number_of_nodes?: number,\n    max_number_of_nodes?: number,\n    min_number_of_node?: number,\n    // the number of request that have to be in queue before increasing the number of processes\n    increase_node_at_requests?: number,\n    // the number of node that have to be idle before decreasing the number of processes\n    decrease_node_at_idles?: number,\n    // you can pass the stash object to the node manager\n    stash?: Stash,\n}\n\nclass NodeManager {\n    private name: string;\n    private network: Network;\n    //private heartBeat: number = 1000;\n    private nodes: Pool<Node> = new Pool();\n    private options: Options;\n    private cluster = new Cluster({});\n    private stash: Stash | null;\n\n\n    constructor(options: Options) {\n        this.name = options.name;\n        this.options = options;\n        this.network = new Network({ name: this.name + '_node_manager' });\n        // create server\n        this.network.createServer(\n            this.name + '_node_manager',\n            this.options.host,\n            this.options.port\n        );\n        // handle when new node is connected\n        this.network.onNodeConnection(this.handleNewNode.bind(this));\n        // handle when a node is disconnected\n        this.network.onNodeDisconnect(this.handleNodeDisconnect.bind(this));\n        // set the stash\n        this.stash = options.stash || null;\n  }\n\n  private handleNewNode(connection: Connection) {\n      /* this function is called when a new node is connected to the master */\n      log('[Node manager] Got a new connectection from a node');\n      // create a new node\n      let node = new Node();\n      // set the functions for the stash of objects\n      node.setStashFunctions({\n          get: async (key: string) => await this.stash?.get(key),\n          set: async (key: string, value: any) => await this.stash?.set(key, value),\n      });\n      // set the connection to the node\n      node.setNodeConnection(connection, this.network);\n      // add callback on status change\n      node.setStatusChangeCallback(this.handleStatusChange.bind(this));\n      // get the id of the node\n      let id = node.getId();\n      if(id === undefined) throw new Error('node id is undefined');\n      // add to the pool\n      this.nodes.add(id, node);\n      // add to the enabled pool\n      this.setIdle(id);\n  }\n\n\n  private handleNodeDisconnect(connection: Connection) {\n      // get the node id\n      let id = connection.getId();\n      if(id === undefined) throw new Error('node id is undefined');\n      // remove the node from the pool\n      this.nodes.remove(id);\n  }\n\n  private handleStatusChange(status: string, node: Node) {\n      // if the node is idle add it to the enabled pool\n      // if the node is busy add it to the disabled pool\n      if(!status) throw new Error('status is undefined');\n      let id = node.getId();\n      if(id === undefined) throw new Error('node id is undefined');\n      if(node.isIdle() || node.isError())\n          this.setIdle(id);\n      else if(node.isBusy())\n          this.setBusy(id);\n      else\n          throw new Error('invalid node status');\n  }\n\n  public async getIdle(node_id: string = '') : Promise<Node> {\n      /* this function return a node that is idle */\n      if(node_id !== '') { // we are looking for a specific node on the pool\n          let node = this.getNode(node_id);\n          // await for seelcted node to be idle\n          await await_interval(() => node.isIdle(), 60 * 60 * 60 * 1000).catch(() => {\n              throw new Error(`timeout of one hour, node ${node_id} is not idle`);\n          });\n          return node;\n      }\n      // check if there are nodes in the pool\n      if(this.nodes.isEmpty())\n          log('[node manager] (WARNING) no nodes found');\n      // await until we get a node which is idle\n      // 0 will make it wait for every for a idle node\n      await await_interval(() => this.nodes.hasEnabled(), 0)\n      .catch(() => { throw new Error('timeout of 10 seconds, no idle node found') });\n      //log('[node manager] got idle node');\n      // get the next node\n      let node = this.nodes.pop();\n      if(node === null) throw new Error('node is null');\n      // return the node\n      return node\n  }\n\n  public getBusy(){\n      // returned a single busy node\n      return this.nodes.getDisabled().pop();\n  }\n\n  public getIdleNodes() : Node[] {\n      /* this function return the nodes which are idle */\n      return this.nodes.getEnabledObjects();\n  }\n\n  public getBusyNodes() : Node[] {\n      /* this function return the nodes which are busy */\n      return this.nodes.getDisabledObjects();\n  }\n\n  public async forEach(callback: (node: Node) => void) {\n      let nodes = this.nodes.toArray();\n      // for each node, make a promise\n      let promises = nodes.map(async (node: Node) => {\n          if(node.isBusy()) await node.toFinish();\n          return callback(node);\n      });\n      // wait for all the promises to resolve\n      return Promise.all(promises);\n  }\n\n  public async registerServices(services: ServiceAddress[]) {\n      // register the services to all the nodes\n      return this.broadcast(\n          async (node: Node) => await node.registerServices(services)\n      );\n  }\n\n  public async spawnNodes(name: string = '', count: number = 1, metadata: any = {}) {\n      /* spawn new nodes */\n      if(name === '') name = 'node_' + this.name;\n      log('[nodeManager][spawnNodes] spawning nodes', name, count);\n      this.cluster.spawn(name, {\n          numberOfSpawns: count,\n          metadata: metadata\n      });\n  }\n\n  public async killNode(nodeId: string = '') {\n      // this function will get an idle node fom the pool\n      if(this.nodes.isEmpty())\n          return false\n      // get an idle node\n      let node = (nodeId === '')?\n          this.nodes.removeOne() :\n          this.nodes.remove(nodeId);\n      if(node === null || node === undefined)\n          throw new Error('Node sentenced to death could not be found');\n      // and exit it\n      await node.exit();\n  }\n\n  public async killNodes(nodesId: string[]=[]) {\n      /* kill nodes */\n      for(let nodeId of nodesId)\n          await this.killNode(nodeId);\n  }\n\n  public getIdleCount(){\n      // return the number of idle nodes\n    return this.nodes.getEnabledCount();\n  }\n\n  public getBusyCount(){\n      // return the number of busy nodes\n    return this.nodes.getDisabledCount();\n  }\n\n  public getNodes() {\n      // get all nodes\n      return this.nodes.toArray();\n  }\n\n  public nextNode() {\n      return this.nodes.next();\n  }\n\n  public getNodeCount() {\n    return this.nodes.size();\n  }\n\n  public getNode(nodeId: string) {\n      // get a node by its id\n      let node = this.nodes.get(nodeId);\n      if(node === null) throw new Error(`[node manager] (ERROR) selected node ${nodeId} not found`);\n      return node;\n  }\n\n  public getListeners() {\n      if(this.network === undefined) throw new Error('network is undefined');\n      return this.network.getRegisteredListeners();\n  }\n\n  public async numberOfNodesConnected(count: number) {\n      let timeout = 100000;\n      await await_interval(() => this.nodes.size() >= count, timeout)\n      .catch(() => { throw new Error(`timeout of ${timeout} seconds, not enough nodes connected`) });\n      return true;\n  }\n\n\n  public async exit() {\n      // close all the nodes\n      return this.broadcast(\n          async (node: Node) => await node.exit()\n      );\n  }\n\n  private async broadcast(callback: (node: Node) => any) {\n      // get all the nodes\n      let nodes = this.nodes.toArray();\n      // for each node, make a promise\n      let promises = nodes.map(\n          async (node: Node) => await callback(node)\n      );\n      // wait for all the promises to resolve\n      return Promise.all(promises);\n  }\n\n  private setIdle = (NodeId: string) => this.nodes.enable(NodeId);\n\n  private setBusy = (NodeId: string) => this.nodes.disable(NodeId);\n\n  /* synonims */\n  public addNode = this.spawnNodes\n  public removeNode = this.killNodes\n  public getNumberOfNodes = this.getNodeCount;\n\n}\n\nexport default NodeManager;\n"],"mappings":";;;;;;;;;;;;;;;;;;;;;;;;;;;;;;AAAA;AAAA;AAAA;AAAA;AAAA;AAAA,qBAAoB;AACpB,qBAAoC;AACpC,kBAAiB;AAEjB,mBAA0C;AA+B1C,MAAM,YAAY;AAAA,EAUd,YAAY,SAAkB;AAT9B,wBAAQ;AACR,wBAAQ;AAER;AAAA,wBAAQ,SAAoB,IAAI,kBAAK;AACrC,wBAAQ;AACR,wBAAQ,WAAU,IAAI,eAAAA,QAAQ,CAAC,CAAC;AAChC,wBAAQ;AAwNV,wBAAQ,WAAU,CAAC,WAAmB,KAAK,MAAM,OAAO,MAAM;AAE9D,wBAAQ,WAAU,CAAC,WAAmB,KAAK,MAAM,QAAQ,MAAM;AAG/D;AAAA,wBAAO,WAAU,KAAK;AACtB,wBAAO,cAAa,KAAK;AACzB,wBAAO,oBAAmB,KAAK;AA3NzB,SAAK,OAAO,QAAQ;AACpB,SAAK,UAAU;AACf,SAAK,UAAU,IAAI,eAAAC,QAAQ,EAAE,MAAM,KAAK,OAAO,gBAAgB,CAAC;AAEhE,SAAK,QAAQ;AAAA,MACT,KAAK,OAAO;AAAA,MACZ,KAAK,QAAQ;AAAA,MACb,KAAK,QAAQ;AAAA,IACjB;AAEA,SAAK,QAAQ,iBAAiB,KAAK,cAAc,KAAK,IAAI,CAAC;AAE3D,SAAK,QAAQ,iBAAiB,KAAK,qBAAqB,KAAK,IAAI,CAAC;AAElE,SAAK,QAAQ,QAAQ,SAAS;AAAA,EACpC;AAAA,EAEQ,cAAc,YAAwB;AAE1C,0BAAI,oDAAoD;AAExD,QAAI,OAAO,IAAI,YAAAC,QAAK;AAEpB,SAAK,kBAAkB;AAAA,MACnB,KAAK,OAAO,QAAgB,MAAM,KAAK,OAAO,IAAI,GAAG;AAAA,MACrD,KAAK,OAAO,KAAa,UAAe,MAAM,KAAK,OAAO,IAAI,KAAK,KAAK;AAAA,IAC5E,CAAC;AAED,SAAK,kBAAkB,YAAY,KAAK,OAAO;AAE/C,SAAK,wBAAwB,KAAK,mBAAmB,KAAK,IAAI,CAAC;AAE/D,QAAI,KAAK,KAAK,MAAM;AACpB,QAAG,OAAO,OAAW,OAAM,IAAI,MAAM,sBAAsB;AAE3D,SAAK,MAAM,IAAI,IAAI,IAAI;AAEvB,SAAK,QAAQ,EAAE;AAAA,EACnB;AAAA,EAGQ,qBAAqB,YAAwB;AAEjD,QAAI,KAAK,WAAW,MAAM;AAC1B,QAAG,OAAO,OAAW,OAAM,IAAI,MAAM,sBAAsB;AAE3D,SAAK,MAAM,OAAO,EAAE;AAAA,EACxB;AAAA,EAEQ,mBAAmB,QAAgB,MAAY;AAGnD,QAAG,CAAC,OAAQ,OAAM,IAAI,MAAM,qBAAqB;AACjD,QAAI,KAAK,KAAK,MAAM;AACpB,QAAG,OAAO,OAAW,OAAM,IAAI,MAAM,sBAAsB;AAC3D,QAAG,KAAK,OAAO,KAAK,KAAK,QAAQ;AAC7B,WAAK,QAAQ,EAAE;AAAA,aACX,KAAK,OAAO;AAChB,WAAK,QAAQ,EAAE;AAAA;AAEf,YAAM,IAAI,MAAM,qBAAqB;AAAA,EAC7C;AAAA,EAEA,MAAa,QAAQ,UAAkB,IAAoB;AAEvD,QAAG,YAAY,IAAI;AACf,UAAIC,QAAO,KAAK,QAAQ,OAAO;AAE/B,gBAAM,6BAAe,MAAMA,MAAK,OAAO,GAAG,KAAK,KAAK,KAAK,GAAI,EAAE,MAAM,MAAM;AACvE,cAAM,IAAI,MAAM,6BAA6B,OAAO,cAAc;AAAA,MACtE,CAAC;AACD,aAAOA;AAAA,IACX;AAEA,QAAG,KAAK,MAAM,QAAQ;AAClB,4BAAI,yCAAyC;AAGjD,cAAM,6BAAe,MAAM,KAAK,MAAM,WAAW,GAAG,CAAC,EACpD,MAAM,MAAM;AAAE,YAAM,IAAI,MAAM,2CAA2C;AAAA,IAAE,CAAC;AAG7E,QAAI,OAAO,KAAK,MAAM,IAAI;AAC1B,QAAG,SAAS,KAAM,OAAM,IAAI,MAAM,cAAc;AAEhD,WAAO;AAAA,EACX;AAAA,EAEO,UAAS;AAEZ,WAAO,KAAK,MAAM,YAAY,EAAE,IAAI;AAAA,EACxC;AAAA,EAEO,eAAwB;AAE3B,WAAO,KAAK,MAAM,kBAAkB;AAAA,EACxC;AAAA,EAEO,eAAwB;AAE3B,WAAO,KAAK,MAAM,mBAAmB;AAAA,EACzC;AAAA,EAEA,MAAa,QAAQ,UAAgC;AACjD,QAAI,QAAQ,KAAK,MAAM,QAAQ;AAE/B,QAAI,WAAW,MAAM,IAAI,OAAO,SAAe;AAC3C,UAAG,KAAK,OAAO,EAAG,OAAM,KAAK,SAAS;AACtC,aAAO,SAAS,IAAI;AAAA,IACxB,CAAC;AAED,WAAO,QAAQ,IAAI,QAAQ;AAAA,EAC/B;AAAA,EAEA,MAAa,iBAAiB,UAA4B;AAEtD,WAAO,KAAK;AAAA,MACR,OAAO,SAAe,MAAM,KAAK,iBAAiB,QAAQ;AAAA,IAC9D;AAAA,EACJ;AAAA,EAEA,MAAa,WAAW,OAAe,IAAI,QAAgB,GAAG,WAAgB,CAAC,GAAG;AAE9E,QAAG,SAAS,GAAI,QAAO,UAAU,KAAK;AACtC,0BAAI,4CAA4C,MAAM,KAAK;AAC3D,SAAK,QAAQ,MAAM,MAAM;AAAA,MACrB,gBAAgB;AAAA,MAChB;AAAA,IACJ,CAAC;AAAA,EACL;AAAA,EAEA,MAAa,SAAS,SAAiB,IAAI;AAEvC,QAAG,KAAK,MAAM,QAAQ;AAClB,aAAO;AAEX,QAAI,OAAQ,WAAW,KACnB,KAAK,MAAM,UAAU,IACrB,KAAK,MAAM,OAAO,MAAM;AAC5B,QAAG,SAAS,QAAQ,SAAS;AACzB,YAAM,IAAI,MAAM,4CAA4C;AAEhE,UAAM,KAAK,KAAK;AAAA,EACpB;AAAA,EAEA,MAAa,UAAU,UAAkB,CAAC,GAAG;AAEzC,aAAQ,UAAU;AACd,YAAM,KAAK,SAAS,MAAM;AAAA,EAClC;AAAA,EAEO,eAAc;AAEnB,WAAO,KAAK,MAAM,gBAAgB;AAAA,EACpC;AAAA,EAEO,eAAc;AAEnB,WAAO,KAAK,MAAM,iBAAiB;AAAA,EACrC;AAAA,EAEO,WAAW;AAEd,WAAO,KAAK,MAAM,QAAQ;AAAA,EAC9B;AAAA,EAEO,WAAW;AACd,WAAO,KAAK,MAAM,KAAK;AAAA,EAC3B;AAAA,EAEO,eAAe;AACpB,WAAO,KAAK,MAAM,KAAK;AAAA,EACzB;AAAA,EAEO,QAAQ,QAAgB;AAE3B,QAAI,OAAO,KAAK,MAAM,IAAI,MAAM;AAChC,QAAG,SAAS,KAAM,OAAM,IAAI,MAAM,wCAAwC,MAAM,YAAY;AAC5F,WAAO;AAAA,EACX;AAAA,EAEO,eAAe;AAClB,QAAG,KAAK,YAAY,OAAW,OAAM,IAAI,MAAM,sBAAsB;AACrE,WAAO,KAAK,QAAQ,uBAAuB;AAAA,EAC/C;AAAA,EAEA,MAAa,uBAAuB,OAAe;AAC/C,QAAI,UAAU;AACd,cAAM,6BAAe,MAAM,KAAK,MAAM,KAAK,KAAK,OAAO,OAAO,EAC7D,MAAM,MAAM;AAAE,YAAM,IAAI,MAAM,cAAc,OAAO,sCAAsC;AAAA,IAAE,CAAC;AAC7F,WAAO;AAAA,EACX;AAAA,EAGA,MAAa,OAAO;AAEhB,WAAO,KAAK;AAAA,MACR,OAAO,SAAe,MAAM,KAAK,KAAK;AAAA,IAC1C;AAAA,EACJ;AAAA,EAEA,MAAc,UAAU,UAA+B;AAEnD,QAAI,QAAQ,KAAK,MAAM,QAAQ;AAE/B,QAAI,WAAW,MAAM;AAAA,MACjB,OAAO,SAAe,MAAM,SAAS,IAAI;AAAA,IAC7C;AAEA,WAAO,QAAQ,IAAI,QAAQ;AAAA,EAC/B;AAWF;AAEA,IAAO,sBAAQ;","names":["Cluster","Network","Node","node"]}