UNPKG

6.22 kBJavaScriptView Raw
1var cluster = require ('cluster'),
2 WebSocket = require ('ws'),
3 uuid = require ('node-uuid'),
4 util = require ('util'),
5 EventEmitter = require ('events').EventEmitter;
6
7var data = require ('./test-data.json');
8
9var testConf = {
10 threads: 4,
11 connPerThread: 1000,
12 instruments: 1000,
13};
14
15function toJSONStingArray(orderArray) {
16 var ret = [];
17 for(var i = 0; i < orderArray.length; i++)
18 ret[i] = JSON.stringify(orderArray[i]);
19 return ret;
20}
21
22function launch () {
23 if (cluster.isMaster) {
24 for (var i = 0; i < testConf.threads; i++) {
25 cluster.fork();
26 }
27 } else if (cluster.isWorker) {
28 console.log ('I am worker #' + cluster.worker.id);
29
30 var instruments = prepareInstruments (testConf.instruments);
31
32 var wsc = new wsClient (cluster);
33
34 wsc.registerAll (instruments);
35
36 wsc.on ('register', function (err) {
37 if (err) {
38 // TODO: pass error to the script sute
39 return;
40 }
41 // TODO: pass ok to the script suite
42 wsc.executeAll (testConf.connPerThread);
43 });
44
45 wsc.on ('execute-connection', function (err) {
46 if (err) {
47 // TODO: pass error to the script sute
48 return;
49 }
50 // TODO: pass ok to the script suite
51 });
52
53 setTimeout (function () {
54
55 }, 1000);
56 }
57}
58
59function prepareInstruments (instrumentCount) {
60 var instruments = [];
61
62 for (var k = 0; k < instrumentCount; k++) {
63 instruments[k] = new Buffer (uuid.v1(null, [])).toString('base64');
64 }
65
66 return instruments;
67}
68
69/**
70 * class for ws requests
71 */
72function wsClient (cluster) {
73 this.cluster = cluster;
74}
75
76util.inherits (wsClient, EventEmitter);
77
78wsClient.prototype.registerAll = function (instruments) {
79 var socketRegister = new WebSocket('ws://127.0.0.1:8080/library/text/register');
80
81 // here we start to send messages
82 socketRegister.on ('open', function() {
83 // actually we're trying to register instruments in parallel from every cluster worker
84 // ???
85 console.time ("register time "+this.cluster.worker.id);
86 for (var k = 0; k < instruments.length; k++) {
87 // console.log('register ' + instruments[k]);
88 socketRegister.send (instruments[k]);
89 }
90 console.log("register: " + this.cluster.worker.id + ": " + instruments.length + ' messages was sent');
91 }.bind (this));
92
93 var l = 0;
94
95 // this is called on every received message
96 socketRegister.on('message', function(data, flags) {
97 // TODO: check that we actually get all instruments registered
98 // maybe using data?
99 l++;
100 if (l == instruments.length) {
101 console.timeEnd("register time "+this.cluster.worker.id);
102 console.log("register: " + this.cluster.worker.id + ": all messages was delivered.");
103 this.emit ('register', null);
104 }
105 }.bind (this));
106
107 // if we don't have confirmations for every single instrument?
108 setTimeout (function () {
109 // TODO: set test to fail
110 this.emit ('register', true);
111 }.bind (this), 1000);
112}
113
114wsClient.prototype.execute = function (connId) {
115 var execute = new WebSocket ('ws://127.0.0.1:8080/library/text/execute');
116 var m = 0;
117 var failure = null;
118
119 // Ensure real instrumentId is properly escaped
120 var sellObject =
121 {id: "9g8G2mofRRiexY+b6bCYeg==", instruction: "sell",type: "limit", attributes:["AON"], price: 1000000,quantity: 1100000, instrumentId: "!!!SPLIT!!!"};
122 var sellData = JSON.stringify (sellObject).split ('"!!!SPLIT!!!"');
123 var buyObject = [
124 {id: "dYo+hyMASEiCnT5+RVLWAw==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
125 {id: "KjGjpMmDQ4WaHwLg3w6prQ==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
126 {id: "zRDnn1ELRU2oBvLUIvjf8g==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
127 {id: "2LNF0tA6QbqDSMi1HcTBgA==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
128 {id: "bXHhJts9TtCP/ADcpZOYEg==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
129 {id: "Xj1k+TVlQweEzU5hB1M8ug==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
130 {id: "UhnSakG3RCuSeQuFcY+y4g==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
131 {id: "jf1xQQEoS6ifvMC7uk08eQ==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
132 {id: "2LKzvosFSzSQKYkgk0iEuA==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"},
133 {id: "+Nm5lkZeRjuiha0KrZHcEg==", instruction: "buy", type: "limit", attributes:["AON"], price: 100000, quantity: 110000, instrumentId: "!!!SPLIT!!!"}
134 ];
135 var buyData = buyObject.map (function (o) {return JSON.stringify (o).split ('"!!!SPLIT!!!"')});
136
137 execute.on ('message', function (data, flags) {
138 // console.log('data', data);
139
140 if (
141 data === "null"
142 || data.length === 0
143 || data === "[]"
144 || JSON.parse(data)[0].message !== 'accept'
145 ) {
146 failure = true;
147 // throw new Error("FAILED: execute: data=" + data);
148 }
149
150 m++;
151
152 if (m == K * 10) {
153 console.timeEnd("execute time " + this.cluster.worker.id);
154 console.log("execute: " + this.cluster.worker.id + ": all messages was delivered.");
155 this.emit ('execute-connection', failure);
156 }
157
158 }.bind (this));
159
160 execute.on ('open', function() {
161 console.time("execute time " + this.cluster.worker.id);
162 for (var k = 0; k < K; k++) {
163 var instrumentId = instruments[k];
164 // console.log("execute sell: " + sellData);
165 execute.send(sellData[0] + instrumentId + sellData[1]);
166 for (var i = 0; i < buyData.length; i++) {
167 // console.log("execute buy: " + b);
168 execute.send(buyData[i][0] + instrumentId + buyData[i][1]);
169 }
170 }
171 console.log("execute: " + cluster.worker.id + ": " + (K * buyData.length) + ' messages was sended');
172 }.bind (this));
173}
174
175wsClient.prototype.executeAll = function (connections) {
176 for (var connId = 0; connId < connections; connId ++) {
177 // create as much concurrent connections as you can
178 this.execute (connId);
179 }
180
181}