Jump To …

LogCache.coffee

fs = require 'fs'
Addr64 = (require './Addr64.coffee').Addr64
LogUtil = (require './LogUtil.coffee').LogUtil
StackList = (require './StackList.coffee').StackList
EventEmitter = (require 'events').EventEmitter

This class provides a cache for reading data from disk. It is connected with a directory on disk whose files all have the same fixed size, except possibly one. Every file contains several pages of fixed size. The cache provides rapid access to frequently requested pages and slow access to rarely used pages. The page size is expected to be less than or equal to the file size. The cache is used by Log for reading data.

# Page cache over a directory of log files. Extends EventEmitter so that
# asynchronously loaded pages can be announced through events.
class LogCache extends EventEmitter
  # Smallest supported page size; taken from LogUtil.LOG_BLOCK_ALIGNMENT.
  @MINIMUM_PAGE_SIZE: LogUtil.LOG_BLOCK_ALIGNMENT
  # NOTE(review): the three constants below are not referenced in this part of
  # the file; their meaning is inferred from their names only - confirm at use sites.
  @DEFAULT_OPEN_FILES_COUNT: 16
  @MINIMUM_MEM_USAGE_PERCENT: 5
  @MAXIMUM_MEM_USAGE_PERCENT: 95

@private

  # Directory prefix given to the factory; parseAddr$Addr64 prepends it to the
  # log file name to build the path that is opened.
  directory: undefined

@private

  # File size as passed to the factory; fileMask is derived from fileSize - 1.
  fileSize: undefined

@private

  # Addr64 built from (0, fileSize - 1); ANDed with an address in
  # parseAddr$Addr64 to obtain the offset inside a file.
  fileMask: undefined

@private

  # Set by parseAddr$Addr64: position inside the file at which the page
  # containing the parsed address begins.
  fileOffset: undefined

@private

  # Set by parseAddr$Addr64: the parsed address minus its in-file offset;
  # used to derive the log file name.
  fileAddr: undefined

@private

  # Page size as passed to the factory; also the length of the Buffer that is
  # allocated for every page read from disk.
  pageSize: undefined

@private

  # Addr64 built from (0, pageSize - 1); ANDed with an address in
  # parseAddr$Addr64 to obtain the offset inside a page.
  pageMask: undefined

@private

  # Set by parseAddr$Addr64: offset of the parsed address inside its page.
  pageOffset: undefined

@private

  # Set by parseAddr$Addr64: the parsed address minus pageOffset; its string
  # form is the key under which the page is cached.
  pageAddr: undefined

@private

  # StackList created with maxNumberOfPages; holds loaded pages keyed by the
  # page address string.
  pages: undefined

@private

  # Plain object keyed by page address string; an entry is true while a read
  # of that page is pending.
  pageRequests: undefined

Constructor.

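@param directory the directory with log files. @param fileSize the size of a log file in bytes; the file mask is computed as fileSize - 1, so it is expected to be a power of two. @param pageSize the size of a cache page in bytes; the page mask is computed as pageSize - 1, so it is expected to be a power of two. @param maxNumberOfPages maximum number of pages to be stored in cache. @param o optional existing instance to initialize; when omitted a new LogCache is created. pageRequests[key] contains true if the page that starts at address "key" was requested recently and is waiting to be read from disk; otherwise it is undefined.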

  # Factory that stands in for a constructor: fills in a LogCache (the one
  # passed as `o`, or a fresh instance when `o` is null/undefined) and returns it.
  # Masks are built as Addr64 values from size - 1.
  @create$String$int$int$int: (directory, fileSize, pageSize, maxNumberOfPages, o) ->
    cache = o ? new LogCache
    cache.directory = directory
    cache.fileSize = fileSize
    cache.pageSize = pageSize
    cache.fileMask = Addr64.create$int$int 0, fileSize - 1
    cache.pageMask = Addr64.create$int$int 0, pageSize - 1
    cache.pages = new StackList maxNumberOfPages
    # No page reads are pending on a freshly initialised cache.
    cache.pageRequests = {}
    cache

  # Accessor for the file mask (Addr64 built from fileSize - 1 by the factory).
  getFileMask: -> @fileMask

  # Accessor for the page mask (Addr64 built from pageSize - 1 by the factory).
  getPageMask: -> @pageMask

  # Accessor for the page size given to the factory.
  getPageSize: -> @pageSize

  # Accessor for the page start address computed by the most recent
  # parseAddr$Addr64 call (undefined before the first call).
  getPageAddr: -> @pageAddr

  # Accessor for the in-page offset computed by the most recent
  # parseAddr$Addr64 call (undefined before the first call).
  getPageOffset: -> @pageOffset

  # Delegates to LogUtil.getHighAddress$String for this cache's directory and
  # returns whatever that helper returns.
  getHighAddress: -> LogUtil.getHighAddress$String @directory

Get all information about a page from any address inside it. fileAddr and pageAddr contain the addresses of the beginning of the file (which is used to determine the file name, fn) and of the page (which is used to determine the page name), respectively. pageOffset is the offset inside the page, and fileOffset is the position where this page begins in its file.

@param addr address inside cache page.

  # Decompose `addr` into page and file coordinates and store them on the
  # instance: @pageOffset, @pageAddr, @fileAddr, @fileOffset and @fn.
  # The value of the last assignment (@fn, the file path) is returned.
  parseAddr$Addr64: (addr) ->
    inPage = addr.logAnd$Addr64(@pageMask).toFloat()
    @pageOffset = inPage
    @pageAddr = addr.plus$int(-inPage)
    inFile = addr.logAnd$Addr64(@fileMask).toFloat()
    @fileAddr = addr.plus$int(-inFile)
    # Offset of the page start inside the file, not of `addr` itself.
    @fileOffset = inFile - inPage
    @fn = @directory + LogUtil.getLogFilename$Addr64 @fileAddr

Get page that is next after page with this addr inside it.

@param addr address inside cache page. @return page (Buffer), or undefined if the page is not in cache. If undefined is returned, 'getPageXXX' (where XXX is the page address) is emitted later, once the page has been read from disk.

  # Look up the page that contains `addr + pageSize`, i.e. the page following
  # the one `addr` lies in. Same return contract as getPage$Addr64$emit.
  getNextPage$Addr64$emit: (addr) ->
    @getPage$Addr64$emit addr.plus$int(@pageSize)

Get page with this addr inside it.

@param addr address inside cache page. @return page (Buffer), or undefined if the page is not in cache. If undefined is returned, 'getPageXXX' (where XXX is the page address) is emitted later, once the page has been read from disk.

  # Return the cached page that contains `addr`, or start loading it from disk.
  #
  # @param addr address (Addr64) anywhere inside the wanted page.
  # @return the page Buffer when it is cached, otherwise undefined. On a miss
  #   the page is read asynchronously and the event 'getPage' + <page address
  #   string> is emitted with (buffer, addrStr) once the page is available.
  #   While a read is pending, repeated calls return undefined without
  #   starting another read.
  # @throws the fs.open error, from inside the open callback, when the log
  #   file cannot be opened.
  getPage$Addr64$emit: (addr) ->
    @parseAddr$Addr64 addr
    addrStr = Addr64.addr64ToString$Addr64$int(@pageAddr, LogUtil.LOG_NAME_BASE)

    cached = @pages.get addrStr
    if cached isnt undefined
      @pages.moveToTop addrStr
      return cached

    # A read for this page is already pending; its completion emits the event.
    return undefined if @pageRequests[addrStr] isnt undefined

    @pageRequests[addrStr] = true
    # parseAddr$Addr64 overwrites @fileOffset and @fn on every call, so capture
    # the values that belong to this request before going asynchronous.
    fileOffset = @fileOffset
    fn = @fn
    fs.open fn, "r", "0666", (err, fd) =>
      if err
        # Forget the pending request so the page can be asked for again.
        delete @pageRequests[addrStr]
        throw err
      # Hand the opened descriptor to the stream (which closes it when done)
      # instead of leaking it, and stop reading at the end of the page rather
      # than at the end of the file. `end` is inclusive.
      options =
        flags: "r"
        encoding: null
        mode: "0666"
        fd: fd
        start: fileOffset
        end: fileOffset + @pageSize - 1
      readStream = fs.createReadStream fn, options
      delivered = false
      bytesRead = 0
      buffer = new Buffer @pageSize
      # A page at the end of a short file is only partly filled; make sure the
      # remainder is zeroes and not uninitialised memory.
      buffer.fill 0

      # Store the page in the cache and announce it, exactly once.
      deliver = =>
        return if delivered
        delivered = true
        if @pages.length == @pages.limit
          delete @pageRequests[@pages.bottomKey()]
        delete @pageRequests[addrStr]
        @pages.push addrStr, buffer
        @emit 'getPage' + addrStr, buffer, addrStr
        return

      readStream.on 'data', (data) =>
        return if delivered
        wanted = Math.min data.length, @pageSize - bytesRead
        data.copy buffer, bytesRead, 0, wanted
        bytesRead += wanted
        deliver() if bytesRead == @pageSize
        return
      readStream.on 'error', (readErr) =>
        # Forget the pending request so a later call can retry the read.
        delete @pageRequests[addrStr]
        console.log readErr
      # End of data before the page was full: deliver the partial page.
      readStream.on 'end', deliver
    return undefined

exports.LogCache = LogCache