UNPKG

3.42 kBMarkdownView Raw
This is a minimalist CouchDB (HTTP) API.
It provides exactly what this module needs, and nothing more.
3
4 LRU = require 'lru-cache'
5 {EventEmitter2} = require 'eventemitter2'
6
7 http = require 'http'
8 https = require 'https'
9
# Event bus used to announce LRU evictions. `CouchDB` instances created with
# `use_lru` listen on it for an event named after their URI and use that event
# to terminate the corresponding changes stream.
lru_dispose = new EventEmitter2 newListener: false, verboseMemoryLeak: true

options =
  # Keep at most 200 entries in the LRU cache.
  max: 200
  # Called by the LRU cache with the key of the entry being dropped.
  # The event must be emitted on `lru_dispose` (the emitter the class
  # subscribes to); emitting on any other name would leave evicted
  # streams running, or throw if that name is undefined.
  dispose: (key) -> lru_dispose.emit key
  # Entries expire after 20 minutes (value presumably in milliseconds, per
  # the lru-cache `maxAge` option -- confirm against the installed version).
  maxAge: 20*60*1000

# Shared, bounded cache of changes streams (used when `use_lru` is truthy).
lru_cache = LRU options

# Shared, unbounded cache of changes streams (used otherwise).
static_cache = new Map()
20
# Minimal CouchDB client: document PUT/GET/DELETE plus a live `_changes` stream.
class CouchDB

  # uri     -- base URL of the database; a trailing slash is appended if missing
  #            so that document IDs and `_changes` resolve underneath it.
  # use_lru -- truthy: changes streams are kept in the shared LRU cache and are
  #            ended when the eviction event for this URI fires on `lru_dispose`;
  #            falsy: streams are kept in the shared static Map and never ended
  #            by this class.
  constructor: (uri, use_lru) ->
    @uri = if uri.match /\/$/ then uri else "#{uri}/"

    @cache = if use_lru then lru_cache else static_cache
    @limit = if use_lru then most.fromEvent(@uri, lru_dispose) else most.never()

    # Pick the request agent from the URI scheme; anything that is neither
    # http: nor https: falls back to the bare superagent module.
    # NOTE(review): presumably `agent.agent(...)` ties requests to Node's
    # global agent -- confirm against the superagent version in use.
    @agent =
      if uri.match /^http:/
        agent.agent http.globalAgent
      else if uri.match /^https:/
        agent.agent https.globalAgent
      else
        agent

    return

  # Store `doc` under its `_id` with an HTTP PUT (JSON in, JSON out).
  # Resolves with the response body.
  put: (doc) ->
    target = new URL ec(doc._id), @uri
    request = @agent.put target.toString()
    request
      .type('json')
      .accept('json')
      .send(doc)
      .then (response) -> response.body

  # Fetch the document `_id`; when `rev` is provided it is sent as the `rev`
  # query parameter. Resolves with the response body.
  get: (_id,{rev} = {}) ->
    target = new URL ec(_id), @uri
    if rev?
      target.searchParams.set 'rev', rev
    request = @agent.get target.toString()
    request
      .accept('json')
      .then (response) -> response.body

  # Delete the document identified by `_id`; `_rev`, when present, is sent as
  # the `rev` query parameter. Resolves with the response body.
  delete: ({_id,_rev}) ->
    target = new URL ec(_id), @uri
    if _rev?
      target.searchParams.set 'rev', _rev
    request = @agent.delete target.toString()
    request
      .accept('json')
      .then (response) -> response.body

  # Return a multicast stream of parsed messages from the database's
  # `_changes` feed (EventSource mode, heartbeat enabled).
  #
  # options.live         -- defaults to true; any other value throws.
  # options.include_docs -- forwarded to CouchDB, defaults to false.
  # options.since        -- starting sequence, defaults to 'now'.
  #
  # The stream is cached per URI: if one already exists for this URI it is
  # returned as-is, so `include_docs` and `since` of a later call are ignored.
  changes: (options = {}) ->
    {live,include_docs,since: resume_from} = options
    live ?= true
    unless live is true
      throw new Error 'Only live streaming is supported'

    return @cache.get(@uri) if @cache.has(@uri)

    resume_from ?= 'now'
    feed = new URL '_changes', @uri
    feed.searchParams.set 'feed', 'eventsource'
    feed.searchParams.set 'heartbeat', true
    feed.searchParams.set 'include_docs', include_docs ? false

    # The stream might end because the server disconnects or because of other
    # errors. In all cases we let it finish cleanly.

    # Opens a fresh EventSource starting from the last sequence seen so far.
    connect = ->
      feed.searchParams.set 'since', resume_from
      source = new EventSource feed.toString()

      on_end = ->
        console.error 'retry', feed.host, feed.pathname, resume_from
        most.never()

      on_error = (error) ->
        console.error 'retry', (error.stack ? error), feed.host, feed.pathname, resume_from
        most.never()

      fromEventSource(source)
        .continueWith on_end
        .recoverWith on_error

    stream = autoRestart(connect)
      .map (message) -> message.data
      .map JSON.parse
      # Remember the latest sequence so that a reconnect resumes from it.
      .tap (change) -> resume_from = change.seq if change.seq?
      .until @limit
      .multicast()

    @cache.set @uri, stream
    stream
108
109 module.exports = CouchDB
110 ec = encodeURIComponent
111 most = require 'most'
112 EventSource = require 'eventsource'
113 {fromEventSource} = require 'most-w3msg'
114 {URL} = require 'url'
115 agent = require 'superagent'
116
When the stream finishes, we restart it after a small delay.
118
# Returns a Promise that resolves (with no value) once `timeout`
# milliseconds have elapsed.
sleep = (timeout) ->
  new Promise (resolve) ->
    setTimeout resolve, timeout
120
# Builds a stream that follows the most recent stream produced by `s`.
# For each delay value, wait that many milliseconds, call `s()`, and switch
# the output to the stream it returned.
# NOTE(review): the delay stream appears to emit only 0 and then 200 (the
# generator yields once), so `s` is started twice rather than after every
# termination -- confirm this matches the intended restart policy.
autoRestart = (s) ->

  # Waits for `delay` milliseconds, then starts a new source stream.
  start_after = (delay) ->
    await sleep delay
    s()

  delays = most.generate(-> yield 200).startWith 0

  delays
    .map start_after
    .chain most.fromPromise
    .switchLatest()