UNPKG

2.37 kBJavaScriptView Raw
1
2var d = require("debug")("raptorjs:stream")
3var Base = require("./Base")
4var Record = require("./model/Record")
5var util = require("./util")
6var Pager = require("./pager")
7
/**
 * Builds the pub/sub topic name for a stream.
 *
 * @param {{deviceId: string, name: string}} stream - stream descriptor
 * @returns {string} topic in the form `stream/<deviceId>/<name>`
 */
function streamTopic(stream) {
  return `stream/${stream.deviceId}/${stream.name}`
}

/**
 * Client-side API for reading, writing, searching and subscribing to the
 * data of a device stream.
 *
 * Every method takes a `stream` descriptor and reads its `name` and
 * `deviceId` properties. Routes are resolved through `this.route(...)` and
 * requests are issued through `this.getClient()`; neither is defined in this
 * class, so both are expected to come from `Base`.
 */
class Stream extends Base {

  /**
   * @param {*} container - forwarded unchanged to the `Base` constructor
   */
  constructor(container) {
    super(container)
  }

  /**
   * Fetches the records of a stream.
   *
   * @param {{deviceId: string, name: string}} stream - stream to read from
   * @param {Object} [paging={}] - paging options handed to `Pager.buildQuery`
   * @returns {Promise<*>} resolves to the value of `Pager.create`, built from
   *   the response with each entry wrapped in a `Record` bound to `stream`
   */
  list(stream, paging={}) {
    d("Fetch data for %s on %s", stream.name, stream.deviceId)
    // NOTE(review): `search` calls buildQuery without the explicit `null`
    // second argument; kept as-is because Pager is not visible from here.
    const url = this.route("STREAM_PULL", stream.deviceId, stream.name) + Pager.buildQuery(paging, null)
    return this.getClient().get(url)
      .then((list) => Pager.create(list, (record) => new Record(record, stream)))
  }

  /**
   * Fetches the most recent record of a stream.
   *
   * @param {{deviceId: string, name: string}} stream - stream to read from
   * @returns {Promise<Record>} resolves to the response wrapped in a `Record`
   */
  lastUpdate(stream) {
    d("Fetch last update for %s on %s", stream.name, stream.deviceId)
    return this.getClient().get(this.route("STREAM_LAST_UPDATE", stream.deviceId, stream.name))
      .then((r) => new Record(r, stream))
  }

  /**
   * Stores data in a stream.
   *
   * The target stream is taken from `data.getStream()` when that method
   * exists and returns a stream; otherwise the `stream` argument is used.
   *
   * @param {Object} data - a record exposing `getStream()`, or a raw payload
   * @param {{deviceId: string, name: string}} [stream] - target stream, used
   *   when `data` does not carry one
   * @returns {*} whatever the client's `put` returns
   * @throws {Error} when no stream can be determined from either argument
   */
  push(data, stream) {
    // A record that exposes getStream() but has no stream attached must not
    // shadow the explicitly supplied second argument.
    const ownStream = typeof data.getStream === "function" ? data.getStream() : null
    const target = ownStream || stream
    if (!target) {
      throw new Error("stream must be provided in Record or as second argument")
    }
    d("Save data for %s on %s", target.name, target.deviceId)
    return this.getClient().put(this.route("STREAM_PUSH", target.deviceId, target.name), data)
  }

  /**
   * Searches the records of a stream.
   *
   * @param {{deviceId: string, name: string}} stream - stream to search in
   * @param {*} q - query sent as the POST body
   * @param {Object} [paging={}] - paging options handed to `Pager.buildQuery`
   * @returns {Promise<Record[]>} resolves to the entries of the response's
   *   `content` array, each wrapped in a `Record` bound to `stream`
   */
  search(stream, q, paging = {}) {
    d("Search data for %s on %s", stream.name, stream.deviceId)
    const url = this.route("STREAM_SEARCH", stream.deviceId, stream.name) + Pager.buildQuery(paging)
    // NOTE(review): returns a plain array while `list` returns the result of
    // Pager.create; left unchanged so existing callers keep working.
    return this.getClient().post(url, q)
      .then((list) => list.content.map((r) => new Record(r, stream)))
  }

  /**
   * Removes the data of a stream.
   *
   * @param {{deviceId: string, name: string}} stream - stream to clear
   * @returns {*} whatever the client's `delete` returns
   */
  delete(stream) {
    d("Remove data for %s on %s", stream.name, stream.deviceId)
    // NOTE(review): uses the "STREAM_LIST" route for deletion; presumably the
    // same URL with a different HTTP verb. Confirm against the route table.
    return this.getClient().delete(this.route("STREAM_LIST", stream.deviceId, stream.name))
  }

  /**
   * Subscribes a callback to the stream's topic.
   *
   * @param {{deviceId: string, name: string}} stream - stream to listen to
   * @param {Function} fn - callback handed to the client's `subscribe`
   * @returns {Promise<Object>} resolves to `stream` once the client's
   *   `subscribe` promise fulfils
   */
  subscribe(stream, fn) {
    return this.getClient().subscribe(streamTopic(stream), fn)
      .then(() => stream)
  }

  /**
   * Removes a callback from the stream's topic.
   *
   * @param {{deviceId: string, name: string}} stream - stream to stop listening to
   * @param {Function} fn - callback handed to the client's `unsubscribe`
   * @returns {Promise<Object>} resolves to `stream` once the client's
   *   `unsubscribe` promise fulfils
   */
  unsubscribe(stream, fn) {
    return this.getClient().unsubscribe(streamTopic(stream), fn)
      .then(() => stream)
  }

}
66
67module.exports = Stream