UNPKG

12.4 kBJavaScriptView Raw
"use strict";
/*
 * Inbound side of the sports-feed client (compiled from TypeScript, ES5).
 *
 * Exports:
 *   - InEndPointEventHandler : reacts to subscriber endpoint events and sends
 *                              filter / start-subscribe messages per stream.
 *   - StreamCommandProcessor : pushes incoming messages into the feed-snapshot
 *                              repository and offers debug printing helpers.
 *   - SportsFeedInProcessor  : message-group processor that routes wrapped
 *                              messages to the StreamCommandProcessor.
 *   - MergeableIncoming      : an Incoming message that can be applied to a
 *                              snapshot, producing the deltas and the new snapshot.
 */

// ---------------------------------------------------------------------------
// TypeScript runtime helpers (standard tsc emit; reuse a host-provided helper
// when `this` carries one, otherwise fall back to the local implementation).
// ---------------------------------------------------------------------------

/** Sets up prototype-based inheritance of `d` from `b` (statics + prototype chain). */
var __extends = (this && this.__extends) || (function () {
    var extendStatics = Object.setPrototypeOf ||
        ({ __proto__: [] } instanceof Array && function (d, b) { d.__proto__ = b; }) ||
        function (d, b) {
            for (var p in b) {
                if (b.hasOwnProperty(p)) {
                    d[p] = b[p];
                }
            }
        };
    return function (d, b) {
        function __() { this.constructor = d; }
        extendStatics(d, b);
        d.prototype = b === null ? Object.create(b) : (__.prototype = b.prototype, new __());
    };
})();

/** Applies a list of decorators (last to first), preferring Reflect.decorate when available. */
var __decorate = (this && this.__decorate) || function (decorators, target, key, desc) {
    var argCount = arguments.length;
    var result = argCount < 3
        ? target
        : desc === null ? desc = Object.getOwnPropertyDescriptor(target, key) : desc;
    var decorator;
    if (typeof Reflect === "object" && typeof Reflect.decorate === "function") {
        result = Reflect.decorate(decorators, target, key, desc);
    } else {
        for (var i = decorators.length - 1; i >= 0; i--) {
            decorator = decorators[i];
            if (decorator) {
                result = (argCount < 3
                    ? decorator(result)
                    : argCount > 3 ? decorator(target, key, result) : decorator(target, key)) || result;
            }
        }
    }
    if (argCount > 3 && result) {
        Object.defineProperty(target, key, result);
    }
    return result;
};

/** Returns a Reflect.metadata decorator when reflect-metadata is loaded, otherwise undefined. */
var __metadata = (this && this.__metadata) || function (metadataKey, metadataValue) {
    if (typeof Reflect === "object" && typeof Reflect.metadata === "function") {
        return Reflect.metadata(metadataKey, metadataValue);
    }
};

/** Adapts a parameter decorator so it can be used in a class-level decorator list. */
var __param = (this && this.__param) || function (paramIndex, decorator) {
    return function (target, key) {
        decorator(target, key, paramIndex);
    };
};

Object.defineProperty(exports, "__esModule", { value: true });

// Dependencies are loaded in the same order as the original emit.
var D = require("@jayesol/jayeson.lib.delivery");
var Collections = require("typescript-collections");
var core_1 = require("./core");
var message_class_1 = require("./message_class");
var merge_1 = require("./merge");
var data_structure_1 = require("./data_structure");
var circular = require("circular-json");
require("reflect-metadata");
var injection_js_1 = require("injection-js");
var codec_1 = require("./codec");

// ---------------------------------------------------------------------------
// InEndPointEventHandler
// ---------------------------------------------------------------------------

/**
 * Handles subscriber endpoint events. When consumption starts it remembers
 * which endpoint serves each stream of this handler's message group, then
 * sends the current filter followed by the start-subscribe message.
 */
var InEndPointEventHandler = (function () {
    /**
     * @param startMessage   Start-subscribe message; its `exclude` and
     *                       `wireFormat` properties are cached on the handler.
     * @param sportsMsgGrp   Sports feed message group (source of message classes).
     * @param streamNameCode Provides the meta-info code under which the stream
     *                       name is attached to outgoing messages.
     */
    function InEndPointEventHandler(startMessage, sportsMsgGrp, streamNameCode) {
        // stream name -> endpoint that announced the stream
        this._streamSources = {};
        this._filterData = null;
        this._startMessage = startMessage;
        this._sportsMsgGrp = sportsMsgGrp;
        this._streamNameCode = streamNameCode;
        this._exclude = startMessage.exclude;
        this._wireFormat = startMessage.wireFormat;
    }

    /** Stores the filter data used for filters sent on ConsumptionStart. */
    InEndPointEventHandler.prototype.setfilterData = function (filterData) {
        this._filterData = filterData;
    };

    /** @returns the live stream-name -> endpoint map (not a copy). */
    InEndPointEventHandler.prototype.streamSources = function () {
        return this._streamSources;
    };

    InEndPointEventHandler.prototype.exclude = function () {
        return this._exclude;
    };

    InEndPointEventHandler.prototype.wireFormat = function () {
        return this._wireFormat;
    };

    InEndPointEventHandler.prototype.startMessage = function () {
        return this._startMessage;
    };

    InEndPointEventHandler.prototype.sportsMsgGrp = function () {
        return this._sportsMsgGrp;
    };

    InEndPointEventHandler.prototype.streamNameCode = function () {
        return this._streamNameCode;
    };

    InEndPointEventHandler.prototype.filterData = function () {
        return this._filterData;
    };

    /**
     * Dispatches a subscriber event.
     *
     * - ConsumptionStart: for every stream listed under this handler's message
     *   group id, registers the endpoint, sends the current filter and then the
     *   start-subscribe message.
     * - ConsumptionError: logs the client URL and the affected streams per group.
     * - Anything else: logs the (circular-safe) serialized event.
     *
     * @param event Event object; `streams` is read as a map of
     *              message-group key -> list of stream names.
     */
    InEndPointEventHandler.prototype.onEvent = function (event) {
        var groupKey;
        var streams;
        var i;
        if (event instanceof D.Subscriber.Event.ConsumptionStart) {
            for (groupKey in event.streams) {
                if (!event.streams.hasOwnProperty(groupKey)) {
                    continue;
                }
                // Only streams of our own message group are of interest. The
                // check does not depend on the stream, so it is done once per
                // group; the radix is explicit so keys are always read as decimal.
                if (parseInt(groupKey, 10) != this.sportsMsgGrp().id()) {
                    continue;
                }
                streams = event.streams[groupKey];
                for (i = 0; i < streams.length; i++) {
                    var streamName = streams[i];
                    this._streamSources[streamName] = event.endPoint;
                    var filterRequest = new codec_1.FilterRequest(codec_1.FilterRequest.IGNORED_REQUEST_ID, this.filterData());
                    this.sendFilter(filterRequest, streamName);
                    console.log("Sending Start Message to stream : '" + streamName + "'");
                    var startWrapper = new D.MessageWrapper(this._startMessage, this._sportsMsgGrp.ADMIN_START_SUBSCRIBE_FEED());
                    this.sendMessage(streamName, startWrapper);
                }
            }
        } else if (event instanceof D.Subscriber.Event.ConsumptionError) {
            var report = event.client.config.url + " has errors on consumption: ";
            for (groupKey in event.streams) {
                if (!event.streams.hasOwnProperty(groupKey)) {
                    continue;
                }
                streams = event.streams[groupKey];
                // Separators are emitted only between names. The previous code
                // appended a trailing comma and removed it with
                // substring(1, length - 1), which also dropped the FIRST character
                // of the whole report for every group (and ate the "[" when a
                // group had no streams).
                var names = "";
                for (i = 0; i < streams.length; i++) {
                    names += (i > 0 ? "," : "") + streams[i];
                }
                report += "\n" + groupKey + ":[" + names + "]";
            }
            console.log(report);
        } else {
            console.log("Unidentified event received by onEvent: " + circular.stringify(event));
        }
    };

    /**
     * Tags the wrapper with the stream name and sends it through the endpoint
     * registered for that stream; logs and drops the message when no endpoint
     * is known.
     *
     * @param streamName     Target stream name.
     * @param messageWrapper Wrapper to send (mutated: meta info is added).
     */
    InEndPointEventHandler.prototype.sendMessage = function (streamName, messageWrapper) {
        messageWrapper.addMetaInfo(this._streamNameCode.code(), streamName);
        var endPoint = this._streamSources[streamName];
        if (endPoint == null) {
            console.log("No Endpoint present corresponding to stream '" + streamName + "'. Unable to send message");
        } else {
            endPoint.send(messageWrapper);
        }
    };

    /** Remembers the request's filter data and broadcasts the request to all known streams. */
    InEndPointEventHandler.prototype.updateFilter = function (filterRequest) {
        this._filterData = filterRequest.getFilterData();
        this.sendFilter(filterRequest);
    };

    /**
     * Sends a filter request to one stream, or to every registered stream when
     * `streamName` is omitted or null.
     */
    InEndPointEventHandler.prototype.sendFilter = function (filterRequest, streamName) {
        if (streamName === void 0) {
            streamName = null;
        }
        if (streamName !== null) {
            this._sendFilter(filterRequest, streamName);
            return;
        }
        for (var name in this._streamSources) {
            if (this._streamSources.hasOwnProperty(name)) {
                this._sendFilter(filterRequest, name);
            }
        }
    };

    /**
     * Sends FILTER_SET (carrying the whole request) when the request has filter
     * data, otherwise FILTER_REMOVE (carrying the request id as a string).
     */
    InEndPointEventHandler.prototype._sendFilter = function (filterRequest, streamName) {
        var filterData = filterRequest.getFilterData();
        var wrapper = null;
        if (filterData != null) {
            wrapper = new D.MessageWrapper(filterRequest, this._sportsMsgGrp.FILTER_SET());
            console.log("Sending FILTER_SET to stream: '" + streamName + "' with filter data: " + JSON.stringify(filterData));
        } else {
            wrapper = new D.MessageWrapper(filterRequest.getRequestId().toString(), this._sportsMsgGrp.FILTER_REMOVE());
            console.log("Sending FILTER_REMOVE to stream: '" + streamName + "'");
        }
        this.sendMessage(streamName, wrapper);
    };

    InEndPointEventHandler = __decorate([
        injection_js_1.Injectable(),
        __metadata("design:paramtypes", [
            message_class_1.StartSubscribeFeed,
            message_class_1.SportsFeedMessageGroup,
            D.StreamNameCode
        ])
    ], InEndPointEventHandler);
    return InEndPointEventHandler;
})();
exports.InEndPointEventHandler = InEndPointEventHandler;

// ---------------------------------------------------------------------------
// StreamCommandProcessor
// ---------------------------------------------------------------------------

/**
 * Feeds incoming messages into the feed-snapshot repository and provides
 * console printing helpers for outgoing messages.
 */
var StreamCommandProcessor = (function () {
    /**
     * @param fsRepo    Repository injected under the core FSREPO_IMPL token; must
     *                  offer appendSnapshot(stream, incoming) and push(outgoing).
     * @param sportsGrp Sports feed message group.
     */
    function StreamCommandProcessor(fsRepo, sportsGrp) {
        this._fsRepo = fsRepo;
        this._sportsGrp = sportsGrp;
        // message-class id -> printable name (ids 12 and 15 have no entry)
        this.messageClassName = {
            0: "DATA_RESET",
            1: "DATA_INSERT_ODD",
            2: "DATA_UPDATE_ODD",
            3: "DATA_DELETE_ODD",
            4: "DATA_INSERT_EVENT",
            5: "DATA_UPDATE_EVENT",
            6: "DATA_DELETE_EVENT",
            7: "DATA_INSERT_MATCH",
            8: "DATA_UPDATE_MATCH",
            9: "DATA_DELETE_MATCH",
            10: "TTL_REMOVE",
            11: "TTL_RESTORE",
            13: "FILTER_REMOVE",
            14: "FILTER_SET",
            16: "ADMIN_START_SUBSCRIBE_FEED",
            17: "ADMIN_REFRESH"
        };
    }

    /**
     * Processes one message for a stream.
     *
     * @param stream   Stream name the message belongs to.
     * @param incoming Incoming message to append to the repository snapshot;
     *                 ignored for pushing when `outgoing` is given.
     * @param outgoing Optional ready-made outgoing message; when present it is
     *                 pushed as-is instead of appending `incoming`.
     */
    StreamCommandProcessor.prototype.process = function (stream, incoming, outgoing) {
        if (outgoing === void 0) {
            outgoing = null;
        }
        if (incoming != null && incoming instanceof data_structure_1.Incoming) {
            var className = this.messageClassName[incoming.msgType().id()];
            console.debug("SCP processing: " + className);
        }
        if (outgoing != null) {
            this._fsRepo.push(outgoing);
            return;
        }
        var produced = this._fsRepo.appendSnapshot(stream, incoming);
        for (var i = 0; i < produced.length; i++) {
            this._fsRepo.push(produced[i]);
        }
    };

    StreamCommandProcessor.prototype.fsRepo = function () {
        return this._fsRepo;
    };

    StreamCommandProcessor.prototype.sportsGrp = function () {
        return this._sportsGrp;
    };

    /**
     * Logs the partition keys of an outgoing message and, for match / odd data
     * messages, one line per match or per record. Other message types only get
     * the partition-key line.
     */
    StreamCommandProcessor.prototype.printMessage = function (outgoing) {
        var className = this.getClassName(outgoing);
        var delta = outgoing.delta();
        this.printPartitionKeys(className, delta);
        switch (outgoing.msgType().id()) {
            case this._sportsGrp.DATA_INSERT_MATCH().id():
            case this._sportsGrp.DATA_UPDATE_MATCH().id():
            case this._sportsGrp.DATA_DELETE_MATCH().id():
                this.printMatch(className, delta);
                break;
            case this._sportsGrp.DATA_INSERT_ODD().id():
            case this._sportsGrp.DATA_UPDATE_ODD().id():
            case this._sportsGrp.DATA_DELETE_ODD().id():
                this.printRecord(className, delta);
        }
    };

    /** @returns "TTL_REM"/"TTL_RES" for TTL outgoing messages, else the mapped message-class name. */
    StreamCommandProcessor.prototype.getClassName = function (outgoing) {
        if (outgoing instanceof core_1.TTLOutgoing) {
            return outgoing.getTtlType() == core_1.TTLType.REMOVE ? "TTL_REM" : "TTL_RES";
        }
        return this.messageClassName[outgoing.msgType().id()];
    };

    /** Logs "[className] matchId" for every match in the delta. */
    StreamCommandProcessor.prototype.printMatch = function (className, delta) {
        var matches = delta.matches();
        for (var m = 0; m < matches.length; m++) {
            console.log("[" + className + "] " + matches[m].id());
        }
    };

    /** Logs one underscore-joined key line for every record of every event of every match. */
    StreamCommandProcessor.prototype.printRecord = function (className, delta) {
        var matches = delta.matches();
        for (var m = 0; m < matches.length; m++) {
            var events = matches[m].events();
            for (var e = 0; e < events.length; e++) {
                var records = events[e].records();
                for (var r = 0; r < records.length; r++) {
                    var record = records[r];
                    console.log("[" + className + "] " + record.matchId() + "_" + record.eventId() + "_" + record.id() + "_" + record.source() + "_" + record.oddType() + "_" + record.oddFormat());
                }
            }
        }
    };

    /** Logs the string form of the delta's partitions. */
    StreamCommandProcessor.prototype.printPartitionKeys = function (className, delta) {
        var partitions = delta.getPartitions().toString();
        console.log("[" + className + "][PK]" + partitions);
    };

    StreamCommandProcessor = __decorate([
        injection_js_1.Injectable(),
        __param(0, injection_js_1.Inject(core_1.FSREPO_IMPL)),
        __metadata("design:paramtypes", [Object, message_class_1.SportsFeedMessageGroup])
    ], StreamCommandProcessor);
    return StreamCommandProcessor;
})();
exports.StreamCommandProcessor = StreamCommandProcessor;

// ---------------------------------------------------------------------------
// SportsFeedInProcessor
// ---------------------------------------------------------------------------

/**
 * Message-group processor for the sports feed: validates wrapped messages and
 * forwards them to the StreamCommandProcessor, with optional TTL handling
 * backed by a recycle bin.
 */
var SportsFeedInProcessor = (function (_super) {
    __extends(SportsFeedInProcessor, _super);

    /**
     * @param scp                StreamCommandProcessor receiving the messages.
     * @param sportsFeedMsgGroup Sports feed message group (also passed to the base class).
     * @param snCode             StreamNameCode used to read the stream name meta info.
     * @param recycleBin         Recycle bin used for TTL remove/restore; null-checked by the
     *                           TTL trigger methods.
     */
    function SportsFeedInProcessor(scp, sportsFeedMsgGroup, snCode, recycleBin) {
        var _this = _super.call(this, sportsFeedMsgGroup) || this;
        _this.scp = scp;
        _this.sportsFeedMsgGroup = sportsFeedMsgGroup;
        _this.snCode = snCode;
        _this.recycleBin = recycleBin;
        return _this;
    }

    /**
     * Routes one message wrapper. Wrappers without a stream name or with an
     * unrecognised message class are logged and ignored. Indicator messages are
     * pushed directly; all others become a MergeableIncoming (after a possible
     * TTL restore for the affected data).
     */
    SportsFeedInProcessor.prototype.process = function (messageWrapper) {
        var streamName = messageWrapper.getMetaInfo(this.snCode.code());
        if (streamName == null || streamName === "") {
            console.log("SportsFeedInProcessor received messagewrapper with no StreamName. Ignoring.");
            return;
        }
        if (!this.isFeedMessage(messageWrapper)) {
            console.log("SportsFeedInProcessor received unknown messageclass: " + circular.stringify(messageWrapper.messageClass) + ". Ignoring");
            return;
        }
        if (this.sportsFeedMsgGroup.isIndicatorMessage(messageWrapper.messageClass)) {
            var outgoing = this.processIndicatorMessage(messageWrapper);
            this.scp.process(streamName, null, outgoing);
        } else {
            var incoming = new MergeableIncoming(this.sportsFeedMsgGroup, messageWrapper.messageClass, streamName, messageWrapper.message, this.recycleBin);
            this.triggerTtlRestoreIfApplicable(incoming);
            this.scp.process(incoming.stream(), incoming);
        }
    };

    /**
     * Converts an indicator message into an OutgoingImpl. For TTL_RESTORE_START
     * the recycle bin is cleared for every partition named by the message first.
     * NOTE(review): unlike the TTL trigger methods, this path does not null-check
     * the recycle bin — confirm a bin is always injected.
     */
    SportsFeedInProcessor.prototype.processIndicatorMessage = function (messageWrapper) {
        var _this = this;
        if (messageWrapper.messageClass == this.sportsFeedMsgGroup.TTL_RESTORE_START()) {
            messageWrapper.message.getPartitions().forEach(function (partition) {
                _this.recycleBin.clearBin(partition);
            });
        }
        return new data_structure_1.OutgoingImpl(messageWrapper.messageClass, messageWrapper.message);
    };

    /**
     * When TTL is enabled, asks the recycle bin for a restore snapshot matching
     * the incoming message and, if there is one, processes it first.
     */
    SportsFeedInProcessor.prototype.triggerTtlRestoreIfApplicable = function (incoming) {
        if (this.getRecycleBin() != null &&
            this.getRecycleBin().getTtlConfig() != null &&
            this.getRecycleBin().getTtlConfig().isEnableTtl()) {
            var restoreSnapshot = this.getRecycleBin().getTtlRestoreSnapshot(incoming);
            if (restoreSnapshot != null) {
                this.scp.process(restoreSnapshot.getStream(), restoreSnapshot);
            }
        }
    };

    /** Processes every TTL-remove snapshot the recycle bin currently reports. */
    SportsFeedInProcessor.prototype.triggerTtlRemove = function () {
        var _this = this;
        if (this.getRecycleBin() != null) {
            this.getRecycleBin().getTtlRemoveSnapshot().forEach(function (removeSnapshot) {
                _this.scp.process(removeSnapshot.getStream(), removeSnapshot);
            });
        }
    };

    /**
     * Starts the periodic TTL check (triggerTtlRemove) at the configured run
     * interval when a recycle bin with TTL enabled is present. The interval
     * handle is not retained, so the timer cannot be cancelled from here.
     */
    SportsFeedInProcessor.prototype.startTtlVerification = function () {
        var _this = this;
        if (this.getRecycleBin() == null) {
            return;
        }
        var ttlConfig = this.getRecycleBin().getTtlConfig();
        if (ttlConfig != null && ttlConfig.isEnableTtl()) {
            var runInterval = ttlConfig.getRunInterval();
            setInterval(function () { return _this.triggerTtlRemove(); }, runInterval);
            console.log("Scheduled TTLCheck with interval of " + runInterval);
        } else {
            console.log("No TTL set. Not scheduling any TTL.");
        }
    };

    /**
     * @returns true when the wrapper's message class is one of the known feed
     *          message classes (by type) or one of the group's indicator
     *          message-class instances (by identity).
     */
    SportsFeedInProcessor.prototype.isFeedMessage = function (messageWrapper) {
        var messageClass = messageWrapper.messageClass;
        return messageClass instanceof message_class_1.InsertOddMessageClass ||
            messageClass instanceof message_class_1.UpdateOddMessageClass ||
            messageClass instanceof message_class_1.DeleteOddMessageClass ||
            messageClass instanceof message_class_1.InsertEventMessageClass ||
            messageClass instanceof message_class_1.UpdateEventMessageClass ||
            messageClass instanceof message_class_1.DeleteEventMessageClass ||
            messageClass instanceof message_class_1.InsertMatchMessageClass ||
            messageClass instanceof message_class_1.UpdateMatchMessageClass ||
            messageClass instanceof message_class_1.DeleteMatchMessageClass ||
            messageClass instanceof message_class_1.RefreshMessageClass ||
            messageClass instanceof message_class_1.TTLRemoveMessageClass ||
            messageClass instanceof message_class_1.TTLRestoreMessageClass ||
            messageClass instanceof message_class_1.ResetMessageClass ||
            messageClass instanceof message_class_1.SwitchFilterStartMessageClass ||
            messageClass instanceof message_class_1.SwitchFilterFailMessageClass ||
            messageClass == this.sportsFeedMsgGroup.SWITCH_FILTER_START() ||
            messageClass == this.sportsFeedMsgGroup.SWITCH_FILTER_END() ||
            messageClass == this.sportsFeedMsgGroup.FULLSNAPSHOT_START() ||
            messageClass == this.sportsFeedMsgGroup.FULLSNAPSHOT_END() ||
            messageClass == this.sportsFeedMsgGroup.SWITCH_FILTER_FAIL() ||
            messageClass == this.sportsFeedMsgGroup.TTL_RESTORE_START() ||
            messageClass == this.sportsFeedMsgGroup.TTL_RESTORE_END();
    };

    SportsFeedInProcessor.prototype.getScp = function () {
        return this.scp;
    };

    SportsFeedInProcessor.prototype.getRecycleBin = function () {
        return this.recycleBin;
    };

    SportsFeedInProcessor = __decorate([
        injection_js_1.Injectable(),
        __metadata("design:paramtypes", [
            StreamCommandProcessor,
            message_class_1.SportsFeedMessageGroup,
            D.StreamNameCode,
            core_1.RecycleBin
        ])
    ], SportsFeedInProcessor);
    return SportsFeedInProcessor;
})(D.IMessageGroupProcessor);
exports.SportsFeedInProcessor = SportsFeedInProcessor;

// ---------------------------------------------------------------------------
// MergeableIncoming
// ---------------------------------------------------------------------------

/**
 * Incoming message that knows how to merge itself into a snapshot.
 */
var MergeableIncoming = (function (_super) {
    __extends(MergeableIncoming, _super);

    /**
     * @param sportsGrp Sports feed message group.
     * @param msgType   Message class of the incoming message (passed to the base class).
     * @param stream    Stream name (passed to the base class).
     * @param data      Message payload (passed to the base class).
     * @param bin       Recycle bin used by the TTL remove / restore branches.
     */
    function MergeableIncoming(sportsGrp, msgType, stream, data, bin) {
        var _this = _super.call(this, msgType, stream, data) || this;
        _this.sportsGrp = sportsGrp;
        _this.bin = bin;
        return _this;
    }

    /**
     * Applies this message to `snapshot` and returns a MergeableWrapper holding
     * the resulting deltas and the snapshot after the merge.
     *
     * Four cases, selected by message class:
     *  1. match/event insert or delete, and event update: the payload is first
     *     transformed into a list of messages, each wrapped in its own Delta;
     *  2. TTL remove: partitions present in the snapshot are moved to the bin;
     *  3. TTL restore: partitions present in the bin are moved back;
     *  4. everything else: a single Delta wrapping this message.
     *
     * @param snapshot Snapshot before the merge; null/undefined is treated as
     *                 the empty indexed snapshot.
     */
    MergeableIncoming.prototype.apply = function (snapshot) {
        if (snapshot == null) {
            snapshot = data_structure_1.IndexedSnapshotImpl.EMPTY_SNAPSHOT;
        }
        var msgType = this.msgType();
        var deltas = [];
        var wrapper = new data_structure_1.MergeableWrapper();
        var combined;
        var ttlResult;
        var remaining;
        var partitions;
        var partition;
        var i;

        this.beforeSnap = snapshot;

        if (msgType instanceof message_class_1.InsertMatchMessageClass ||
            msgType instanceof message_class_1.InsertEventMessageClass ||
            msgType instanceof message_class_1.DeleteMatchMessageClass ||
            msgType instanceof message_class_1.DeleteEventMessageClass ||
            msgType instanceof message_class_1.UpdateEventMessageClass) {
            combined = merge_1.SnapshotUtil.combineSnapshots(msgType, this.data(), snapshot);
            var transformed = merge_1.DeltaTransformingLogicImpl.transform(this.sportsGrp, this.stream(), msgType, this.data(), combined);
            for (i = 0; i < transformed.length; i++) {
                deltas.push(new core_1.Delta(transformed[i], combined.getAfterSs(), this.beforeSnap));
                // After the first delta, every following one starts from the merged snapshot.
                this.beforeSnap = combined.getAfterSs();
            }
            wrapper.setAfter(combined.getAfterSs());
            wrapper.setDeltaOut(deltas);
        } else if (msgType instanceof message_class_1.TTLRemoveMessageClass) {
            ttlResult = null;
            remaining = snapshot;
            var removedPartitions = new Collections.Dictionary();
            var removedSnapshots = [];
            var anyRemoved = false;
            var partitionMap = snapshot.getPartitionMap();
            partitions = this.data().getPartitions().toArray();
            for (i = 0; i < partitions.length; i++) {
                partition = partitions[i];
                console.log("removing keys from upstream " + JSON.stringify(partition));
                if (partitionMap.containsKey(partition)) {
                    anyRemoved = true;
                    removedPartitions.setValue(partition, partitionMap.getValue(partition));
                    ttlResult = this.bin.removeData(new data_structure_1.TTLWrapper(), remaining, partition);
                    remaining = ttlResult.getRemainingSs();
                    removedSnapshots.push(ttlResult.getRemovedSs());
                }
            }
            if (anyRemoved) {
                console.log("reseting ----->" + JSON.stringify(removedPartitions.keys()));
                wrapper = merge_1.DeltaTransformingLogicImpl.transformTTLRemove(removedSnapshots, this.bin.getGrp(), this.stream(), snapshot, remaining, removedPartitions);
            } else {
                // Nothing in the snapshot matched: no deltas, snapshot unchanged.
                wrapper.setDeltaOut([]);
                wrapper.setAfter(snapshot);
            }
        } else if (msgType instanceof message_class_1.TTLRestoreMessageClass) {
            ttlResult = null;
            remaining = snapshot;
            var restoredAt = new Collections.Dictionary();
            var anyRestored = false;
            var binCopy = {};
            var restoredSnapshots = [];
            partitions = this.data().getPartitions().toArray();
            for (i = 0; i < partitions.length; i++) {
                partition = partitions[i];
                console.log("restoring keys from upstream " + JSON.stringify(partition));
                if (this.bin.containData(partition)) {
                    anyRestored = true;
                    var timestamp = new Date().getTime();
                    restoredAt.setValue(partition, timestamp);
                    // Copy the binned data before restoreData() is asked to move it back.
                    binCopy = this.bin.copyData(partition, binCopy);
                    ttlResult = this.bin.restoreData(new data_structure_1.TTLWrapper(), remaining, partition, timestamp);
                    remaining = ttlResult.getRemainingSs();
                    restoredSnapshots.push(ttlResult.getRestoredSs());
                }
            }
            if (anyRestored) {
                console.log("restoring ----->" + JSON.stringify(restoredAt.keys()));
                wrapper = merge_1.DeltaTransformingLogicImpl.transformTTLRestore(restoredSnapshots, this.bin.getGrp(), this.stream(), snapshot, remaining, binCopy, restoredAt);
            } else {
                // Nothing in the bin matched: snapshot unchanged, no deltas.
                wrapper.setAfter(snapshot);
                wrapper.setDeltaOut([]);
            }
        } else {
            combined = merge_1.SnapshotUtil.combineSnapshots(msgType, this.data(), snapshot);
            deltas.push(new core_1.Delta(this, combined.getAfterSs(), snapshot));
            wrapper.setDeltaOut(deltas);
            wrapper.setAfter(combined.getAfterSs());
        }
        return wrapper;
    };

    return MergeableIncoming;
})(data_structure_1.Incoming);
exports.MergeableIncoming = MergeableIncoming;
\No newline at end of file