Reader.coffee | |
---|---|
EventEmitter = (require 'events').EventEmitter
Addr64 = (require '../log/Addr64.coffee').Addr64
LogUtil = (require '../log/LogUtil.coffee').LogUtil
LogCache = (require '../log/LogCache.coffee').LogCache
RandomAccessLoggable =
(require '../log/RandomAccessLoggable.coffee').RandomAccessLoggable
NullLoggable = (require '../log/NullLoggable.coffee').NullLoggable
DataIterator = (require '../log/DataIterator.coffee').DataIterator
CompressedUnsignedLongByteIterable =
(require '../log/iterate/CompressedUnsignedLongByteIterable.coffee').
CompressedUnsignedLongByteIterable
RandomAccessByteIterable =
(require '../log/RandomAccessByteIterable.coffee').
RandomAccessByteIterable
LoggableFactory = (require '../log/LoggableFactory.coffee').LoggableFactory | |
This class manages reading logables from file system using cache. | class Reader extends EventEmitter |
@private | directory: undefined |
@private | openFiles: undefined |
@private | fileSize: undefined |
@private | pageSize: undefined |
@private | cache: undefined |
@private | initializingAtAddress: undefined |
Constructor. @param directory the directory where log is placed. @param fileSize the file size. @param pageSize the page size. @param openFiles maximum number of cache pages to be held. | @create$String$int$int$int: (directory, fileSize, pageSize, openFiles, o) ->
if !o? then o = new Reader
o.directory = directory
o.fileSize = fileSize
o.pageSize = pageSize
o.openFiles = openFiles
o.cache = LogCache.create$String$int$int$int o.directory,
o.fileSize * LogUtil.LOG_BLOCK_ALIGNMENT, o.pageSize, o.openFiles
o.cache.setMaxListeners 0
o.setMaxListeners 0
o.initializingAtAddress = new Object()
return o
getCache: () ->
return @cache |
@private Init loggable. @param it the DataIterator to the position in log where loggable is. | initLoggable$DataIterator: (it) ->
addr = it.getAddress()
pageAddr = addr.plus$int(-addr.logAnd$Addr64(@cache.getPageMask()).
toFloat())
addrStr = Addr64.addr64ToString$Addr64$int addr, LogUtil.LOG_NAME_BASE
pageAddrStr = Addr64.addr64ToString$Addr64$int pageAddr,
LogUtil.LOG_NAME_BASE
@cache.on 'getPage' + pageAddrStr, (buffer, addrString) =>
if @initializingAtAddress[addrStr] == undefined
@initLoggableValues$DataIterator$String it, addrStr
b = @cache.getPage$Addr64$emit addr
if b?
@initLoggableValues$DataIterator$String it, addrStr |
@private Init loggable fields when first buffer is got. @param it the DataIterator to the position in log where loggable is. @param globalAddrString the string with this loggable address. @return emits readXXX event, where XXX is loggable address string. | initLoggableValues$DataIterator$String: (it, globalAddrString) ->
addr = it.getAddress()
@initializingAtAddress[globalAddrString] = true
k = 1
nums = new Object()
setFields = () =>
if !(NullLoggable.isNullLoggable$int nums[1])
type = nums[1]
address = addr.copy()
structureId = nums[2]
dataLength = nums[3]
dataAddress = it.getHighAddress()
headerLength = dataAddress.diff$Addr64 addr
length = headerLength + dataLength
if dataLength == 0
data = RandomAccessByteIterable.EMPTY
else
data = RandomAccessByteIterable.create$Addr64$Reader dataAddress, this
loggable = LoggableFactory.
create$Addr64$int$int$RandomAccessByteIterable$int$int(address, type,
length, data, dataLength, structureId)
if !(loggable.type >= 0 && loggable.structureId >= 0 &&
loggable.length >= 0)
addr = addr.plus$int(loggable.length - loggable.headerLength)
else |
TODO exception? | loggable.isCorrupted = true
else
loggable = NullLoggable.create$Addr64 addr
@emit 'read' + globalAddrString, loggable
onGetInt = (i, it) =>
if i?
@it = it
nums[k] = i
if !(NullLoggable.isNullLoggable$int nums[1])
while k < 3
k += 1
nums[k] = CompressedUnsignedLongByteIterable.
getInt$ByteIterator$emit(@it)
if !nums[k]?
break
setFields()
while k < 4
oldIt = it.copy()
addrString = Addr64.addr64ToString$Addr64$int(oldIt.addr,
LogUtil.LOG_NAME_BASE)
CompressedUnsignedLongByteIterable.EMITTER.once 'getInt' + addrString, (i, it) =>
onGetInt i, it
nums[k] = CompressedUnsignedLongByteIterable.getInt$ByteIterator$emit(it)
if k == 1 && (NullLoggable.isNullLoggable$int nums[1])
break
if !nums[k]?
break
else
CompressedUnsignedLongByteIterable.EMITTER.
removeAllListeners 'getInt' + addrString
k += 1
if k == 4 || (NullLoggable.isNullLoggable$int nums[1])
setFields()
return undefined |
Reads ReadLogable from addr. @param addr Addr64 object that points at position of addr in log. Event 'read' is emitted when done. Note: the data in this Loggable is not actually read! | read$Addr64$emit: (addr) ->
it = DataIterator.create$LogCache$Addr64 @cache, addr
@initLoggable$DataIterator it |
Reads ReadLogable from DataIterator. @param it DataIterator that points at position of addr in log. Event 'read' is emitted when done. Note: the data in this Loggable is not actually read! | read$DataIterator$emit: (it) ->
@initLoggable$DataIterator it
exports.Reader = Reader
|