const WebSocket = require('ws')
const WebSocketServer = WebSocket.Server;
const http = require("http");
const WSS_PING_INTERVAL = 30000
/**
* The web socket manager handles the lifecycle of one or more websocket servers for communication with the HTTP client.
*
* There are at least two basic types of web socket services provided here:
* * sitewide session state - manages session awareness across multiple interfaces for a session
* * transition processing - handles message to do with transtions with respect to a sesson and its transition actions.
*
* Those sessions stored in `going_sessions`, send messages that wrap the message data and included the id made for the session
* by startup method in this class.
*
* These message have the following form:
*
```
let message = {
"ws_id" : ws_id,
"data" : data
}
```
*
* In these messages, the stringified data object in *data* is the data that is being relayed.
* Responses to the message should be keyed with the same id.
*
*
*
* @memberof base
*/
class WebSocketManager {
//
constructor() {
this.going_sessions = {}
this.checking_pings = false
this.supects = {}
this.going_ws_sessions = {} // map token to web socket
this.going_ws_sitewide_sessions = {} // map token to list of web socket
this.app_wss = false
this.ws_client_attempt_timeout = null
this.port_handlers = {
"ws_port" : (auth_wss) => { this.setup_sitewide_ws(auth_wss) } ,
"wss_app_port" : (app_wss) => { this.setup_app_ws(app_wss) }
}
}
/**
* This method takes in a configuration object which should have a `websocket` field.
* The `websocket` field in turn must have two fields:
*
* * port_names - a list of strings that name ports having meaning to the application.
* * defs - a map of the port names to port numbers.
*
* The constructor provides default port handlers, "ws_port", "wss_app_port". But, an application may add to these.
*
* This method add websocket servers for each port identified by `port_names`. After each port server is created,
* this method calls the port handler corresponding to the port name, passing the web socket server wrapper.
*
* The port handlers set up the web socket event handlers for fielding messages and errors.
*
* @param {object} conf
* @param {object} app
*/
initialize(conf,app) {
this.app = app
this.conf = conf
//
if ( conf && conf.websockets && conf.websocket.port_names ) {
let port_names = conf.websocket.port_names
if ( Array.isArray(port_names) && (typeof conf.websocket.defs === 'object') ) {
let defs = conf.websocket.defs
for ( let pname of port_names ) {
if ( defs[pname] !== undefined ) {
let ws = this.add_service(defs[pname])
if ( typeof this.port_handlers[pname] === 'function' ) {
this.port_handlers[pname](ws)
}
}
}
}
}
//
}
/**
*
* @param {object} trans_processor - an instance of the class
* @param {object} user_processor
* @param {object} mime_processor
*/
set_contractual_filters(trans_processor,user_processor,mime_processor) {
this.trans_processor = trans_processor
this.user_processor = user_processor
this.mime_processor = mime_processor
}
/**
* Calls the framework (node.js) createServer method and then creates a new WebSocket wrapper
* with the server as its parameter. Returns the web socket server wrapper.
*
* @param {number} port
* @returns {Object} - WebSocketServer - the web socket server wrapper.
*/
add_service(port) {
let server = http.createServer(this.app);
server.listen(port);
return new WebSocketServer({server: server});
}
/**
*
* This method handles the connection event. This method filters out messages that are not on th `/shared_auth`
* path of the web socket. If the connection is on the path, then it completes the connection setup
* setting up the handler for events: message, close, error.
*
* @param {object} auth_wss - a web socket handle
*/
setup_sitewide_ws(auth_wss) {
auth_wss.on("connection", (ws,req) => {
if ( req.url.indexOf("/shared_auth") > 0 ) {
this._setup_sitewide_ws(ws,this.auth_wss)
}
});
}
//
/**
* This method handles the connection event. This method filters out messages that are not on th `/shared_auth`
* path of the web socket. If the connection is on the path, then it completes the connection setup
* setting up the handler for events: message, close, error.
*
* Sets up a websocket that can be used the client to run messages through the tokenized transition framework.
*
* Access may be available to a transition engine that is customized to send data messages (in quasi real time) to
* the client interfaces.
*
* This method filters out messages that are not on th `transitional` path of the web socket.
*
* @param {object} app_wss - a web socket handle
*/
setup_app_ws(app_wss) {
app_wss.on("connection", (ws,req) => {
if ( req.url.indexOf("/transitional") > 0 ) {
this._setup_app_ws(ws,this.app_wss)
}
});
}
final() {
}
// ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ---- ----
// send_to_ws -- websocket wrapper
/**
* Given the websocket wrapper with the `send` method, this send data to the connected client.
* The data must be stringifiable.
*
* @param {object} ws
* @param {object} data
*/
send_to_ws(ws,data) {
if ( ws ) {
try {
let message = JSON.stringify(data)
ws.send(message);
} catch (e) {}
}
}
// APPLICATION WS (COM EXCHAGE)
/**
*
* Sets up a websocket message event handler that can be used the client to run messages through the tokenized transition framework.
*
* The message handler parses the body data, which should be a JSON parseable string. This is parsed into the body of the message.
*
* The body shoud have the following fields:
*
* * message - The message is itself an object with application defined fields. One required field will be `server_id`.
* * transition - The name of the transition.
* * ping_id - for ping message
*
* The message handler checks to see if the `server_id` field on the message.
* If the id is not on the object, this method attempts to treat the message as a
* ping/pong message. If the field `server_id` is on the message, then this method calls upon the transition processor's
* method, `ws_transition`, which performs similar operations to the HTTP path handler.
*
* The `close` event handler is also set up. This calls `close_wss_session` which remove the descriptor from the `going_sessions`
* table.
*
*
* @param {object} ws
* @param {object} app_wss
*/
_setup_app_ws(ws,app_wss) {
if ( !app_wss ) return
//
this.app_wss = app_wss
this.add_ws_session(ws)
ws.on("message", async (data) => {
try {
let body = JSON.parse(data.toString());
//
let server_id = body.message ? body.message.server_id : false
if ( server_id ) { // transitional
let transition = body.transition
this.trans_processor.ws_transition(transition,body.message)
} else {
let ping_id = body.ping_id
if ( ping_id ) {
let response = await this.ponged(ws)
this.send_to_ws(ws,response)
}
}
} catch (e) {}
});
ws.on("close", () => {
this.close_wss_session(ws)
});
}
// SITEWIDE WS (COM EXCHAGE)
/**
*
* @param {object} ws - web socket connection
* @param {object} auth_wss
*/
_setup_sitewide_ws(ws,auth_wss) {
ws.on("message",() => { this.ws_sitewide_message_handler(ws) }) // data parameter implicit
ws.on("close",() => { this.ws_shutdown(ws) })
}
/**
* Takes the message data and parses as JSON.
*
* If the object produced has a `ping_id`, this method responds to the message by calling `ponged`.
*
* The sitewide websocket server locates the websocket session descriptor by using a token field on th message body.
* The body must also have an `action` field. This method provides to action handlers, *setup* and *logout*.
*
* The *setup* pathway adds the web socket server wrapper to the `going_ws_sitewide_sessions[token]` array.
* This method will create the array the first time the token is seen. Ideally, the token will be a session identifier (token),
* and will keep connections to the websocket clients.
*
* In a browser, different web socket connections for a single session would be the result of
* opening a number of tabs and windows belonging to the session. When the session is ended,
* one window or tab will initiate the logout and the rest of the interfaces will receive the *logout* message
* from this handler.
*
* @param {Buffer} data
* @param {object} ws
*/
ws_sitewide_message_handler(data,ws) {
try {
let body = JSON.parse(data.toString());
let ping_id = body.ping_id
if ( ping_id ) {
this.ponged(ws)
return
}
// else not a ping
let token = body.token
if ( token ) {
switch ( body.action ) {
case 'setup' : {
let ws_data = this.going_ws_sitewide_sessions[token]
if ( ws_data === undefined ) {
this.going_ws_sitewide_sessions[token] = [ ws ]
} else {
let ws_list = this.going_ws_sitewide_sessions[token]
if ( ws_list.indexOf(ws) < 0 ) {
ws_list.push(ws)
}
}
break;
}
case 'logout' : {
let ws_list = this.going_ws_sitewide_sessions[token]
let command = {
'action' : 'logout',
'token' : token
}
ws_list.forEach(ws => {
this.send_to_ws(ws,command)
})
break;
}
}
}
} catch(e) {
}
}
//
/**
*
* @param {object} ws - web socket server
*/
ws_shutdown(ws) {
let token = null
for ( let tk in going_ws_sessions ) {
let ws_dex = going_ws_sessions[tk].indexOf(ws)
if ( ws_dex >= 0 ) {
token = tk
going_ws_sessions[token].splice(ws_dex,1)
break
}
}
if ( token && (going_ws_sessions[token].length == 0) ) {
delete going_ws_sessions[token]
}
}
/**
*
* @param {string} proc_ws_token
* @param {object} sitewide_socket
*/
ws_connection_attempt(proc_ws_token,sitewide_socket) {
if ( !sitewide_socket ) {
console.log(new Error("ERROR: sitewide_socket not available in ws_connection_attempt ... general transition engine "))
process.exit(1)
}
try {
// setup a webSocket connection to get finalization data on logging in. --
sitewide_socket.on('error',(e) => {
sitewide_socket = null
console.log(e.message)
if ( e.code == 'ECONNREFUSED'|| e.message.indexOf('502') > 0 ) {
console.log("try again in 1 seconds")
let try_again = () => { this.ws_connection_attempt(proc_ws_token,sitewide_socket) }
this.ws_client_attempt_timeout = setTimeout(try_again,1000)
} else {
console.dir(e)
}
})
sitewide_socket.on('open', () => {
//
console.log("web sockets connected")
if ( this.ws_client_attempt_timeout !== null ) {
clearTimeout(this.ws_client_attempt_timeout)
this.ws_client_attempt_timeout = null
}
//
let msg = {
'token' : proc_ws_token,
'action' : 'setup'
}
sitewide_socket.send(JSON.stringify(msg))
//
})
sitewide_socket.onmessage = async (event) => { // handle the finalization through the websocket
try {
let handler = JSON.parse(event.data)
if ( handler.data && (handler.data.type === 'ping') ) {
if ( sitewide_socket ) {
let ponger = {
"ping_id" : handler.data.ping_id,
"time" : Date.now()
}
sitewide_socket.send(JSON.stringify(ponger))
}
} else {
if ( (handler.token === proc_ws_token) && (this.user_processor && this.user_processor.sitewide_logout)) {
if ( handler.action === "logout" ) {
let t_obj = await this.user_processor.sitewide_logout(handler)
if ( sitewide_socket && t_obj ) {
sitewide_socket.send(JSON.stringify(t_obj)) // send all connected windows the logout
}
} else {
try {
eval(handler.action)
} catch (e_eval) {
}
}
}
}
} catch (e) {
}
}
//
} catch(e) {
console.log(e.message)
console.dir(e)
process.exit(1)
}
}
/**
* This method is a wrapper of the method `send_to_ws`.
* This method pertains to transtion handling, and so it requires the transition
* token to find the websocket connection wrapper. It sends any data that can be
* stringified.
*
* The *outofband* part of the name usually refers to the situation in data is sent
* at times other than when a transtion action is being performed or before the action
* has been completed.
*
* @param {string} token_key - tansition token
* @param {object} data
*/
send_ws_outofband(token_key,data) {
if ( token_key ) {
let ws = this.going_ws_sessions[token_key]
if ( ws ) {
this.send_to_ws(ws,data)
}
}
}
/**
* Called by _setup_app_ws.
*
* This method creates an id for the websocket connection and maps the session into `going_sessions`.
* This method then sends a message to the client indicating that the connection has taken place.
*
*
*
* @param {object} ws - web socket server
*/
add_ws_session(ws) {
ws._app_x_isAlive = true;
let ws_id = global_appwide_token()
this.going_sessions[ws_id] = ws
ws._app_x_ws_id = ws_id
this.send_ws(ws_id,{ "status" : "connected", "type" : "ws_id" })
//
if ( !(this.checking_pings) ) {
this.start_checking_pings()
}
}
/**
* Send a ping message to a single client.
*
* This adds a field to the ws, the socket connection wrapper. The field is `_app_x_isAlive`.
* A message containing the `_app_x_ws_id` is sent to the client. `_app_x_ws_id` is another
* field added to the socket connection wrapper and is used to find the connection wrapper in the
* `going_sessions` table belonging to this class.
*
*
*
* @param {object} ws - web socket server
*/
ping(ws) { // send a message to a client to see if it is up
ws._app_x_isAlive = false;
let data = {
"data" : { "type" : "ping", "ping_id" : ws._app_x_ws_id },
"time" : Date.now(),
}
ws.send(JSON.stringify(data));
}
/**
* This method must called in response to the ping call in order for the websocket connection to be considered
* to be alive.
*
* @param {object} ws - web socket server
*/
ponged(ws) {
ws._app_x_isAlive = true;
if ( this.supects[ws._app_x_ws_id] ) {
delete this.supects[ws._app_x_ws_id]
}
}
/**
* Closes out a web socket connection.
*
* @param {object} ws - web socket client connection
* @returns {boolean}
*/
close_wss_session(ws) {
let result = false
if ( ws ) {
let ws_id = ws._app_x_ws_id
try {
result = ws.close()
} catch (e) {
}
if ( ws_id && this.going_sessions[ws_id]) delete this.going_sessions[ws_id]
}
return result
}
/**
* Send a message on the identified web sockets.
*
* This method creates a wrapper for the actual data being sent and attach it to the data field of the wrapper.
* It also puts the id of the wrapper and assigns it as the value to the `ws_id` field of the message.
*
* @param {string} ws_id
* @param {object} data
*/
send_ws(ws_id,data) {
let ws = this.going_sessions[ws_id]
if ( ws ) {
let message = {
"ws_id" : ws_id,
"data" : data
}
this.send_to_ws(ws,message)
}
}
/**
* Pings the clients to see if they will respond.
*
* @param {Function} caller
*/
do_pings(caller) {
for ( let key in this.going_sessions ) {
let ws = this.going_sessions[key]
if ( !(ws._app_x_isAlive) ) {
this.supects[ws._app_x_ws_id] = ws
} else {
this.ping(ws)
}
}
setTimeout(() => { caller.do_pings(caller) },WSS_PING_INTERVAL)
}
/**
* Closes off client connections that are not responding to pings.
*/
close_suspects() {
for ( let wsid in this.supects ) {
let ws = this.supects[wsid]
delete this.supects[wsid]
this.close_wss_session(ws)
}
}
/**
* Called by `add_ws_session` if it has not yet been called.
* It is possible it will be called again if at some point it was turned off and if
* `add_ws_session` is called.
*
*/
start_checking_pings() {
this.checking_pings = true
let self = this
setTimeout(() => { self.do_pings(self) },WSS_PING_INTERVAL)
setTimeout(() => { self.close_suspects() },WSS_PING_INTERVAL*2)
}
}
module.exports = WebSocketManager