UNPKG

18.6 kBtext/coffeescriptView Raw
1# Copyright (c) 2013 Rod Vagg, MIT License
2# Copyright (c) 2014 Riceball LEE, MIT License
xtend = require "xtend"
# nosql-stream is an optional dependency: a failed require is swallowed and
# the stream helpers simply stay undefined.
try
  NoSqlStream = require "nosql-stream"
{ReadStream, WriteStream} = NoSqlStream if NoSqlStream
AbstractObject = require "abstract-object"
util = require "abstract-object/lib/util"
Codec = require "buffer-codec"
utf8ByteLength = Codec.getByteLen
Errors = require "./abstract-error"
AbstractIterator = require "./abstract-iterator"
AbstractChainedBatch = require "./abstract-chained-batch"
# Fall back to process.nextTick where setImmediate is not available.
setImmediate = global.setImmediate or process.nextTick

# Local shortcuts for the error classes and utility helpers used below.
{AbstractError, NotImplementedError, InvalidArgumentError, OpenError, CloseError} = Errors
{inherits, isString} = util
23
# Base class for key/value (NoSQL) database adapters.
#
# The public methods (open, close, get, put, del, batch, ...) normalise their
# arguments and then dispatch to underscore-prefixed hooks. Every operation
# runs asynchronously when a callback is passed and synchronously otherwise.
# The default `_xxx` async hooks defined here simulate the async API on top of
# the `_xxxSync` hooks, so a subclass may supply only the sync hooks.
module.exports.AbstractNoSQL = class AbstractNoSQL
  inherits AbstractNoSQL, AbstractObject

  constructor: ->
    super

  # Stores the database location.
  # Throws InvalidArgumentError when a location is given but is not a string.
  init: (location) ->
    #not all database have the location argument.
    throw new InvalidArgumentError("constructor requires a location string argument") if location and typeof location isnt "string"
    @location = location

  # Read-only `opened` flag. Defined with Object.defineProperty rather than
  # the deprecated __defineGetter__; enumerable/configurable are set to match
  # what __defineGetter__ produced.
  Object.defineProperty @prototype, "opened", {
    enumerable: true
    configurable: true
    get: -> !!@_opened
  }

  # Records the open/closed state and emits the matching events:
  # "ready" and "open" when opened, "closed" otherwise.
  # `options`, when given, is remembered as the default for later opens.
  setOpened: (aValue, options) ->
    if aValue
      @_opened = true
      @_options = options if options
      @emit "ready"
      @emit "open"
    else
      @_opened = false
      @emit "closed"

  #the optimal low-level sync functions:

  # Returns whether `key` exists. Uses `_isExistsSync` when available,
  # otherwise probes with `_getSync` and maps a not-found error to false.
  # Throws NotImplementedError when neither hook exists.
  isExistsSync: (key, options) ->
    options = {} unless options?
    if @_isExistsSync
      return @_isExistsSync(key, options)
    else if @_getSync
      try
        @_getSync key, options
        return true
      catch err
        if AbstractError.isNotFound(err)
          return false
        else
          throw err
    throw new NotImplementedError()

  # Returns the result of `_getSync`; throws NotImplementedError without it.
  getSync: (key, options) ->
    if @_getSync
      options = {} unless options?
      return @_getSync(key, options)
    throw new NotImplementedError()

  # Reads the value of `key` into `destBuffer` via `_getBufferSync`.
  # `options.offset` defaults to 0.
  getBufferSync: (key, destBuffer, options) ->
    if @_getBufferSync
      options = {} unless options?
      options.offset = 0 unless options.offset?
      return @_getBufferSync(key, destBuffer, options)
    throw new NotImplementedError()

  # Fetches several keys at once. `_mGetSync` returns a flat array
  # (key, value, key, value, ... or only values when `options.keys` is false);
  # this wrapper turns the key/value form into `{key, value}` objects.
  mGetSync: (keys, options) ->
    if @_mGetSync
      options = {} unless options?
      # Normalised here as well so an overriding _mGetSync sees the same
      # option values as it does through the async mGet().
      options.asBuffer = options.asBuffer is true
      options.raiseError = options.raiseError isnt false
      needKeyName = options.keys
      arr = @_mGetSync(keys, options)
      i = 0
      result = []
      while i < arr.length
        if needKeyName isnt false
          result.push
            key: arr[i]
            value: arr[++i]
        else
          result.push arr[i]
        i++
      return result
    throw new NotImplementedError()

  # Returns the result of `_putSync`; throws NotImplementedError without it.
  putSync: (key, value, options) ->
    if @_putSync
      options = {} unless options?
      return @_putSync(key, value, options)
    throw new NotImplementedError()

  # Returns the result of `_delSync`; throws NotImplementedError without it.
  delSync: (key, options) ->
    if @_delSync
      options = {} unless options?
      return @_delSync(key, options)
    throw new NotImplementedError()

  # Returns the result of `_batchSync`; throws NotImplementedError without it.
  batchSync: (operations, options) ->
    if @_batchSync
      options = {} unless options?
      return @_batchSync(operations, options)
    throw new NotImplementedError()

  # Returns the result of `_approximateSizeSync`; throws NotImplementedError
  # without it.
  approximateSizeSync: (start, end) ->
    if @_approximateSizeSync
      return @_approximateSizeSync(start, end)
    throw new NotImplementedError()

  # Opens the database synchronously. Missing options fall back to the
  # options remembered from a previous open. `createIfMissing` defaults to
  # true and `errorIfExists` to false.
  # Returns this instance when `_openSync` reports success, otherwise the
  # falsy result of `_openSync`.
  openSync: (options) ->
    if @_openSync
      options = @_options || {} unless options?
      options.createIfMissing = options.createIfMissing isnt false
      options.errorIfExists = !!options.errorIfExists
      result = @_openSync(options)
      @setOpened true, options if result
      result = @ if result
      return result
    throw new NotImplementedError()


  #if successful should return true.
  closeSync: ->
    if @_closeSync
      result = @_closeSync()
      @setOpened false if result
      return result
    throw new NotImplementedError()


  #the async methods simulated by sync methods:
  #the derived class can override these methods to implement the real async methods for better performance.

  # Runs `_openSync` on a later tick. A falsy result is reported as OpenError.
  # Without `_openSync` the callback is invoked with no arguments.
  _open: (options, callback) ->
    that = this
    if @_openSync
      setImmediate ->
        result = undefined
        try
          result = that._openSync(options)
        catch err
          callback err
          return
        if result
          callback null, result
        else
          callback new OpenError("can not open database.")

    else
      setImmediate callback

  # Runs `_closeSync` on a later tick. A falsy result is reported as CloseError.
  # Without `_closeSync` the callback is invoked with no arguments.
  _close: (callback) ->
    that = this
    if @_closeSync
      setImmediate ->
        result = undefined
        try
          result = that._closeSync()
        catch err
          callback err
          return
        if result
          callback null, result
        else
          callback new CloseError("can not close database.")

    else
      setImmediate callback

  # Async existence check: uses `_isExistsSync` when present, otherwise
  # performs a `_get` and maps a not-found error to `false`.
  _isExists: (key, options, callback) ->
    that = this
    if @_isExistsSync
      setImmediate ->
        result = undefined
        try
          result = that._isExistsSync(key, options)
        catch err
          callback err
          return
        callback null, result

    else
      @_get key, options, (err, value) ->
        if err
          if AbstractError.isNotFound(err)
            callback null, false
          else
            callback err
        else
          callback null, true

  # Async variant of `_getBufferSync`. Uses the sync hook when a sync
  # implementation is available, otherwise reads through `_get` and writes
  # the value into `destBuffer`. Calls back with the number returned by
  # `destBuffer.write`, or with the value's byte length when no destination
  # buffer is given.
  _getBuffer: (key, destBuffer, options, callback) ->
    that = this
    if @_getSync or @_getBufferSync isnt AbstractNoSQL::_getBufferSync
      setImmediate ->
        result = undefined
        try
          result = that._getBufferSync(key, destBuffer, options)
        catch err
          callback err
          return
        callback null, result
    else if @_get
      @_get key, options, (err, value) ->
        return callback(err) if err
        result = utf8ByteLength(value)
        if destBuffer
          result = Math.min(result, destBuffer.length)
          result = destBuffer.write(value, options.offset, result) if result
        callback null, result
    else
      setImmediate callback

  # Default sync buffer read built on `_getSync`. Returns the number returned
  # by `destBuffer.write`, or the value's byte length when no destination
  # buffer is given.
  _getBufferSync: (key, destBuffer, options) ->
    if @_getSync
      value = @_getSync(key, options)
      result = utf8ByteLength(value)
      if destBuffer
        result = Math.min(result, destBuffer.length)
        result = destBuffer.write(value, options.offset, result) if result
      return result
    else
      # The message previously named _mGetSync (copy/paste slip).
      throw new NotImplementedError('_getBufferSync: _getSync is not implemented.')

  # Default multi-get built on `_getSync`. Returns a flat array of
  # key, value, key, value, ... (or only values when `options.keys` is false).
  # Per-key errors are rethrown when `options.raiseError` is truthy, otherwise
  # the value is reported as undefined.
  _mGetSync: (keys, options) ->
    if @_getSync
      result = []
      needKeyName = options.keys
      raiseError = options.raiseError
      options.asBuffer = options.asBuffer is true
      for key in keys
        try
          value = @_getSync(key, options)
        catch err
          throw err if raiseError
          value = undefined
        if needKeyName isnt false
          result.push key, value
        else
          result.push value
      return result
    else
      throw new NotImplementedError('_mGetSync: _getSync is not implemented.')

  # Async multi-get. Uses `_mGetSync` when a sync implementation is available,
  # otherwise issues `_get` calls one after another. The callback receives the
  # same flat array layout as `_mGetSync`.
  _mGet: (keys, options, callback) ->
    that = this
    if @_getSync or @_mGetSync isnt AbstractNoSQL::_mGetSync
      setImmediate ->
        result = undefined
        try
          result = that._mGetSync keys, options
        catch err
          callback err
          return
        callback null, result
    else if keys.length > 0 and @_get
      result = []
      i = 0
      needKeyName = options.keys
      raiseError = options.raiseError

      readNext = (err, value) ->
        return callback(err) if err and raiseError

        if needKeyName isnt false
          result.push keys[i], value
        else
          result.push value
        i++
        return callback(null, result) if i >= keys.length
        that._get keys[i], options, readNext
      @_get keys[i], options, readNext
    else
      # Nothing to fetch: report an empty result. Calling back with no
      # arguments made mGet() crash on `arr.length`.
      setImmediate -> callback null, []

  # Runs `_getSync` on a later tick; without it the callback is invoked with
  # no arguments.
  _get: (key, options, callback) ->
    that = this
    if @_getSync
      setImmediate ->
        result = undefined
        try
          result = that._getSync(key, options)
        catch err
          callback err
          return
        callback null, result

    else
      setImmediate callback

  # Runs `_putSync` on a later tick; without it the callback is invoked with
  # no arguments.
  _put: (key, value, options, callback) ->
    that = this
    if @_putSync
      setImmediate ->
        result = undefined
        try
          result = that._putSync(key, value, options)
        catch err
          callback err
          return
        callback null, result

    else
      setImmediate callback

  # Runs `_delSync` on a later tick; without it the callback is invoked with
  # no arguments.
  _del: (key, options, callback) ->
    that = this
    if @_delSync
      setImmediate ->
        result = undefined
        try
          result = that._delSync(key, options)
        catch err
          callback err
          return
        callback null, result

    else
      setImmediate callback

  # Runs `_batchSync` on a later tick; without it the callback is invoked with
  # no arguments.
  _batch: (array, options, callback) ->
    that = this
    if @_batchSync
      setImmediate ->
        result = undefined
        try
          result = that._batchSync(array, options)
        catch err
          callback err
          return
        callback null, result

    else
      setImmediate callback


  #TODO: remove from here, not a necessary primitive
  _approximateSize: (start, end, callback) ->
    that = this
    if @_approximateSizeSync
      setImmediate ->
        result = undefined
        try
          result = that._approximateSizeSync(start, end)
        catch err
          callback err
          return
        callback null, result
    else
      setImmediate callback


  #slower impl:
  #
  #_exec: (fn, args, callback) ->
  #  that = this
  #  if fn then setImmediate ->
  #    result
  #    try
  #      result = fn.apply(that, args)
  #    catch (err)
  #      callback(err)
  #      return
  #
  #    callback(null, result)
  #  else
  #    setImmediate(callback)
  #
  #_open: (options, callback) ->
  #  this._exec(this._openSync, [options], callback)
  #
  #

  # Opens the database: async when a callback is given (as `options` or as
  # the second argument), sync otherwise. Options that are not a real object
  # fall back to the options remembered from a previous open.
  open: (options, callback) ->
    callback = options if typeof options is "function"
    # `typeof null` is "object", so null has to be rejected explicitly.
    options = @_options || {} unless options? and typeof options is "object"
    options.createIfMissing = options.createIfMissing isnt false
    options.errorIfExists = !!options.errorIfExists
    if callback
      that = this
      @_open options, (err, result) ->
        that.setOpened true, options if not err?
        callback err, result
    else
      @openSync options

  # Closes the database: async with a callback, sync without.
  # Throws when a truthy non-function callback is passed.
  close: (callback) ->
    if callback
      if typeof callback is "function"
        that = this
        @_close (err, result) ->
          that.setOpened false if not err?
          callback err, result

      else
        throw new Error("close() requires callback function argument")
    else
      @closeSync()

  # Checks whether `key` exists. Non-buffer keys are converted to strings.
  isExists: (key, options, callback) ->
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    key = String(key) unless @_isBuffer(key)
    if callback
      @_isExists key, options, callback
    else
      @isExistsSync key, options
  isExist: @::isExists

  # Reads the value of `key` into `destBuffer`; `options.offset` defaults to 0.
  getBuffer: (key, destBuffer, options, callback) ->
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    options.offset = 0 unless options.offset?
    if callback
      @_getBuffer key, destBuffer, options, callback
    else
      @getBufferSync key, destBuffer, options

  # Fetches several keys. Yields an array of `{key, value}` objects, or a
  # plain array of values when `options.keys` is false. `asBuffer` defaults
  # to false and `raiseError` to true.
  mGet: (keys, options, callback) ->
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    options.asBuffer = options.asBuffer is true
    options.raiseError = options.raiseError isnt false
    needKeyName = options.keys isnt false
    if callback
      @_mGet keys, options, (err, arr) ->
        return callback(err) if err
        if needKeyName
          # Fold the flat key, value, key, value, ... array into objects.
          i = 0
          result = []
          while i < arr.length
            result.push
              key: arr[i]
              value: arr[++i]
            i++
        else
          result = arr
        callback null, result
    else
      @mGetSync keys, options

  # Fetches the value stored under `key`. An invalid key is reported through
  # the callback, or thrown in sync mode. `asBuffer` defaults to true.
  get: (key, options, callback) ->
    err = undefined
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    # The third argument is unused by the _checkKey defined in this class.
    if err = @_checkKey(key, "key", @_isBuffer)
      if callback
        return callback(err)
      else
        throw err
    key = String(key) unless @_isBuffer(key)
    options.asBuffer = options.asBuffer isnt false
    if callback
      @_get key, options, callback
    else
      @getSync key, options

  # Stores `value` under `key`. An invalid key is reported through the
  # callback, or thrown in sync mode.
  put: (key, value, options, callback) ->
    err = undefined
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    if err = @_checkKey(key, "key", @_isBuffer)
      if callback
        return callback(err)
      else
        throw err
    key = String(key) unless @_isBuffer(key)

    # coerce value to string in node, don't touch it in browser
    # (indexeddb can store any JS type)
    value = String(value) if value? and not @_isBuffer(value) and not process.browser
    if callback
      @_put key, value, options, callback
    else
      @putSync key, value, options

  # Deletes `key`. An invalid key is reported through the callback, or thrown
  # in sync mode.
  del: (key, options, callback) ->
    err = undefined
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    if err = @_checkKey(key, "key", @_isBuffer)
      if callback
        return callback(err)
      else
        throw err
    key = String(key) unless @_isBuffer(key)
    if callback
      @_del key, options, callback
    else
      @delSync key, options

  # With no arguments returns a chained batch object. Otherwise validates the
  # operations array (`type` and `key` of every object entry) and applies it,
  # async with a callback and sync without.
  batch: (array, options, callback) ->
    return @_chainedBatch() unless arguments.length
    if typeof options is "function"
      callback = options
      options = {}
    else
      options = {} unless options?
    callback = array if typeof array is "function"
    unless Array.isArray(array)
      vError = new Error("batch(array) requires an array argument")
      if callback
        return callback(vError)
      else
        throw vError
    for e in array
      # `typeof null` is "object": skip null entries as well instead of
      # crashing on `e.type`.
      continue unless e? and typeof e is "object"
      if err = @_checkKey(e.type, "type", @_isBuffer)
        if callback
          return callback(err)
        else
          throw err
      if err = @_checkKey(e.key, "key", @_isBuffer)
        if callback
          return callback(err)
        else
          throw err
    if callback
      @_batch array, options, callback
    else
      @batchSync array, options


  #TODO: remove from here, not a necessary primitive
  approximateSize: (start, end, callback) ->
    throw new Error("approximateSize() requires valid `start`, `end` and `callback`(for async) arguments") if not start? or not end? or typeof start is "function" or typeof end is "function"
    start = String(start) unless @_isBuffer(start)
    end = String(end) unless @_isBuffer(end)
    if callback
      @_approximateSize start, end, callback
    else
      @approximateSizeSync start, end

  #TODO: move to Iterator.init
  # Returns a normalised copy of the iterator options:
  #  * empty Buffer bounds are removed;
  #  * a `range` string such as "[a, z)" is translated to gt/gte/lt/lte,
  #    where "(" / ")" exclude a bound, any other bracket character includes
  #    it, and an empty item becomes null;
  #  * keys/values default to true, limit to -1, keyAsBuffer/valueAsBuffer
  #    to false.
  _setupIteratorOptions: (options) ->
    options = xtend(options)
    for o in ["start", "end", "gt", "gte", "lt", "lte"]
      delete options[o] if options[o] and @_isBuffer(options[o]) and options[o].length is 0
    options.reverse = !!options.reverse

    range = options.range
    if isString(range)
      range = range.trim()
      if range.length >= 2
        # With reverse iteration the textual end of the range is where the
        # iterator starts, so the bracket roles swap.
        skipStart = if !options.reverse then range[0] is "(" else range[range.length-1] is ")"
        skipEnd = if !options.reverse then range[range.length-1] is ")" else range[0] is "("
        range = range.substring(1, range.length-1)
        range = range.split(",").map (item) ->
          item = item.trim()
          item = null if item is ""
          return item
        if !options.reverse
          [start, end] = range
          startOp = 'gt'
          endOp = 'lt'
        else
          [end, start] = range
          startOp = 'lt'
          endOp = 'gt'
        startOp = startOp + 'e' unless skipStart
        endOp = endOp + 'e' unless skipEnd
        options[startOp] = start
        options[endOp] = end
    options.keys = options.keys isnt false
    options.values = options.values isnt false
    options.limit = (if "limit" of options then options.limit else -1)
    options.keyAsBuffer = options.keyAsBuffer is true
    options.valueAsBuffer = options.valueAsBuffer is true
    options


  #should override this to test sync or if you do not wanna implement the _iterator function.
  IteratorClass: AbstractIterator

  # Creates an iterator over the normalised options, through `_iterator` when
  # the subclass defines it and through `IteratorClass` otherwise.
  iterator: (options) ->
    options = {} unless typeof options is "object"
    options = @_setupIteratorOptions(options)
    return @_iterator(options) if typeof @_iterator is "function"
    new @IteratorClass(this, options)

  # Creates the chained batch object returned by an argument-less batch().
  _chainedBatch: ->
    new AbstractChainedBatch(this)

  _isBuffer: (obj) ->
    Buffer.isBuffer obj

  # Validates a key-like argument. Returns an InvalidArgumentError for
  # null/undefined, an empty Buffer or an empty string, and undefined when
  # the value is acceptable.
  _checkKey: (obj, type) ->
    if not obj?
      return new InvalidArgumentError(type + " cannot be `null` or `undefined`")
    if @_isBuffer(obj)
      return new InvalidArgumentError(type + " cannot be an empty Buffer") if obj.length is 0
    else
      return new InvalidArgumentError(type + " cannot be an empty String") if String(obj) is ""

  isOpen: ->
    !!@_opened

  # Creates a read stream over the database; the options are merged over the
  # options remembered from open. Logs an error when the optional
  # nosql-stream module could not be loaded.
  readStream: (options, makeData) ->
    if (ReadStream)
      opt = xtend(@_options, options)
      ReadStream @, opt, makeData
    else
      console.error "please `npm install nosql-stream` first"
  createReadStream: @::readStream

  # Read stream with `keys` set to false.
  valueStream: (options, makeData) ->
    opt = xtend(options)
    opt.keys = false
    @readStream opt, makeData
  createValueStream: @::valueStream

  # Read stream with `values` set to false.
  keyStream: (options, makeData) ->
    opt = xtend(options)
    opt.values = false
    @readStream opt, makeData
  createKeyStream: @::keyStream

  # Creates a write stream; the options are merged over the options
  # remembered from open. Logs an error when the optional nosql-stream module
  # could not be loaded.
  writeStream: (options) ->
    if (WriteStream)
      opt = xtend(@_options, options)
      WriteStream @, opt
    else
      console.error "please `npm install nosql-stream` first"
  createWriteStream: @::writeStream
654
# Remaining public exports; AbstractLevelDOWN is an alias of AbstractNoSQL.
api = module.exports
api.AbstractLevelDOWN = AbstractNoSQL
api.AbstractIterator = AbstractIterator
api.AbstractChainedBatch = AbstractChainedBatch