Writer.coffee | |
---|---|
fs = require 'fs'
EventEmitter = (require 'events').EventEmitter
Addr64 = (require '../log/Addr64.coffee').Addr64
LogUtil = (require '../log/LogUtil.coffee').LogUtil
LightOutputStream =
(require '../database/impl/iterate/LightOutputStream.coffee').
LightOutputStream
NullLoggable = (require '../log/NullLoggable.coffee').NullLoggable
CompressedUnsignedLongByteIterable =
(require '../log/iterate/CompressedUnsignedLongByteIterable.coffee').
CompressedUnsignedLongByteIterable | |
# This class manages writing loggables to the file system.
#
# Loggables are serialized into an in-memory buffer, one buffer per log file.
# A buffer is handed to its write stream when the file is full, padded,
# closed or flushed.
#
# Events emitted:
#   'writeFile' (fn) - the write stream of file `fn` has been closed;
#   'writeAll'       - every open file has been closed after a flush;
#   'close'          - every open file has been closed after a close;
#   'flush'          - a flush is complete and writing may continue.
class Writer extends EventEmitter

  # @private directory where the log is placed.
  directory: undefined

  # @private map: file name -> fs write stream.
  writeStreams: undefined

  # @private map: file name -> Buffer holding the not yet written data.
  writeBuffers: undefined

  # @private initialized to an empty object by the factory; not used by the
  # methods of this class.
  fileNames: undefined

  # @private number of write streams that were opened and are not closed yet.
  openFiles: undefined

  # @private name of the log file that contains the current address.
  fn: undefined

  # @private size of the buffer that the next new write stream will get.
  nextBufferSize: undefined

  # @private size of one log file (the factory argument multiplied by
  # LogUtil.LOG_BLOCK_ALIGNMENT).
  fileSize: undefined

  # @private LightOutputStream positioned in the buffer of the current file.
  stream: undefined

  # @private when all files are closed: truthy emits 'close',
  # otherwise 'writeAll' is emitted.
  emitClose: undefined

  # @private Addr64 mask (fileSize - 1) that extracts the offset in a file.
  mask: undefined

  # @private address where the next loggable will be placed.
  addr: undefined

  # @private offset of the current address inside its log file
  # (set by parseAddr$Addr64).
  offset: undefined

  # Constructor.
  # @param directory the directory where the log is placed.
  # @param fileSize the size of a log file, in units of
  #   LogUtil.LOG_BLOCK_ALIGNMENT.
  # @param o optional instance to initialize; a new Writer is created when
  #   it is omitted.
  # @return the initialized writer.
  @create$String$int: (directory, fileSize, o) ->
    o ?= new Writer
    o.directory = directory
    o.fileSize = fileSize * LogUtil.LOG_BLOCK_ALIGNMENT
    o.mask = Addr64.create$int$int 0, o.fileSize - 1
    o.writeStreams = {}
    o.writeBuffers = {}
    o.fileNames = {}
    o.openFiles = 0
    o.addr = LogUtil.getHighAddress$String o.directory
    o.parseAddr$Addr64 o.addr
    # The first buffer only has to cover the rest of the current file.
    o.nextBufferSize = o.fileSize - o.offset
    o.createNewWriteStream$String "a"
    o.on 'writeFile', (fn) =>
      # Report completion only when the last open file has been closed.
      if o.openFiles == 0
        o.writeBuffers = {}
        o.writeStreams = {}
        if o.emitClose
          o.emit 'close'
        else
          o.emit 'writeAll'
    return o

  # @return the address where the next loggable will be placed.
  getHighAddress: () ->
    return @addr

  # @private
  # Creates a new WriteStream and a zero-filled buffer for the current file.
  # @param opt is either "a" (for appending stream) or "w" (for new file).
  createNewWriteStream$String: (opt) ->
    @writeBuffers[@fn] = new Buffer @nextBufferSize
    @writeBuffers[@fn].fill 0
    # Every following file starts at offset 0 and needs a full-size buffer.
    @nextBufferSize = @fileSize
    @stream = LightOutputStream.create$Buffer @writeBuffers[@fn]
    options = {flags: opt, encoding: null, mode: "0666", start: @offset}
    @writeStreams[@fn] = fs.createWriteStream @fn, options
    # Capture the name: @fn will point to another file when 'close' fires.
    fn = @fn
    @writeStreams[fn].on 'close', () =>
      @writeBuffers[fn] = undefined
      @openFiles -= 1
      @emit 'writeFile', fn
    @writeStreams[fn].on 'error', (err) =>
      console.log err
    @openFiles += 1
    # This descriptor is never used for writing (the write stream owns its
    # own one), so release it right away instead of leaking one descriptor
    # per log file.
    fs.open fn, "a+", "0666", (err, fd) =>
      throw err if err
      fs.close fd, (closeErr) =>
        throw closeErr if closeErr

  # @private
  # Gets the file name and the offset in that file from an address and stores
  # them in @fn and @offset.
  # @param addr address to be parsed.
  parseAddr$Addr64: (addr) ->
    @offset = addr.logAnd$Addr64(@mask).toFloat()
    fileStart = addr.plus$int(-@offset)
    @fn = @directory + LogUtil.getLogFilename$Addr64 fileStart

  # @private
  # Writes a loggable to the buffer behind the given stream: its type, and,
  # unless it is a null loggable, its structure id, data length and data.
  # @param loggable loggable to write.
  # @param stream output stream to write into.
  # @return the same stream.
  loggableToBuffer$Loggable$LightOuputStream: (loggable, stream) ->
    CompressedUnsignedLongByteIterable.fillBytes$int$LightOutputStream(
      loggable.getType(), stream)
    if !NullLoggable.isNullLoggable$Loggable loggable
      CompressedUnsignedLongByteIterable.fillBytes$int$LightOutputStream(
        loggable.getStructureId(), stream)
      CompressedUnsignedLongByteIterable.fillBytes$int$LightOutputStream(
        loggable.getDataLength(), stream)
      stream.write$Buffer loggable.getData().getBytesUnsafe()
    return stream

  # Writes the specified loggable continuously in a single file.
  # @param loggable the loggable to write.
  # @return address where the loggable was placed, or undefined if the
  #   loggable can't be written continuously in the current appendable file.
  write$Loggable: (loggable) ->
    @parseAddr$Addr64 @addr
    size = loggable.length()
    if @offset + size > @fileSize
      return undefined
    placedAt = @addr.copy()
    @addr = @addr.plus$int(size)
    if !@writeStreams[@fn]?
      @createNewWriteStream$String "w"
    @loggableToBuffer$Loggable$LightOuputStream loggable, @stream
    # @offset is still the offset before this write: the file is full now,
    # so hand the whole buffer to the stream and finish the file.
    if @offset + size == @fileSize
      @writeStreams[@fn].end @writeBuffers[@fn]
    return placedAt

  # Pads the current file with null loggables. A null loggable takes only one
  # byte in the log, so each file of the log with arbitrary alignment can be
  # padded with nulls. Padding is meant to be performed when a loggable to be
  # written can't be placed within the appendable file without exceeding the
  # file size. This guarantees that each file starts with a new loggable: no
  # loggable can begin in one file and end in another. It also simplifies
  # the reading algorithm: reading that starts at an address always finishes
  # within the same file.
  #
  # NOTE(review): this assumes that a write stream for the current file
  # exists; at a file boundary with no stream created yet it fails on the
  # missing stream - confirm that callers never pad in that state.
  padWithNulls: () ->
    @parseAddr$Addr64 @addr
    n = @fileSize - @offset
    nlogable = NullLoggable.create()
    # `by 1` keeps the range ascending, so a non-positive n pads nothing.
    for i in [1..n] by 1
      @loggableToBuffer$Loggable$LightOuputStream nlogable, @stream
    @writeStreams[@fn].end @writeBuffers[@fn]
    # Move to the first address of the next file and prepare its stream.
    @addr = @addr.plus$int(-@offset).plus$int(@fileSize)
    @parseAddr$Addr64 @addr
    if !@writeStreams[@fn]?
      @createNewWriteStream$String "w"

  # Closes all streams. This is the last operation, it's impossible to write
  # after it. Use flush if you want to write more.
  # @param emitClose flag that determines whether to emit 'close' when done
  #   ('writeAll' is emitted otherwise).
  close$emit: (emitClose = true) ->
    @emitClose = emitClose
    @parseAddr$Addr64 @addr
    # When the previous file was filled exactly, the current address is the
    # start of a file that has no stream yet. Create it (as padWithNulls
    # does) so that the close below completes and the final event is emitted
    # instead of failing on a missing buffer. Restricted to offset 0 so that
    # a file which already holds data is never reopened with "w".
    if @offset == 0 and !@writeStreams[@fn]?
      @createNewWriteStream$String "w"
    # The buffer may start in the middle of the file (append mode), so
    # translate the offset in the file to the number of used buffer bytes.
    if @offset != 0
      used = @offset - (@fileSize - @writeBuffers[@fn].length)
    else
      used = 0
    buffer = new Buffer used
    @writeBuffers[@fn].copy buffer, 0, 0, used
    @writeStreams[@fn].end buffer

  # Flushes data - closes all write streams and opens a new one. Use this
  # method to be sure that data is stored on disk and you can continue.
  # Do not continue writing before the event 'flush' is emitted.
  # @return the current high address (the value of @addr).
  flush$emit: () ->
    @once 'writeAll', () =>
      # Everything is on disk: reopen the current file for appending.
      @parseAddr$Addr64 @addr
      @nextBufferSize = @fileSize - @offset
      @createNewWriteStream$String "a"
      @emit 'flush'
    @close$emit false
    return @addr
# Expose the Writer class as an export of this module.
exports.Writer = Writer
|