Jump To …

Writer.coffee

fs = require 'fs'
EventEmitter = (require 'events').EventEmitter
Addr64 = (require '../log/Addr64.coffee').Addr64
LogUtil = (require '../log/LogUtil.coffee').LogUtil
LightOutputStream =
  (require '../database/impl/iterate/LightOutputStream.coffee').
  LightOutputStream
NullLoggable = (require '../log/NullLoggable.coffee').NullLoggable
CompressedUnsignedLongByteIterable =
  (require '../log/iterate/CompressedUnsignedLongByteIterable.coffee').
  CompressedUnsignedLongByteIterable

This class manages writing loggables to file system.

class Writer extends EventEmitter

@private

  directory: undefined

@private

  writeStreams: undefined

@private

  writeBuffers: undefined

@private

  fileNames: undefined

@private

  openFiles: undefined

@private

  fn: undefined

@private

  nextBufferSize: undefined

@private

  fileSize: undefined

@private

  stream: undefined

@private

  emitClose: undefined

@private

  mask: undefined

@private

  addr: undefined

Constructor.

@param directory the directory where the log is placed. @param fileSize the size of log file.

  @create$String$int: (directory, fileSize, o) ->
    if !o? then o = new Writer
    o.directory = directory
    o.fileSize = fileSize * LogUtil.LOG_BLOCK_ALIGNMENT
    o.mask = Addr64.create$int$int 0, o.fileSize - 1
    o.writeStreams = new Object()
    o.writeBuffers = new Object()
    o.fileNames = new Object()
    o.openFiles = 0
    o.addr = LogUtil.getHighAddress$String o.directory
    o.parseAddr$Addr64 o.addr
    o.nextBufferSize = o.fileSize - o.offset
    o.createNewWriteStream$String "a"
    o.on 'writeFile', (fn) =>
      if o.openFiles == 0
        o.writeBuffers = new Object()
        o.writeStreams = new Object()
        if o.emitClose
          o.emit 'close'
        else
          o.emit 'writeAll'
    return o

  getHighAddress: () ->
    return @addr

@private Creates new WriteStream.

@param opt is either "a" (for appending stream) or "w" (for new file).

  createNewWriteStream$String: (opt) ->
    @writeBuffers[@fn] = new Buffer @nextBufferSize
    @writeBuffers[@fn].fill 0
    @nextBufferSize = @fileSize
    @stream = LightOutputStream.create$Buffer @writeBuffers[@fn]
    options = {flags: opt, encoding: null, mode: "0666", start: @offset}
    @writeStreams[@fn] = fs.createWriteStream @fn, options
    fn = @fn
    @writeStreams[fn].on 'close', () =>
      @writeBuffers[fn] = undefined
      @openFiles -= 1
      @emit 'writeFile', fn
    @writeStreams[fn].on 'error', (err) =>
      console.log err
    @openFiles += 1
    fs.open fn, "a+", "0666", (err, fd) =>
      throw err if err

@private Gets filename and offset from address.

@param addr address to be parsed.

  parseAddr$Addr64: (addr) ->
    @offset = addr.logAnd$Addr64(@mask).toFloat()
    addr = addr.plus$int(-@offset)
    @fn = @directory + LogUtil.getLogFilename$Addr64 addr

@private Write loggable to buffer.

@param loggable loggable to write @param it byte iterator @return new value of it

  loggableToBuffer$Loggable$LightOuputStream: (loggable, stream) ->
    CompressedUnsignedLongByteIterable.
    fillBytes$int$LightOutputStream loggable.getType(), stream
    if !NullLoggable.isNullLoggable$Loggable loggable
      CompressedUnsignedLongByteIterable.
      fillBytes$int$LightOutputStream loggable.getStructureId(), stream
      CompressedUnsignedLongByteIterable.
      fillBytes$int$LightOutputStream loggable.getDataLength(), stream
      stream.write$Buffer loggable.getData().getBytesUnsafe()
    return stream

Writes specified loggable continuously in a single file.

@param loggable the loggable to write. @return address where the loggable was placed or undefined if the loggable can't be written continuously in current appendable file.

  write$Loggable: (loggable) ->
    @parseAddr$Addr64 @addr
    if @offset + loggable.length() > @fileSize
      return undefined
    else
      addr = @addr.copy()
      @addr = @addr.plus$int(loggable.length())
      if !@writeStreams[@fn]?
        @createNewWriteStream$String "w"
      @loggableToBuffer$Loggable$LightOuputStream loggable, @stream
      if @offset + loggable.length() == @fileSize
        @writeStreams[@fn].end @writeBuffers[@fn]
      return addr

Pad current file with null loggables. Null loggable takes only one byte in the log,so each file of the log with arbitrary alignment can be padded with nulls. Padding with nulls is automatically performed when a loggable to be written can't be placed within the appendable file without overcome of the value of fileLengthBound. This feature allows to guarantee that each file starts with a new loggable, no loggable can begin in one file and end in another. Also, this simplifies reading algorithm: if we started reading by address it definitely should finish within current file.

  padWithNulls: () ->
    @parseAddr$Addr64 @addr
    n = @fileSize - @offset
    nlogable = NullLoggable.create()
    for i in [1..n]
      @loggableToBuffer$Loggable$LightOuputStream nlogable, @stream
    @writeStreams[@fn].end @writeBuffers[@fn]
    @addr = @addr.plus$int(-@offset).plus$int(@fileSize)
    @parseAddr$Addr64 @addr
    if !@writeStreams[@fn]?
      @createNewWriteStream$String "w"

Closes all streams. This is the last operation, it's impossible to write after it. Use flush if you want to write more.

@param emitClose flag that determines whether to emit 'close' when done.

  close$emit: (emitClose = true) ->
    @emitClose = emitClose
    @parseAddr$Addr64 @addr
    fn = @fn
    if @offset != 0
      offset = @offset - (@fileSize - @writeBuffers[@fn].length)
    else
      offset = 0
    buffer = new Buffer offset
    @writeBuffers[@fn].copy buffer, 0, 0, offset
    @writeStreams[@fn].end buffer

Flushes data - closes all write streams and opens new one. Use this method to be sure that data is stored on disk and you can continue. Do not continue writing before event 'flush' emitted.

@return addr the address of last DatabaseRoot record.

  flush$emit: () ->
    @once 'writeAll', () =>
      @parseAddr$Addr64 @addr
      @nextBufferSize = @fileSize - @offset
      @createNewWriteStream$String "a"
      @emit 'flush'
    @close$emit false
    return @addr

exports.Writer = Writer