1 | var xtend = require('xtend')
|
2 | var through = require('through2')
|
3 | var readonly = require('read-only-stream')
|
4 | var once = require('once')
|
5 | var collect = require('collect-stream')
|
6 |
|
7 | var errors = require('../errors')
|
8 | var refs2nodes = require('../lib/util').refs2nodes
|
9 |
|
10 |
|
11 |
|
12 |
|
13 |
|
14 |
|
15 |
|
16 |
|
17 |
|
18 |
|
19 | module.exports = function (osm) {
|
20 | return function getChanges (id, opts, cb) {
|
21 | if (typeof opts === 'function') {
|
22 | cb = opts
|
23 | opts = {}
|
24 | }
|
25 | var stream = through.obj(getDoc)
|
26 |
|
27 | osm.get(id, function (err, docs) {
|
28 | if (err) return onError(err)
|
29 |
|
30 | if (!isChangeset(docs)) {
|
31 | return onError(new errors.NotFound('changeset id: ' + id))
|
32 | }
|
33 |
|
34 | var r = osm.changeset.list(id, opts)
|
35 | r.on('error', onError)
|
36 | r.pipe(stream)
|
37 | })
|
38 | if (cb) {
|
39 |
|
40 | cb = once(cb)
|
41 | collect(stream, cb)
|
42 | } else {
|
43 |
|
44 | return readonly(stream)
|
45 | }
|
46 |
|
47 | function getDoc (row, enc, next) {
|
48 | var self = this
|
49 | osm.log.get(row.key, function (err, doc) {
|
50 | if (err) return next(err)
|
51 | var element = xtend(doc.value.v, {
|
52 | id: doc.value.k,
|
53 | version: doc.key,
|
54 | action: getAction(doc)
|
55 | })
|
56 | self.push(refs2nodes(element))
|
57 | next()
|
58 | })
|
59 | }
|
60 |
|
61 | function onError (err) {
|
62 | if (cb) return cb(err)
|
63 | stream.emit('error', err)
|
64 | }
|
65 | }
|
66 | }
|
67 |
|
68 | function isChangeset (docs) {
|
69 | var versions = Object.keys(docs)
|
70 | var result = false
|
71 | versions.forEach(function (version) {
|
72 | if (docs[version].type === 'changeset') result = true
|
73 | })
|
74 | return result
|
75 | }
|
76 |
|
77 | function getAction (doc) {
|
78 | if (doc.links.length === 0 && doc.value.d === undefined) return 'create'
|
79 | if (doc.links.length > 0 && doc.value.d === undefined) return 'modify'
|
80 | if (doc.value.d !== undefined) return 'delete'
|
81 | }
|