Jump To …

Log.coffee

fs = require 'fs'
Addr64 = (require './Addr64.coffee').Addr64
LogUtil = (require './LogUtil.coffee').LogUtil
EventEmitter = (require 'events').EventEmitter
DataIterator = (require './DataIterator.coffee').DataIterator
LoggableIterator = (require './LoggableIterator.coffee').LoggableIterator

This class implements the virtual file space conception. The data is actually stored in several files but the user of Log doesn't need to know anything about it. Takes WriteLogable when writing and returns ReadLogable when reading. Deals with Addr64 and allows the maximum capacity of 2^64 bytes.

class Log extends EventEmitter

@private

  logConfig: undefined

@private

  directory: undefined

@private

  fileSize: undefined

@private

  writer: undefined

@private

  reader: undefined

@private

  lastFlushAddr: undefined

@private

  fn: undefined

@private

  offset: undefined

Constructor.

@param logConfig.

  @create$LogConfig: (logConfig, o) ->
    if !o? then o = new Log
    o.logConfig = logConfig
    o.directory = logConfig.getDir()
    o.fileSize = logConfig.getFileSize()
    o.writer = logConfig.getWriter()
    o.reader = logConfig.getReader()
    o.lastFlushAddr = undefined
    return o

  getReader: () ->
    return @reader

  hasAddress$Addr64: (address) ->
    return address.less$Addr64 @writer.getHighAddress()

  getHighAddress: () ->
    return @writer.getHighAddress()

Returns iterator which reads raw bytes of the log starting from specified address.

@param address @return instance of ByteIterator

  readIteratorFrom$Addr64: (address) ->
    return DataIterator.create$LogCache$Addr64 @reader.getCache(), address

@private Gets filename and offset from address.

@param addr address to be parsed.

  parseAddr$Addr64: (addr) ->
    @offset = addr.logAnd$Addr64(@reader.getCache().getFileMask()).toFloat()
    @fn = @directory + LogUtil.getLogFilename$Addr64 addr

  getLastLoggableOfType$int$emit: (type) ->
    nullAddr = Addr64.create$int$int 0, 0
    lastLoggableOfType = undefined
    addr = @getHighAddress()
    @parseAddr$Addr64 addr
    startAddr = addr.plus$int(-@offset)
    finishAddr = addr
    li = LoggableIterator.create$Log$Addr64 this, startAddr
    lastRead = li.getHighAddress()

    if !startAddr.equals$Addr64 finishAddr

      moveToPrevFileOrFinish = () =>
        if lastLoggableOfType != undefined
          @emit 'getLastLoggableOfType', lastLoggableOfType
        else
          if !startAddr.equals$Addr64 nullAddr
            finishAddr = startAddr.plus$int 0
            startAddr = startAddr.plus$int(-@fileSize * LogUtil.LOG_BLOCK_ALIGNMENT)
            li.it = @readIteratorFrom$Addr64 startAddr
            @parseAddr$Addr64 startAddr
            li.once 'next', (loggable) =>
              onGetLogable loggable
            loggable = li.next$emit()
            if loggable?
              li.removeAllListeners 'next'
              onGetLogable loggable
          else
            @emit 'getLastLoggableOfType', lastLoggableOfType

      continueReading = () =>
        if !li.getHighAddress().equals$Addr64 finishAddr
          b = li.hasNext()
          if b
            li.once 'next', (loggable) =>
              onGetLogable loggable
            loggable = li.next$emit()
            if loggable?
              li.removeAllListeners 'next'
              onGetLogable loggable
          else
            moveToPrevFileOrFinish()
        else
          moveToPrevFileOrFinish()

      onGetLogable = (loggable) =>
        if loggable.getType() == type
          lastLoggableOfType = loggable
        continueReading()

      li.once 'next', (loggable) =>
        onGetLogable loggable
      loggable = li.next$emit()
      if loggable?
        li.removeAllListeners 'next'
        onGetLogable loggable
    else
      @emit 'getLastLoggableOfType', undefined

Reads Loggable.

@param addr Addr64 object that points at position of addr in log. Event 'read' is emitted when done. Note: the data in this Logable is not actually read! Take dataIterator from logable and use its methods for reading.

  read$Addr64$emit: (addr) ->
    addrString = Addr64.addr64ToString$Addr64$int(addr, LogUtil.LOG_NAME_BASE)
    @reader.once 'read' + addrString, (loggable) =>
      @emit 'read' + addrString, loggable
    @reader.read$Addr64$emit addr

Reads Loggable.

@param it DataIterator that points at position of addr in log. Event 'read' is emitted when done. Note: the data in this Logable is not actually read! Take dataIterator from logable and use its methods for reading.

  read$DataIterator$emit: (it) ->
    addrString = Addr64.
    addr64ToString$Addr64$int(it.getHighAddress(), LogUtil.LOG_NAME_BASE)
    @reader.once 'read' + addrString, (loggable) =>
      @emit 'read' + addrString, loggable
    @reader.read$DataIterator$emit it

Reads Loggable.

@param addr Addr64 object that points at position of addr in log. @param it DataIterator that points at position of addr in log. Event 'read' is emitted when done. Note: the data in this Logable is not actually read! Take dataIterator from logable and use its methods for reading.

  read$DataIterator$Addr64$emit: (it, addr) ->
    addrString = Addr64.
    addr64ToString$Addr64$int(it.getHighAddress(), LogUtil.LOG_NAME_BASE)
    @reader.once 'read' + addrString, (loggable) =>
      @emit 'read' + addrString, loggable
    @reader.read$DataIterator$emit it

Pad current file with null loggables. Null loggable takes only one byte in the log,so each file of the log with arbitrary alignment can be padded with nulls. Padding with nulls is automatically performed when a loggable to be written can't be placed within the appendable file without overcome of the value of fileLengthBound. This feature allows to guarantee that each file starts with a new loggable, no loggable can begin in one file and end in another. Also, this simplifies reading algorithm: if we started reading by address it definitely should finish within current file.

  padWithNulls: () ->
    @writer.padWithNulls()

Writes specified loggable continuously in a single file.

@param loggable the loggable to write. @return address where the loggable was placed or undefined value if the loggable can't be written continuously in current appendable file.

  writeContinuosly$Loggable: (loggable) ->
    return @writer.write$Loggable loggable

Writes a loggable to the end of the log If padding is needed, it is performed, but loggable is not written

@param loggable - loggable to write. @return address where the loggable was placed.

  tryWrite$Loggable: (loggable) ->
    res = @writeContinuosly$Loggable loggable
    if !res?
      @padWithNulls()
    return res

Writes a loggable to the end of the log padding the log with nulls if necessary. So auto-alignment guarantees the loggable to be placed in a single file. The exact time of writing is not known. Use close or flush to be sure that data is stored on disk.

@param loggable - loggable to write. @return address where the loggable was placed.

  write$Loggable: (loggable) ->
    @lastFlushAddr = undefined
    @lastStructureId = loggable.structureId
    res = @writeContinuosly$Loggable loggable
    if !res?
      @padWithNulls()
      res = @writeContinuosly$Loggable loggable
    return res

Closes all streams. This is the last operation, it's impossible to write after it. Use flush if you want to write more.

  close$emit: () ->
    @writer.once 'close', () =>
      @emit 'close'
    @writer.close$emit()

Flushes data. Use this method to be sure that data is stored on disk and you can continue. Do not continue writing before event 'flush' is emitted.

@return addr of the end of data flushed.

  flush$emit: () ->
    if @lastFlushAddr == undefined
      @writer.once 'flush', () =>
        @emit 'flush'
      @lastFlushAddr = @writer.flush$emit()
    else
      @emit 'flush'
    return @lastFlushAddr

exports.Log = Log