RandomAccessByteIterable.coffee | |
---|---|
Addr64 = (require './Addr64.coffee').Addr64
LogUtil = (require './LogUtil.coffee').LogUtil
ByteIterable = (require '../database/ByteIterable.coffee').ByteIterable
CompoundByteIteratorBase =
(require './iterate/CompoundByteIteratorBase.coffee').CompoundByteIteratorBase
ArrayByteIterable =
(require '../database/impl/iterate/ArrayByteIterable.coffee').
ArrayByteIterable
UnsupportedOperationError =
(require '../errors/UnsupportedOperationError.coffee').
UnsupportedOperationError
BlockNotFoundError =
(require '../errors/BlockNotFoundError.coffee').BlockNotFoundError | |
This iterable is used for access to data stored in loggables when reading from log. This iterable is emitter. | class RandomAccessByteIterable extends ByteIterable |
@private | reader: undefined |
@private | address: undefined |
@private | class CompoundByteIterator extends CompoundByteIteratorBase |
@private | currentAddress: undefined |
@private | obj: undefined
@create$Addr64: (address, obj, o) ->
if !o? then o = new CompoundByteIterator
o.currentAddress = address
o.obj = obj
return o
copy: () ->
return CompoundByteIterator.create$Addr64 @currentAddress |
TODO return won't work Get next iterator. As the data we want to get can be stored on disk, this method emits 'nextIterator' or returns iterator. | nextIterator$emit: () ->
cache = @obj.reader.getCache()
alignment = @currentAddress.logAnd$Addr64(cache.getPageMask()).toFloat()
alignedAddress = @currentAddress.plus$int(-alignment)
addrString = Addr64.
addr64ToString$Addr64$int(alignedAddress, LogUtil.LOG_NAME_BASE)
emit = true
onGetPage = (buffer) =>
readBytes = buffer.length
if readBytes <= alignment
if emit
@emit 'nextIterator', null
return undefined
else
return null
@currentAddress = @currentAddress.plus$int(readBytes - alignment)
cache.on 'getPage' + addrString, (buffer) =>
onGetPage buffer
@emit 'nextIterator', (ArrayByteIterable.create$Buffer buffer).
iterator(alignment)
buffer = cache.getPage$Addr64$emit @currentAddress
if buffer?
emit = false
cache.removeAllListeners 'getPage' + addrString
if (onGetPage buffer) == null
return
else
return (ArrayByteIterable.create$Buffer buffer).iterator$int(alignment)
else
return undefined
class CompoundByteIteratorAutoAdvance extends CompoundByteIterator
@create: (obj, o) ->
if !o? then o = new CompoundByteIterator
o = CompoundByteIterator.create obj.address, obj, o
return o
skip$int: (length) ->
result = super length
@obj.address = @obj.address.plus$int length
return result |
Emits 'next' or returns next element (see CompoundByteIteratorBase). | next$emit: () ->
@obj.address = @obj.address.plus$int 1
result = super
return result
getIterable: () ->
return @obj
onFail$String: (message) ->
super message + ", address is: " + @obj.address |
Constructor. @param address. @param reader. | @create$Addr64$Reader: (address, reader, o) ->
if !o? then o = new RandomAccessByteIterable
o.address = address
o.reader = reader
return o
@EMPTY = RandomAccessByteIterable.create$Addr64$Reader 0, null
@EMPTY.iterator = () ->
return ByteIterable.EMPTY_ITERATOR
@EMPTY.iterator$int = (offset) ->
return ByteIterable.EMPTY_ITERATOR
@EMPTY.advance$int = (offset) ->
getAddress: () ->
return @address |
Increase address. @param offset | advance$int: (offset) ->
@address = @address.plus$int offset
iteratorAutoAdvance: () ->
return CompoundByteIteratorAutoAdvance.create this
iterator: () ->
return CompoundByteIterator.create$Addr64 @address, this
iterator$int: (offset) ->
return CompoundByteIterator.create$Addr64 @address.plus$int(offset), this
getBytesUnsafe: () ->
throw new UnsupportedOperationError()
getLength: () ->
throw new UnsupportedOperationError()
compareTo$ByteIterable$emit: (right) ->
throw new UnsupportedOperationError() |
Compares this iterable with another one. The second iterable can't be RandomAccessByteIterable, because it must be completely in memory when comparing. This method can emit 'compare'. @param offset offset inside this iterable. @param myLen the length of this iterable. @param right the second iterable. @return <0 if this iterable is less than the second one, 0 if they are equal and >0 otherwise. | compare$int$int$ByteIterable$emit: (offset, myLen, right) ->
cache = @reader.getCache()
pageSize = cache.getPageSize()
alignedAddress = @address.plus$int offset
endAddress = alignedAddress.plus$int myLen
diff = endAddress.logAnd$Addr64(cache.getPageMask()).toFloat()
endAddress = endAddress.plus$int(-diff)
leftStep = alignedAddress.logAnd$Addr64(cache.getPageMask()).toFloat()
alignedAddress = alignedAddress.plus$int(-leftStep)
addrStr = Addr64.
addr64ToString$Addr64$int(alignedAddress, LogUtil.LOG_NAME_BASE)
emit = false
onGetFirstPage = (leftArray) =>
leftLen = leftArray.length
if leftArray.length <= leftStep
throw new BlockNotFoundError(alignedAddress)
rightArray = right.getBytesUnsafe()
rightLen = right.getLength()
rightStep = 0
beforeGetPage = () =>
limit = Math.min myLen, Math.min(leftLen - leftStep, rightLen)
while rightStep < limit
b1 = leftArray[leftStep++]
b2 = rightArray[rightStep++]
if (b1 != b2)
if emit
@emit 'compare', (b1 & 0xff) - (b2 & 0xff)
return undefined
else
return (b1 & 0xff) - (b2 & 0xff)
if (rightStep == rightLen || !alignedAddress.less$Addr64(endAddress))
if emit
return undefined
@emit 'compare', myLen - rightLen
else
return myLen - rightLen
alignedAddress = alignedAddress.plus$int(pageSize)
addrStr = Addr64.
addr64ToString$Addr64$int(alignedAddress, LogUtil.LOG_NAME_BASE)
cache.once 'getPage' + addrStr, (leftArray) =>
onGetPage leftArray
cycleStep = () =>
res = beforeGetPage()
if res?
return res
else
leftArray = cache.getPage$Addr64$emit(alignedAddress)
if leftArray?
cache.removeAllListeners 'getPage' + addrStr
onGetPage leftArray
else
emit = true
return undefined
onGetPage = (leftArray) =>
leftLen = leftArray.length
leftStep = 0
return cycleStep()
return cycleStep()
cache.once 'getPage' + addrStr, (leftArray) =>
onGetFirstPage leftArray
leftArray = cache.getPage$Addr64$emit alignedAddress
if leftArray?
cache.removeAllListeners 'getPage' + addrStr
onGetFirstPage leftArray
else
emit = true
return undefined |
Copy this iterable. @param offset the offset inside iterable. @return new RandomAccessByteIterable. | clone$int: (offset) ->
return RandomAccessByteIterable.
create$Addr64$Reader @address.plus$int(offset), @reader |
public int binarySearch(@NotNull final IByteIterableComparator comparator, final ByteIterable key, int low, int high, final int bytesPerLong, BinarySearchPolicy searchPolicy) { final LogCache cache = log.cache final int pageSize = log.getCachePageSize() long leftAddress = -1 byte[] leftPage = null long rightAddress = -1 byte[] rightPage = null final BinarySearchIterator it = new BinarySearchIterator(pageSize) while (low <= high) { final int mid = searchPolicy.getMedian(low, high) final long midAddress = address + (long) (mid * bytesPerLong) it.offset = (int) (midAddress % (long) pageSize) it.address = midAddress - it.offset boolean loaded = false if (it.address == leftAddress) { it.page = leftPage } else if (it.address == rightAddress) { it.page = rightPage } else | #{ |
it.page = cache.getPage(log, it.address).getBytesUnsafe() loaded = true } final int cmp final long address final byte[] page if (pageSize - it.offset < bytesPerLong) { final long nextAddress = (address = it.address) + pageSize if (rightAddress == nextAddress) { it.nextPage = rightPage } else | #{ |
it.nextPage = cache.getPage(log, nextAddress).getBytesUnsafe() loaded = true } page = it.page cmp = comparator.compare(LongBinding.entryToUnsignedLong(it.asCompound(), bytesPerLong), key) } else | #{ |
cmp = comparator.compare(LongBinding.entryToUnsignedLong(it, bytesPerLong), key) page = it.page address = it.address } if (cmp < 0) { searchPolicy.moveRight(); low = mid + 1; if (loaded) { leftAddress = it.address; leftPage = it.page; } } else if (cmp > 0) { //left > right searchPolicy.moveLeft(); high = mid - 1; if (loaded) { rightAddress = address; rightPage = page; } } else { searchPolicy.hit(); return mid; // key found } } return -(low + 1); // key not found. } | |
private static class BinarySearchIterator implements ByteIterator { private byte[] page; private byte[] nextPage; private int offset; private long address; private final int pageSize; private BinarySearchIterator(int pageSize) { this.pageSize = pageSize; } private CompoundByteIteratorBase asCompound() { return new CompoundByteIteratorBase(this) { @Override protected ByteIterator nextIterator() { page = nextPage; address += pageSize; offset = 0; return BinarySearchIterator.this; } }; } @Override public boolean hasNext() { return offset < pageSize; } @Override public byte next() { return page[offset++]; } @Override public long skip(long length) { throw new UnsupportedOperationException(); } } | exports.RandomAccessByteIterable = RandomAccessByteIterable
|