Jump To …

Reader.coffee

EventEmitter = (require 'events').EventEmitter
Addr64 = (require '../log/Addr64.coffee').Addr64
LogUtil = (require '../log/LogUtil.coffee').LogUtil
LogCache = (require '../log/LogCache.coffee').LogCache
RandomAccessLoggable =
  (require '../log/RandomAccessLoggable.coffee').RandomAccessLoggable
NullLoggable = (require '../log/NullLoggable.coffee').NullLoggable
DataIterator = (require '../log/DataIterator.coffee').DataIterator
CompressedUnsignedLongByteIterable =
  (require '../log/iterate/CompressedUnsignedLongByteIterable.coffee').
  CompressedUnsignedLongByteIterable
RandomAccessByteIterable =
  (require '../log/RandomAccessByteIterable.coffee').
  RandomAccessByteIterable
LoggableFactory = (require '../log/LoggableFactory.coffee').LoggableFactory

This class manages reading loggables from the file system using a cache.

class Reader extends EventEmitter

@private

  # Directory where the log is placed; assigned in create$String$int$int$int.
  directory: undefined

@private

  # Limit forwarded to LogCache.create$String$int$int$int as its last argument;
  # assigned in create$String$int$int$int.
  openFiles: undefined

@private

  # File size as given to create$String$int$int$int; it is multiplied by
  # LogUtil.LOG_BLOCK_ALIGNMENT before being handed to the cache.
  fileSize: undefined

@private

  # Page size passed through to the LogCache; assigned in
  # create$String$int$int$int.
  pageSize: undefined

@private

  # The LogCache instance built in create$String$int$int$int; exposed through
  # getCache.
  cache: undefined

@private

  # Object keyed by loggable address string. An entry is set to true when
  # initLoggableValues$DataIterator$String starts for that address and is
  # checked by the 'getPage' listener in initLoggable$DataIterator.
  initializingAtAddress: undefined

Constructor.

@param directory the directory where the log is placed. @param fileSize the file size. @param pageSize the page size. @param openFiles maximum number of cache pages to be held. @param o optional existing instance to initialize; when omitted, a new Reader is created.

  # Factory: fills in a Reader (a fresh one unless `o` is supplied) and wires
  # up its LogCache.
  #
  # @param directory the directory where the log is placed.
  # @param fileSize the file size; multiplied by LogUtil.LOG_BLOCK_ALIGNMENT
  #   before it is passed to the cache.
  # @param pageSize the page size.
  # @param openFiles limit forwarded to the cache factory.
  # @param o optional instance to initialize instead of a new Reader.
  # @return the initialized instance.
  @create$String$int$int$int: (directory, fileSize, pageSize, openFiles, o) ->
    reader = o ? new Reader
    reader.directory = directory
    reader.fileSize = fileSize
    reader.pageSize = pageSize
    reader.openFiles = openFiles
    scaledFileSize = reader.fileSize * LogUtil.LOG_BLOCK_ALIGNMENT
    reader.cache = LogCache.create$String$int$int$int(
      reader.directory, scaledFileSize, reader.pageSize, reader.openFiles)
    # A listener limit of 0 means "unlimited" on both emitters.
    emitter.setMaxListeners 0 for emitter in [reader.cache, reader]
    reader.initializingAtAddress = {}
    reader

  # Accessor for the cache object stored on this reader.
  #
  # @return the value of @cache.
  getCache: -> @cache

@private Init loggable.

@param it the DataIterator positioned at the loggable in the log.

  # Requests the cache page for the iterator's address and starts header
  # parsing, either immediately (page returned synchronously) or when the
  # cache emits 'getPage' + <page address string>.
  #
  # The page listener is one-shot and is removed on the synchronous path, so
  # no listener is left behind on the cache for completed reads.
  #
  # @param it the DataIterator positioned at the loggable.
  initLoggable$DataIterator: (it) ->
    addr = it.getAddress()
    # pageAddr = addr - (addr AND page mask).
    inPageOffset = addr.logAnd$Addr64(@cache.getPageMask()).toFloat()
    pageAddr = addr.plus$int(-inPageOffset)
    addrStr = Addr64.addr64ToString$Addr64$int addr, LogUtil.LOG_NAME_BASE
    pageAddrStr = Addr64.addr64ToString$Addr64$int pageAddr,
            LogUtil.LOG_NAME_BASE
    pageEvent = 'getPage' + pageAddrStr
    onPage = (buffer, addrString) =>
      # Skip when initialization for this address has already been started.
      # NOTE(review): the flag is never cleared in this file, so a later
      # page-miss read of the same address will be skipped here — confirm
      # whether that is intended.
      if @initializingAtAddress[addrStr] == undefined
        @initLoggableValues$DataIterator$String it, addrStr
    @cache.once pageEvent, onPage
    b = @cache.getPage$Addr64$emit addr
    if b?
      # Page was available right away: the listener is no longer needed.
      @cache.removeListener pageEvent, onPage
      @initLoggableValues$DataIterator$String it, addrStr

@private Initializes the loggable's fields once the first buffer has been obtained.

@param it the DataIterator positioned at the loggable in the log. @param globalAddrString the string form of this loggable's address. @return nothing; emits a readXXX event, where XXX is the loggable's address string.

  # Parses the loggable header at the iterator's position and emits the
  # resulting loggable as event 'read' + globalAddrString.
  #
  # Up to three compressed ints are read into nums[1..3]: type, structure id
  # and data length. If NullLoggable.isNullLoggable$int accepts the type, only
  # the first int is read and a NullLoggable is emitted. Whenever
  # getInt$ByteIterator$emit returns null/undefined, a one-shot listener for
  # 'getInt' + <address string> is left on
  # CompressedUnsignedLongByteIterable.EMITTER so onGetInt can continue.
  #
  # @param it the DataIterator positioned at the loggable.
  # @param globalAddrString address string of the loggable; used as the event
  #   suffix and as the key in @initializingAtAddress.
  # @return undefined; the loggable is delivered through the emitted event.
  initLoggableValues$DataIterator$String: (it, globalAddrString) ->
    addr = it.getAddress()
    @initializingAtAddress[globalAddrString] = true
    k = 1
    nums = new Object()
    intEmitter = CompressedUnsignedLongByteIterable.EMITTER

    # Builds the loggable from the collected header ints and emits it.
    setFields = () =>
      if !(NullLoggable.isNullLoggable$int nums[1])
        type = nums[1]
        address = addr.copy()
        structureId = nums[2]
        dataLength = nums[3]
        dataAddress = it.getHighAddress()
        headerLength = dataAddress.diff$Addr64 addr
        length = headerLength + dataLength
        if dataLength == 0
          data = RandomAccessByteIterable.EMPTY
        else
          data = RandomAccessByteIterable.create$Addr64$Reader dataAddress, this
        loggable = LoggableFactory.create$Addr64$int$int$RandomAccessByteIterable$int$int(
          address, type, length, data, dataLength, structureId)
        # A loggable is well-formed when none of its header values is
        # negative; anything else is flagged as corrupted.
        if loggable.type >= 0 && loggable.structureId >= 0 &&
                loggable.length >= 0
          addr = addr.plus$int(loggable.length - loggable.headerLength)
        else
          # TODO exception?
          loggable.isCorrupted = true
      else
        loggable = NullLoggable.create$Addr64 addr
      @emit 'read' + globalAddrString, loggable

    # Continuation for a header int that was not available synchronously.
    # `i` is the delivered int, `nextIt` the iterator to keep reading from.
    onGetInt = (i, nextIt) =>
      if i?
        nums[k] = i
        if !(NullLoggable.isNullLoggable$int nums[1])
          while k < 3
            k += 1
            nums[k] = CompressedUnsignedLongByteIterable.getInt$ByteIterator$emit(nextIt)
            if !nums[k]?
              break
      # NOTE(review): setFields also runs when the loop above stopped on a
      # missing int, i.e. with an incomplete header — confirm whether a
      # further 'getInt' continuation should be awaited instead.
      setFields()

    while k < 4
      oldIt = it.copy()
      addrString = Addr64.addr64ToString$Addr64$int(oldIt.addr,
              LogUtil.LOG_NAME_BASE)
      intEvent = 'getInt' + addrString
      onInt = (i, nextIt) =>
        onGetInt i, nextIt
      intEmitter.once intEvent, onInt
      nums[k] = CompressedUnsignedLongByteIterable.getInt$ByteIterator$emit(it)
      if !nums[k]?
        # Value not available yet: leave the listener in place and stop.
        break
      # Value arrived synchronously: drop only the listener registered above,
      # leaving listeners of other pending reads at this address untouched.
      intEmitter.removeListener intEvent, onInt
      if k == 1 && (NullLoggable.isNullLoggable$int nums[1])
        break
      k += 1
    if k == 4 || (NullLoggable.isNullLoggable$int nums[1])
      setFields()
    return undefined

Reads a loggable starting at addr.

@param addr Addr64 object that points at the position of the loggable in the log. An event named 'read' followed by the loggable's address string is emitted when done. Note: the data in this Loggable is not actually read!

  # Starts reading the loggable located at `addr`: wraps the address in a
  # DataIterator over this reader's cache and hands it to
  # initLoggable$DataIterator.
  #
  # @param addr Addr64 pointing at the loggable in the log.
  read$Addr64$emit: (addr) ->
    @initLoggable$DataIterator DataIterator.create$LogCache$Addr64(@cache, addr)

Reads a loggable from a DataIterator.

@param it DataIterator that points at the position of the loggable in the log. An event named 'read' followed by the loggable's address string is emitted when done. Note: the data in this Loggable is not actually read!

  # Starts reading the loggable the given iterator points at by delegating to
  # initLoggable$DataIterator.
  #
  # @param it DataIterator positioned at the loggable in the log.
  read$DataIterator$emit: (it) -> @initLoggable$DataIterator(it)

# Expose the Reader class as a named export of this module.
exports.Reader = Reader