var _ = require('lodash');
var util = require('../util');
var RecordSet = require('./RecordSet');
var ResultSet = require('./ResultSet');
var Channel = require('./Channel');
/**
 * A Stream object
 *
 * @constructor
 * @augments Container
 * @param {Object} data Optional stream definition; when truthy it is handed
 *                      to `parseJSON`
 * @param {Object} serviceObject The ServiceObject this stream belongs to (required)
 * @throws {Error} If `serviceObject` is missing
 */
var Stream = function (data, serviceObject) {

    // Default values, filled in by parseJSON() when `data` is provided
    var defaults = {
        name: null,
        description: null,
        type: null,
        channels: {}
    };
    this.data = defaults;

    if(!serviceObject) {
        throw new Error("Missing serviceObject parameter on Stream creation");
    }

    this.setServiceObject(serviceObject);

    // The client used by the request and subscription methods comes from the container
    var container = this.getContainer();
    this.client = container.client;

    this.exportProperties();

    if(data) {
        this.parseJSON(data);
    }
};
// Set up Stream as a 'Container' subtype. The constructor and methods in this
// file use members that are not defined here (setServiceObject, getContainer,
// getServiceObject, exportProperties, __super__), so they are presumably
// supplied by this call -- see util.extends for the exact mechanics.
util.extends(Stream, 'Container');
/**
 * Check that the stream definition is complete.
 *
 * @throws {Error} If the stream has no name or no channels. Errors thrown by
 *                 a channel's own `validate()` propagate unchanged
 */
Stream.prototype.validate = function () {

    if(!this.name) {
        throw new Error("Stream name is required");
    }

    var channelList = this.channels;
    var hasChannels = Boolean(channelList) && _.size(channelList) > 0;
    if(!hasChannels) {
        throw new Error("At least one stream channel is required");
    }

    // Every channel validates itself
    _.forEach(channelList, function (item) {
        item.validate();
    });
};
/**
 * Serialize the stream to a plain object.
 *
 * Starts from the parent implementation's output and replaces its `channels`
 * entry with a map of channel name to that channel's own `toJSON()` result.
 *
 * @return {Object} The JSON representation of the stream
 */
Stream.prototype.toJSON = function () {

    var result = this.__super__.toJSON.call(this);

    var channelsJson = {};
    _.forEach(this.channels, function (item) {
        channelsJson[item.name] = item.toJSON();
    });
    result.channels = channelsJson;

    return result;
};
/**
 * Load the stream properties from a plain object.
 *
 * `name`, `description` and `type` are copied only when truthy, so a partial
 * definition never resets values that are already set. Every entry of
 * `channels` becomes a `Channel` stored under that channel's name.
 *
 * A channel entry may be:
 *  - a string, used as the channel type with the entry key as its name
 *  - an object; when it has no `name` and the entry key is a string, the key
 *    is used as the name
 *
 * The objects passed in are never modified.
 *
 * @param {Object} data The stream definition; a falsy value is ignored
 */
Stream.prototype.parseJSON = function (data) {

    var me = this;

    // Nothing to load (the constructor applies the same guard before calling)
    if(!data) {
        return;
    }

    var parseChannels = function (channelsData) {
        _.forEach(channelsData, function (raw, channelName) {

            if(typeof raw === 'string') {
                raw = {
                    name: channelName,
                    type: raw
                };
            }

            if(!raw.name && typeof channelName === "string") {
                // Work on a copy so the caller's definition object is left untouched
                raw = _.assign({}, raw, { name: channelName });
            }

            var channel = new Channel(raw, me);
            me.channels[channel.name] = channel;
        });
    };

    if(data.name)
        this.data.name = data.name;

    if(data.description)
        this.data.description = data.description;

    if(data.type)
        this.data.type = data.type;

    if(data.channels) {
        parseChannels(data.channels);
    }
};
/**
 * Create a pubsub subscription for the stream
 *
 * The topic is `<serviceObject id>/streams/<stream name>/updates`.
 *
 * @param {Function} fn Callback handed to the client together with the topic
 * @return {Promise} The result of `client.subscribe()` after `.bind(this)`,
 *                   i.e. a promise for the subscription object creation
 */
Stream.prototype.subscribe = function (fn) {

    var serviceObjectId = this.getServiceObject().id;
    var updatesTopic = serviceObjectId + '/streams/' + this.data.name + '/updates';

    var pending = this.client.subscribe(updatesTopic, fn);
    return pending.bind(this);
};
/**
 * Remove a subscription for a stream
 *
 * Uses the same topic as `subscribe`:
 * `<serviceObject id>/streams/<stream name>/updates`.
 *
 * @param {Function} fn The callback that was registered for the topic
 * @return {Promise} The result of `client.unsubscribe()` after `.bind(this)`
 *                   (presumably a promise, as for `subscribe` -- confirm
 *                   against the client implementation)
 */
Stream.prototype.unsubscribe = function (fn) {

    // Resolve the owner through the accessor, as every other method does,
    // instead of relying on a `serviceObject` field this file never assigns
    var topic = this.getServiceObject().id + '/streams/' + this.data.name + '/updates';

    return this.client.unsubscribe(topic, fn).bind(this);
};
/**
 * Send data to a ServiceObject stream
 *
 * The data is wrapped in a `RecordSet` and sent with `client.put()` to
 * `/<serviceObject id>/streams/<stream name>`.
 *
 * @param {Object} data The values handed to the `RecordSet` constructor
 * @param {*} timestamp Optional; when truthy it is applied to the record
 *                      with `setTimestamp()`
 *
 * @return {Promise} The result of `client.put()`
 */
Stream.prototype.push = function (data, timestamp) {

    var serviceObjectId = this.getServiceObject().id;
    var endpoint = '/' + serviceObjectId + '/streams/' + this.name;

    var recordSet = new RecordSet(data, this);
    if(timestamp) {
        recordSet.setTimestamp(timestamp);
    }

    return this.client.put(endpoint, recordSet);
};
/**
 * Retrieve data from a ServiceObject stream
 *
 * Requests `/<serviceObject id>/streams/<stream name>/list` followed by the
 * query string built by `util.createQueryString(size, offset)`.
 *
 * @param {int} size optional, the number of elements to return
 * @param {int} offset optional, the first value to get from the list for paging
 *
 * @return {Promise} Promise resolved with a `ResultSet`; a response that is
 *                   not an array is wrapped in a one-element array first
 */
Stream.prototype.pull = function (size, offset) {

    var qs = util.createQueryString(size, offset);
    var url = '/' + this.getServiceObject().id + '/streams/' + this.name + '/list' + qs;

    return this.client.get(url, null).bind(this).then(function (res) {
        // Array.isArray also recognises arrays created in another realm,
        // which `instanceof Array` does not
        var records = Array.isArray(res) ? res : [res];
        // The handler relies on .bind(this) above for `this` to be the stream
        return new ResultSet(records, this);
    });
};
/**
 * Retrieve last updated data from a ServiceObject stream
 *
 * Requests `/<serviceObject id>/streams/<stream name>`.
 *
 * @return {Promise} Promise resolved with a `RecordSet` built from the response
 */
Stream.prototype.lastUpdate = function () {

    var url = '/' + this.getServiceObject().id + '/streams/' + this.name;

    return this.client.get(url).bind(this).then(function (res) {
        // Returning the value is enough inside then(); the handler relies on
        // .bind(this) above for `this` to be the stream
        return new RecordSet(res, this);
    });
};
/**
* Search data of a ServiceObject stream
*
* @param {Object} params search params
* @param {int} size optional, the number of elements to return
* @param {int} offset optional, the first value to get from the list for paging
*
* @return {Promise} Promise callback with result
*/
Stream.prototype.search = function (params, size, offset) {
if(!params) {
return Promise.reject(new Error("No params provided for search"));
}
var qs = util.createQueryString(size, offset);
var url = '/' + this.getServiceObject().id + '/streams/' + this.name + '/search' + qs;
var query = require('./search/parser').parse(params, this);
return this.getClient().post(url, query).bind(this).then(function (res) {
return Promise.resolve(new ResultSet(res, this));
});
};
/**
 * Search data of a ServiceObject by distance from a point
 *
 * Shortcut for `search()` with a `distance` filter.
 *
 * @param {Object} position An object representing a geo-position, eg `{ latitude: 123 , longitude: 321 }`
 * @param {Number} distance The distance value
 * @param {String} unit Optional unit, forwarded as given (no default is
 *                      applied in this method)
 *
 * @return {Promise} Promise callback with result
 */
Stream.prototype.searchByDistance = function (position, distance, unit) {

    var distanceFilter = {
        position: position,
        value: distance,
        unit: unit
    };

    return this.search({ distance: distanceFilter });
};
/**
 * Search data of a ServiceObject in a Bounding Box
 *
 * Shortcut for `search()` with a `bbox` filter; the coordinates are
 * forwarded as given.
 *
 * @param {Array} bbox An array of 4 elements representing the bounding box, eg
 * ```
 * [
 *   upperLat, upperLng,
 *   bottomLat, bottomLng
 * ]
 * ```
 * or an Array with 2 elements, each one an object, eg
 * ```
 * [
 *   { latitude: 123, longitude: 321 }, // upper
 *   { latitude: 321, longitude: 123 }  // bottom
 * ]
 * ```
 *
 * @return {Promise} Promise callback with result
 */
Stream.prototype.searchByBoundingBox = function (bbox) {

    var boxFilter = { coords: bbox };

    return this.search({ bbox: boxFilter });
};
/**
 * Search text for a channel of a ServiceObject stream
 *
 * Shortcut for `search()` with a `match` filter.
 *
 * @param {String} channel The channel name where to search in
 * @param {String} string The string query to search for
 *
 * @return {Promise} Promise callback with result
 */
Stream.prototype.searchByText = function (channel, string) {

    var matchFilter = {
        string: string,
        channel: channel
    };

    return this.search({ match: matchFilter });
};
/**
 * Search data by the update time range of a ServiceObject stream
 *
 * Can be called in two ways:
 *  - `searchByTime({ from: ..., to: ... })` with at least one truthy bound;
 *    the object is forwarded to `search()` as is
 *  - `searchByTime(from, to)` with positional bounds, each converted with
 *    `util.parseDate`. A missing `to`, or a `null` `from`, stays `null`
 *
 * @param {Object} params An object with at least one of `from` or `to`
 *                        properties, or the `from` bound of the positional form
 *
 * @return {Promise} Promise callback with result
 */
Stream.prototype.searchByTime = function (params) {

    // `typeof null` is "object", so null has to be excluded before reading
    // `from` / `to` from it
    var isRange = params !== null && typeof params === "object" &&
        Boolean(params.from || params.to);

    var range = params;
    if(!isRange) {
        var from = arguments[0];
        var to = arguments[1];
        range = {
            from: from === null ? null : util.parseDate(from),
            to: to ? util.parseDate(to) : null
        };
    }

    return this.search({
        time: range
    });
};
/**
 * Search data by a numeric value of a ServiceObject stream
 *
 * Can be called in two ways:
 *  - `searchByNumber(channel, { from: ..., to: ... })`
 *  - `searchByNumber(channel, from, to)` with positional bounds
 *
 * The `params` object is copied, so the caller's object is not modified.
 *
 * @param {String} channel Channel name to search for
 * @param {Object} params An object with at least one of `from` or `to`
 *                        properties, or the `from` bound of the positional form
 *
 * @return {Promise} Promise callback with result
 */
Stream.prototype.searchByNumber = function (channel, params) {

    var numeric = {};

    // `typeof null` is "object": treat null as a positional (empty) bound
    if(params !== null && typeof params === 'object') {
        Object.keys(params).forEach(function (key) {
            numeric[key] = params[key];
        });
    }
    else {
        numeric.from = arguments[1];
        numeric.to = arguments[2];
    }

    numeric.channel = channel;

    return this.search({
        numeric: numeric
    });
};
module.exports = Stream;