// Source listing retrieved from UNPKG (5.26 kB, JavaScript, "View Raw").
// Module dependencies (viewer line-number residue removed so the lines parse).
var url = require('url');
var util = require('util');
var ReadableStream = require('stream').Readable;
var EventEmitter = require('events').EventEmitter;
var rels = require('zetta-rels');
var buildDeviceActions = require('./api_formats/siren/device.siren').buildActions;
7
/**
 * Readable stream that relays messages received on a socket topic.
 *
 * NOTE: `module.exports` is reassigned to VirtualDevice further down in this
 * file, so the export made here does not survive module load.
 *
 * @param {String} topic   Topic handed to the socket's subscribe()/on() calls.
 * @param {Object} socket  Object providing subscribe(), unsubscribe() and on().
 * @param {Object} options Options forwarded unchanged to stream.Readable.
 */
var VirtualStream = module.exports = function(topic, socket, options) {
  ReadableStream.call(this, options);
  this._topic = topic;
  this._socket = socket;
  // Topic handler currently attached to the socket; null while detached.
  this.listener = null;
};
util.inherits(VirtualStream, ReadableStream);
15
/**
 * Readable#_read implementation: attaches a topic handler to the socket the
 * first time data is requested, and again after a back-pressure detach.
 *
 * While a handler is attached this is a no-op; the handler keeps pushing every
 * message it receives. When push() reports back-pressure the handler detaches
 * itself, and the next _read() call attaches a fresh one.
 *
 * @param {Number} size Ignored; messages are pushed as they arrive.
 */
VirtualStream.prototype._read = function(size) {
  var self = this;

  if (this.listener) {
    return;
  }

  var listener = function(data) {
    if (self.push(data)) {
      return;
    }

    // Back-pressure: remove the handler registered with on() below. Without
    // this, the next _read() would add a second handler next to the stale one
    // and every message would be pushed twice.
    if (typeof self._socket.removeListener === 'function') {
      self._socket.removeListener(self._topic, listener);
    }
    self._socket.unsubscribe(self._topic, listener);
    self.listener = null;
  };

  this.listener = listener;
  this._socket.subscribe(this._topic);
  this._socket.on(this._topic, listener);
};
30
/**
 * Local proxy for a device that lives behind a peer socket.
 *
 * Copies the entity's properties onto the instance (via _update), follows the
 * device's 'logs' topic to stay in sync, and creates one VirtualStream per
 * object-stream / binary-stream link under `this.streams`.
 *
 * Events available through `on()`:
 *   - 'ready'            once the subscription to the logs topic calls back
 *   - <transition name>  for every message received on the logs topic
 *   - 'remote-destroy' (with this device) and 'destroy' after a
 *     'zetta-device-destroy' transition
 *
 * @param {Object} entity     Entity with `properties`, `actions` and `links`;
 *                            `links` must include a link titled 'logs'.
 * @param {Object} peerSocket Socket providing subscribe(), on(), transition()
 *                            and a `ws` object carrying `_env` and `_loader`.
 */
var VirtualDevice = module.exports = function(entity, peerSocket) {
  var self = this;
  this._socket = peerSocket;
  this._update(entity);

  this._eventEmitter = new EventEmitter();
  this.on = this._eventEmitter.on.bind(this._eventEmitter);

  var logTopic = this._getTopic(this._getLinkWithTitle('logs'));
  this._socket.subscribe(logTopic, function() {
    self._eventEmitter.emit('ready');
  });

  this._socket.on(logTopic, function(data) {
    // Format data.actions to siren action format
    data.actions = buildDeviceActions(data.properties.id, self._socket.ws._env, self._socket.ws._loader, data.transitions);
    delete data.transitions;

    self._update(data);
    self._eventEmitter.emit(data.transition);
  });

  self._eventEmitter.on('zetta-device-destroy', function() {
    self._eventEmitter.emit('remote-destroy', self);
    self._eventEmitter.emit('destroy');
  });

  // setup streams
  this.streams = {};

  // Registers one stream link: optionally subscribes right away, creates the
  // VirtualStream if the title is not taken yet, and mirrors the latest value
  // received on the topic onto `self[monitor.title]`.
  function attachMonitor(monitor, objectMode, subscribeNow) {
    var topic = self._getTopic(monitor);
    if (subscribeNow) {
      self._socket.subscribe(topic);
    }
    if (!self.streams[monitor.title]) {
      self.streams[monitor.title] = new VirtualStream(topic, self._socket, { objectMode: objectMode });
    }
    self._socket.on(topic, function(data) {
      self[monitor.title] = data.data;
    });
  }

  // add all object-stream (subscribed immediately)
  this._getLinksWithRel(rels.objectStream).forEach(function(monitor) {
    attachMonitor(monitor, true, true);
  });

  // add all binary-stream (no subscription is made here)
  this._getLinksWithRel(rels.binaryStream).forEach(function(monitor) {
    attachMonitor(monitor, false, false);
  });
};
86
/**
 * Creates a new VirtualStream for the link whose title equals `name`.
 *
 * The stream runs in object mode when the link's `rel` contains
 * `rels.objectStream`.
 *
 * @param {String} name Title of the stream link to read from.
 * @returns {VirtualStream} A fresh stream bound to this device's socket.
 * @throws {TypeError} When the device has no link with that title.
 */
VirtualDevice.prototype.createReadStream = function(name) {
  var link = this._getLinkWithTitle(name);
  if (!link) {
    // Previously surfaced as an opaque "cannot read property of undefined".
    throw new TypeError('Stream not found: ' + name);
  }

  var topic = this._getTopic(link);
  var isObjectStream = link.rel.indexOf(rels.objectStream) > -1;
  return new VirtualStream(topic, this._socket, { objectMode: isObjectStream });
};
91
/**
 * Invokes a transition on the remote device.
 *
 * Usage: device.call(transition[, arg1, arg2, ...][, cb])
 *
 * A trailing function argument is taken as the callback; everything between
 * the transition name and the callback is passed on as transition arguments.
 * Without a callback, any error is thrown instead: synchronously when the
 * transition is unavailable, otherwise from wherever the socket invokes its
 * completion callback.
 *
 * On success the device is refreshed from the response body and `cb` is
 * called with no arguments.
 */
VirtualDevice.prototype.call = function(/* transition, args, cb */) {
  var self = this;
  var args = Array.prototype.slice.call(arguments);
  var transition = args[0];

  var last = args[args.length - 1];
  var hasCallback = typeof last === 'function';
  var transitionArgs = args.slice(1, hasCallback ? args.length - 1 : args.length);
  var cb = hasCallback ? last : function(err) {
    if (err) {
      throw err;
    }
  };

  var action = this._getAction(transition);
  if (!action) {
    cb(new Error('Transition not available'));
    return;
  }

  var actionArguments = this._encodeData(action, transitionArgs);

  this._socket.transition(action, actionArguments, function(err, body) {
    if (err) {
      cb(err);
      return;
    }
    self._update(body);
    cb();
  });
};
128
/**
 * Reports whether the device currently has an action named `transition`.
 *
 * @param {String} transition Transition (action) name to look up.
 * @returns {Boolean} true when _getAction finds a matching action.
 */
VirtualDevice.prototype.available = function(transition) {
  return Boolean(this._getAction(transition));
};
132
/**
 * Builds the request payload for an action from positional arguments.
 *
 * Fields of type 'hidden' always contribute their own `value`. Every other
 * field takes the next unused entry of `transitionArgs`, in field order;
 * fields left over once the arguments run out are omitted.
 *
 * @param {Object} action         Action whose `fields` (if any) describe the inputs.
 * @param {Array}  transitionArgs Positional arguments; left unmodified.
 * @returns {Object} Map of field name to value.
 */
VirtualDevice.prototype._encodeData = function(action, transitionArgs) {
  var actionArguments = {};
  // An action without a `fields` list simply has no inputs to encode.
  var fields = action.fields || [];
  // Cursor into transitionArgs; avoids shift()-ing the caller's array.
  var nextArg = 0;

  fields.forEach(function(field) {
    if (field.type === 'hidden') {
      actionArguments[field.name] = field.value;
    } else if (nextArg < transitionArgs.length) {
      actionArguments[field.name] = transitionArgs[nextArg];
      nextArg += 1;
    }
  });

  return actionArguments;
};
145
/**
 * Refreshes this device from an entity.
 *
 * Each key of `entity.properties` is copied directly onto the instance,
 * `_actions` is replaced with `entity.actions`, and `_links` is replaced only
 * when the entity carries `links` (otherwise the previous links are kept).
 *
 * @param {Object} entity Object with `properties`, `actions` and optional `links`.
 */
VirtualDevice.prototype._update = function(entity) {
  var properties = entity.properties;
  var names = Object.keys(properties);
  for (var i = 0; i < names.length; i++) {
    this[names[i]] = properties[names[i]];
  }

  this._actions = entity.actions;

  if (entity.links) {
    this._links = entity.links;
  }
};
157
/**
 * Finds the first action whose `name` equals `name`.
 *
 * @param {String} name Action name to look for.
 * @returns {Object|undefined} The matching action, or undefined when there is
 *   none (including when the device currently has no action list at all).
 */
VirtualDevice.prototype._getAction = function(name) {
  // _update stores entity.actions verbatim, so this may be undefined.
  var actions = this._actions || [];
  var matches = actions.filter(function(action) {
    return action.name === name;
  });
  return matches[0];
};
168
/**
 * Finds the first link whose `title` equals `title`.
 *
 * @param {String} title Link title to look for.
 * @returns {Object|undefined} The matching link, or undefined when there is
 *   none (including when no links have been stored on the device yet).
 */
VirtualDevice.prototype._getLinkWithTitle = function(title) {
  // _update only sets _links when the entity carried links.
  var links = this._links || [];
  var matches = links.filter(function(link) {
    return link.title === title;
  });
  return matches[0];
};
179
/**
 * Extracts the `topic` query-string parameter from a link's href.
 *
 * @param {Object} link Link object with an `href` string.
 * @returns {String|undefined} Decoded value of the `topic` parameter, or
 *   undefined when the href has no such parameter.
 */
VirtualDevice.prototype._getTopic = function(link) {
  // Second argument `true` makes url.parse return `query` as a parsed object.
  var parsed = url.parse(link.href, true);
  return parsed.query.topic;
};
184
/**
 * Collects every link whose `rel` contains `rel`.
 *
 * @param {String} rel Relation to match with `link.rel.indexOf`.
 * @returns {Array} Matching links in their original order; empty when nothing
 *   matches or when no links have been stored on the device yet.
 */
VirtualDevice.prototype._getLinksWithRel = function(rel) {
  // _update only sets _links when the entity carried links.
  var links = this._links || [];
  return links.filter(function(link) {
    return link.rel.indexOf(rel) !== -1;
  });
};