UNPKG

2.42 kBJavaScriptView Raw
1var xtend = require('xtend')
2var through = require('through2')
3var readonly = require('read-only-stream')
4var once = require('once')
5var collect = require('collect-stream')
6
7var errors = require('../errors')
8var refs2nodes = require('../lib/util').refs2nodes
9
10/**
11 * Get the changes in a changeset, as `cb(err, changes)` or as a stream
12 * @param {string} id Changeset ID
13 * @param {Object} osm osm-p2p-db instance
14 * @param {Function} cb callback(err, array of elements from changeset)
15 * Elements have the property 'action' which is one of
16 * create|modify|delete
17 * @returns {ReadableStream} Readable object stream of changes
18 */
19module.exports = function (osm) {
20 return function getChanges (id, opts, cb) {
21 if (typeof opts === 'function') {
22 cb = opts
23 opts = {}
24 }
25 var stream = through.obj(getDoc)
26 // Check whether doc with id exists
27 osm.get(id, function (err, docs) {
28 if (err) return onError(err)
29 // Ensure that doc with id is of type changset
30 if (!isChangeset(docs)) {
31 return onError(new errors.NotFound('changeset id: ' + id))
32 }
33 // An object stream {key: versionId, value: 0}
34 var r = osm.changeset.list(id, opts)
35 r.on('error', onError)
36 r.pipe(stream)
37 })
38 if (cb) {
39 // If a callback is defined, collect the stream into an array
40 cb = once(cb)
41 collect(stream, cb)
42 } else {
43 // Otherwise return a readable stream
44 return readonly(stream)
45 }
46
47 function getDoc (row, enc, next) {
48 var self = this
49 osm.log.get(row.key, function (err, doc) {
50 if (err) return next(err)
51 var element = xtend(doc.value.v, {
52 id: doc.value.k,
53 version: doc.key,
54 action: getAction(doc)
55 })
56 self.push(refs2nodes(element))
57 next()
58 })
59 }
60
61 function onError (err) {
62 if (cb) return cb(err)
63 stream.emit('error', err)
64 }
65 }
66}
67
/**
 * Report whether any entry of `docs` is a changeset.
 * @param {Object} docs Object whose own keys (versions) map to documents
 * @returns {boolean} true when at least one entry has type 'changeset'
 */
function isChangeset (docs) {
  // Every entry is inspected; there is no early exit
  var changesetVersions = Object.keys(docs).filter(function (key) {
    return docs[key].type === 'changeset'
  })
  return changesetVersions.length > 0
}
76
/**
 * Classify a log entry as a create, modify or delete.
 * @param {Object} doc Log entry with a `links` list and a `value` object
 * @returns {string|undefined} 'delete' when `doc.value.d` is set; otherwise
 * 'create' for zero links, 'modify' for one or more; undefined when the
 * link count is neither
 */
function getAction (doc) {
  var linkCount = doc.links.length
  var isDeletion = doc.value.d !== undefined
  if (isDeletion) return 'delete'
  if (linkCount === 0) return 'create'
  if (linkCount > 0) return 'modify'
}