LogCache.coffee | |
---|---|
fs = require 'fs'
Addr64 = (require './Addr64.coffee').Addr64
LogUtil = (require './LogUtil.coffee').LogUtil
StackList = (require './StackList.coffee').StackList
EventEmitter = (require 'events').EventEmitter | |
This class provides cache for reading data from disk. It is connected with some directory on disk that contains files that all but one have fixed size. Every file contain several pages of fixed size. Cache provides rapid access to often asked pages and slow access for rearly used pages. Page size is expected to be less or equal to file size. Cache is used by Log for reading data. | class LogCache extends EventEmitter
@MINIMUM_PAGE_SIZE: LogUtil.LOG_BLOCK_ALIGNMENT
@DEFAULT_OPEN_FILES_COUNT: 16
@MINIMUM_MEM_USAGE_PERCENT: 5
@MAXIMUM_MEM_USAGE_PERCENT: 95 |
@private | directory: undefined |
@private | fileSize: undefined |
@private | fileMask: undefined |
@private | fileOffset: undefined |
@private | fileAddr: undefined |
@private | pageSize: undefined |
@private | pageMask: undefined |
@private | pageOffset: undefined |
@private | pageAddr: undefined |
@private | pages: undefined |
@private | pageRequests: undefined |
Constructor. @param directory the directory with log files. @param logFileSize the logarithm of file size. @param logPageSize the logarithm of page size. @param maxNumberOfPages maximum number of pages to be stored in cache. PageRequests[key] contains true of the page that starts with addr "key" was requested recently and is waited to be read from disk (or is already in cache); othrewise it is undefined. | @create$String$int$int$int: (directory, fileSize, pageSize, maxNumberOfPages, o) ->
if !o? then o = new LogCache
o.directory = directory
o.fileSize = fileSize
o.pageSize = pageSize
o.fileMask = Addr64.create$int$int 0, (o.fileSize - 1)
o.pageMask = Addr64.create$int$int 0, (o.pageSize - 1)
o.pages = new StackList maxNumberOfPages
o.pageRequests = new Object()
return o
getFileMask: () ->
return @fileMask
getPageMask: () ->
return @pageMask
getPageSize: () ->
return @pageSize
getPageAddr: () ->
return @pageAddr
getPageOffset: () ->
return @pageOffset
getHighAddress: () ->
return LogUtil.getHighAddress$String @directory |
Get all information about page from any addr inside it. PageAddr and fileAddr contain addrs of the beginning of file (which is used to determine file name - fn) and page (which is used to determine page name). PageOffset is an offset inside page, and fileOffset is the position where this page begins in this file. @param addr address inside cache page. | parseAddr$Addr64: (addr) ->
@pageOffset = addr.logAnd$Addr64(@pageMask).toFloat()
@pageAddr = addr.plus$int(-@pageOffset)
@fileOffset = addr.logAnd$Addr64(@fileMask).toFloat()
@fileAddr = addr.plus$int(-@fileOffset)
@fileOffset -= @pageOffset
@fn = @directory + LogUtil.getLogFilename$Addr64 @fileAddr |
Get page that is next after page with this addr inside it. @param addr address inside cache page. @return page (Buffer) or undefined if page is not in cache. If undefined is returned, 'getPageXXX' is emitted, where XXX is page addr. | getNextPage$Addr64$emit: (addr) ->
addr = addr.plus$int @pageSize
return @getPage$Addr64$emit addr |
Get page with this addr inside it. @param addr address inside cache page. @return page (Buffer) or undefined if page is not in cache. If undefined is returned, 'getPageXXX' is emitted, where XXX is page addr. | getPage$Addr64$emit: (addr) ->
@parseAddr$Addr64 addr
addrStr = Addr64.addr64ToString$Addr64$int(@pageAddr, LogUtil.LOG_NAME_BASE)
if (@pages.get addrStr) != undefined
@pages.moveToTop addrStr
return @pages.get addrStr
else if @pageRequests[addrStr] == undefined
@pageRequests[addrStr] = true
pageAddr = @pageAddr
fileOffset = @fileOffset
fileAddr = @fileAddr
fn = @fn
fs.open fn, "r", "0666", (err, fd) =>
throw err if err
options = {flags: "r", encoding: null, mode: "0666", start: fileOffset}
readStream = fs.createReadStream fn, options
emitGetPage = true
bytesRead = 0
buffer = new Buffer @pageSize
readStream.on 'data', (data) =>
if emitGetPage
if data.length < @pageSize - bytesRead
data.copy buffer, bytesRead, 0, data.length
bytesRead += data.length
else
data.copy buffer, bytesRead, 0, @pageSize - bytesRead
bytesRead = @pageSize
if bytesRead == @pageSize && emitGetPage
if @pages.length == @pages.limit
@pageRequests[@pages.bottomKey()] = undefined
@pageRequests[addrStr] = undefined
@pages.push addrStr, buffer
@emit 'getPage' + addrStr, buffer, addrStr
emitGetPage = false
readStream.on 'error', (err) =>
console.log err
readStream.on 'end', () =>
if emitGetPage
if @pages.length == @pages.limit
@pageRequests[@pages.bottomKey()] = undefined
@pageRequests[addrStr] = undefined
@pages.push addrStr, buffer
@emit 'getPage' + addrStr, buffer, addrStr
emitGetPage = false
return undefined
else
return undefined
exports.LogCache = LogCache
|