1 |
|
2 | var d = require("debug")("raptorjs:stream")
|
3 | var Base = require("./Base")
|
4 | var Record = require("./model/Record")
|
5 | var util = require("./util")
|
6 | var Pager = require("./pager")
|
7 |
|
/**
 * Client-side API for the data streams of a device.
 *
 * Every method takes a `stream` descriptor from which `name` and `deviceId`
 * are read. URLs come from `this.route(...)` and requests go through
 * `this.getClient()`, both expected to be provided by `Base`.
 */
class Stream extends Base {

    /**
     * @param {Object} container - forwarded unchanged to the `Base` constructor
     */
    constructor(container) {
        super(container)
    }

    /**
     * Fetch the records of a stream.
     *
     * Issues a GET on the `STREAM_PULL` route with the query string built by
     * `Pager.buildQuery(paging, null)` appended.
     *
     * @param {Object} stream - stream descriptor (`name`, `deviceId`)
     * @param {Object} [paging={}] - paging options handed to `Pager.buildQuery`
     * @returns {Promise} resolves to the result of `Pager.create`, called with
     *   the response and a mapper that wraps each raw entry in a `Record`
     */
    list(stream, paging={}) {
        d("Fetch data for %s on %s", stream.name, stream.deviceId)
        const url = this.route("STREAM_PULL", stream.deviceId, stream.name) + Pager.buildQuery(paging, null)
        return this.getClient().get(url)
            .then((list) => Pager.create(list, (record) => new Record(record, stream)))
    }

    /**
     * Fetch the last update of a stream.
     *
     * Issues a GET on the `STREAM_LAST_UPDATE` route.
     *
     * @param {Object} stream - stream descriptor (`name`, `deviceId`)
     * @returns {Promise<Record>} resolves to a `Record` built from the response
     */
    lastUpdate(stream) {
        d("Fetch last update for %s on %s", stream.name, stream.deviceId)
        return this.getClient().get(this.route("STREAM_LAST_UPDATE", stream.deviceId, stream.name))
            .then((r) => new Record(r, stream))
    }

    /**
     * Store data on a stream with a PUT on the `STREAM_PUSH` route.
     *
     * The target stream is the one returned by `data.getStream()` when that
     * method exists and yields a value; otherwise the `stream` argument is used.
     *
     * @param {Object} data - payload to send; may expose `getStream()`
     * @param {Object} [stream] - stream descriptor used when `data` carries none
     * @returns {Promise} the promise returned by the client's `put`
     * @throws {Error} synchronously, when no stream can be determined
     */
    push(data, stream) {
        // A record-like payload may exist without a stream attached: only let it
        // override the explicit argument when it actually provides one.
        const recordStream = data.getStream ? data.getStream() : null
        const target = recordStream || stream
        if(!target) {
            throw new Error("stream must be provided in Record or as second argument")
        }
        d("Save data for %s on %s", target.name, target.deviceId)
        return this.getClient().put(this.route("STREAM_PUSH", target.deviceId, target.name), data)
    }

    /**
     * Search the records of a stream.
     *
     * POSTs `q` to the `STREAM_SEARCH` route with the query string built by
     * `Pager.buildQuery(paging)` appended.
     *
     * @param {Object} stream - stream descriptor (`name`, `deviceId`)
     * @param {Object} q - query sent as the request body
     * @param {Object} [paging={}] - paging options handed to `Pager.buildQuery`
     * @returns {Promise<Record[]>} resolves to the entries of the response's
     *   `content` array, each wrapped in a `Record`
     */
    search(stream, q, paging = {}) {
        d("Search data for %s on %s", stream.name, stream.deviceId)
        const url = this.route("STREAM_SEARCH", stream.deviceId, stream.name) + Pager.buildQuery(paging)
        return this.getClient().post(url, q)
            .then((list) => list.content.map((r) => new Record(r, stream)))
    }

    /**
     * Remove the data of a stream with a DELETE request.
     *
     * NOTE(review): the URL is built from the `STREAM_LIST` route; confirm
     * against the route table that this is the intended endpoint.
     *
     * @param {Object} stream - stream descriptor (`name`, `deviceId`)
     * @returns {Promise} the promise returned by the client's `delete`
     */
    delete(stream) {
        d("Remove data for %s on %s", stream.name, stream.deviceId)
        return this.getClient().delete(this.route("STREAM_LIST", stream.deviceId, stream.name))
    }

    /**
     * Subscribe `fn` to the topic `stream/<deviceId>/<name>`.
     *
     * @param {Object} stream - stream descriptor (`name`, `deviceId`)
     * @param {Function} fn - handler passed to the client's `subscribe`
     * @returns {Promise<Object>} resolves to `stream` once the client call settles
     */
    subscribe(stream, fn) {
        return this.getClient().subscribe(`stream/${stream.deviceId}/${stream.name}`, fn)
            .then(() => stream)
    }

    /**
     * Unsubscribe `fn` from the topic `stream/<deviceId>/<name>`.
     *
     * @param {Object} stream - stream descriptor (`name`, `deviceId`)
     * @param {Function} fn - handler passed to the client's `unsubscribe`
     * @returns {Promise<Object>} resolves to `stream` once the client call settles
     */
    unsubscribe(stream, fn) {
        return this.getClient().unsubscribe(`stream/${stream.deviceId}/${stream.name}`, fn)
            .then(() => stream)
    }

}
|
66 |
|
67 | module.exports = Stream
|