Jump To …

RandomAccessByteIterable.coffee

Addr64 = (require './Addr64.coffee').Addr64
LogUtil = (require './LogUtil.coffee').LogUtil
ByteIterable = (require '../database/ByteIterable.coffee').ByteIterable
CompoundByteIteratorBase =
  (require './iterate/CompoundByteIteratorBase.coffee').CompoundByteIteratorBase
ArrayByteIterable =
  (require '../database/impl/iterate/ArrayByteIterable.coffee').
  ArrayByteIterable
UnsupportedOperationError =
  (require '../errors/UnsupportedOperationError.coffee').
  UnsupportedOperationError
BlockNotFoundError =
  (require '../errors/BlockNotFoundError.coffee').BlockNotFoundError

This iterable is used for access to data stored in loggables when reading from log. This iterable is emitter.

class RandomAccessByteIterable extends ByteIterable

@private

  reader: undefined

@private

  address: undefined

@private

  class CompoundByteIterator extends CompoundByteIteratorBase

@private

    currentAddress: undefined

@private

    obj: undefined

    @create$Addr64: (address, obj, o) ->
      if !o? then o = new CompoundByteIterator
      o.currentAddress = address
      o.obj = obj
      return o

    copy: () ->
      return CompoundByteIterator.create$Addr64 @currentAddress

TODO return won't work Get next iterator. As the data we want to get can be stored on disk, this method emits 'nextIterator' or returns iterator.

    nextIterator$emit: () ->
      cache = @obj.reader.getCache()
      alignment = @currentAddress.logAnd$Addr64(cache.getPageMask()).toFloat()
      alignedAddress = @currentAddress.plus$int(-alignment)
      addrString = Addr64.
      addr64ToString$Addr64$int(alignedAddress, LogUtil.LOG_NAME_BASE)
      emit = true
      onGetPage = (buffer) =>
        readBytes = buffer.length
        if readBytes <= alignment
          if emit
            @emit 'nextIterator', null
            return undefined
          else
            return null
        @currentAddress = @currentAddress.plus$int(readBytes - alignment)
      cache.on 'getPage' + addrString, (buffer) =>
        onGetPage buffer
        @emit 'nextIterator', (ArrayByteIterable.create$Buffer buffer).
        iterator(alignment)
      buffer = cache.getPage$Addr64$emit @currentAddress
      if buffer?
        emit = false
        cache.removeAllListeners 'getPage' + addrString
        if (onGetPage buffer) == null
          return
        else
          return (ArrayByteIterable.create$Buffer buffer).iterator$int(alignment)
      else
        return undefined

  class CompoundByteIteratorAutoAdvance extends CompoundByteIterator
    @create: (obj, o) ->
      if !o? then o = new CompoundByteIterator
      o = CompoundByteIterator.create obj.address, obj, o
      return o

    skip$int: (length) ->
      result = super length
      @obj.address = @obj.address.plus$int length
      return result

Emits 'next' or returns next element (see CompoundByteIteratorBase).

    next$emit: () ->
      @obj.address = @obj.address.plus$int 1
      result = super
      return result

    getIterable: () ->
      return @obj

    onFail$String: (message) ->
      super message + ", address is: " + @obj.address

Constructor.

@param address. @param reader.

  @create$Addr64$Reader: (address, reader, o) ->
    if !o? then o = new RandomAccessByteIterable
    o.address = address
    o.reader = reader
    return o

  @EMPTY = RandomAccessByteIterable.create$Addr64$Reader 0, null
  @EMPTY.iterator = () ->
      return ByteIterable.EMPTY_ITERATOR
  @EMPTY.iterator$int = (offset) ->
      return ByteIterable.EMPTY_ITERATOR
  @EMPTY.advance$int = (offset) ->

  getAddress: () ->
    return @address

Increase address.

@param offset

  advance$int: (offset) ->
    @address = @address.plus$int offset

  iteratorAutoAdvance: () ->
    return CompoundByteIteratorAutoAdvance.create this

  iterator: () ->
    return CompoundByteIterator.create$Addr64 @address, this

  iterator$int: (offset) ->
    return CompoundByteIterator.create$Addr64 @address.plus$int(offset), this

  getBytesUnsafe: () ->
    throw new UnsupportedOperationError()

  getLength: () ->
    throw new UnsupportedOperationError()

  compareTo$ByteIterable$emit: (right) ->
    throw new UnsupportedOperationError()

Compares this iterable with another one. The second iterable can't be RandomAccessByteIterable, because it must be completely in memory when comparing. This method can emit 'compare'.

@param offset offset inside this iterable. @param myLen the length of this iterable. @param right the second iterable. @return <0 if this iterable is less than the second one, 0 if they are equal and >0 otherwise.

  compare$int$int$ByteIterable$emit: (offset, myLen, right) ->
    cache = @reader.getCache()
    pageSize = cache.getPageSize()
    alignedAddress = @address.plus$int offset
    endAddress = alignedAddress.plus$int myLen
    diff = endAddress.logAnd$Addr64(cache.getPageMask()).toFloat()
    endAddress = endAddress.plus$int(-diff)
    leftStep = alignedAddress.logAnd$Addr64(cache.getPageMask()).toFloat()
    alignedAddress = alignedAddress.plus$int(-leftStep)
    addrStr = Addr64.
    addr64ToString$Addr64$int(alignedAddress, LogUtil.LOG_NAME_BASE)

    emit = false
    onGetFirstPage = (leftArray) =>
      leftLen = leftArray.length
      if leftArray.length <= leftStep
        throw new BlockNotFoundError(alignedAddress)
      rightArray = right.getBytesUnsafe()
      rightLen = right.getLength()
      rightStep = 0

      beforeGetPage = () =>
        limit = Math.min myLen, Math.min(leftLen - leftStep, rightLen)
        while rightStep < limit
          b1 = leftArray[leftStep++]
          b2 = rightArray[rightStep++]
          if (b1 != b2)
            if emit
              @emit 'compare', (b1 & 0xff) - (b2 & 0xff)
              return undefined
            else
              return (b1 & 0xff) - (b2 & 0xff)
        if (rightStep == rightLen || !alignedAddress.less$Addr64(endAddress))
          if emit
            return undefined
            @emit 'compare', myLen - rightLen
          else
            return myLen - rightLen
        alignedAddress = alignedAddress.plus$int(pageSize)
        addrStr = Addr64.
        addr64ToString$Addr64$int(alignedAddress, LogUtil.LOG_NAME_BASE)
        cache.once 'getPage' + addrStr, (leftArray) =>
          onGetPage leftArray

      cycleStep = () =>
        res = beforeGetPage()
        if res?
          return res
        else
          leftArray = cache.getPage$Addr64$emit(alignedAddress)
          if leftArray?
            cache.removeAllListeners 'getPage' + addrStr
            onGetPage leftArray
          else
            emit = true
            return undefined

      onGetPage = (leftArray) =>
        leftLen = leftArray.length
        leftStep = 0
        return cycleStep()

      return cycleStep()

    cache.once 'getPage' + addrStr, (leftArray) =>
      onGetFirstPage leftArray
    leftArray = cache.getPage$Addr64$emit alignedAddress
    if leftArray?
      cache.removeAllListeners 'getPage' + addrStr
      onGetFirstPage leftArray
    else
      emit = true
      return undefined

Copy this iterable.

@param offset the offset inside iterable. @return new RandomAccessByteIterable.

  clone$int: (offset) ->
    return RandomAccessByteIterable.
    create$Addr64$Reader @address.plus$int(offset), @reader

public int binarySearch(@NotNull final IByteIterableComparator comparator, final ByteIterable key, int low, int high, final int bytesPerLong, BinarySearchPolicy searchPolicy) { final LogCache cache = log.cache final int pageSize = log.getCachePageSize() long leftAddress = -1 byte[] leftPage = null long rightAddress = -1 byte[] rightPage = null final BinarySearchIterator it = new BinarySearchIterator(pageSize) while (low <= high) { final int mid = searchPolicy.getMedian(low, high) final long midAddress = address + (long) (mid * bytesPerLong) it.offset = (int) (midAddress % (long) pageSize) it.address = midAddress - it.offset boolean loaded = false if (it.address == leftAddress) { it.page = leftPage } else if (it.address == rightAddress) { it.page = rightPage } else

  #{

it.page = cache.getPage(log, it.address).getBytesUnsafe() loaded = true } final int cmp final long address final byte[] page if (pageSize - it.offset < bytesPerLong) { final long nextAddress = (address = it.address) + pageSize if (rightAddress == nextAddress) { it.nextPage = rightPage } else

  #{

it.nextPage = cache.getPage(log, nextAddress).getBytesUnsafe() loaded = true } page = it.page cmp = comparator.compare(LongBinding.entryToUnsignedLong(it.asCompound(), bytesPerLong), key) } else

  #{

cmp = comparator.compare(LongBinding.entryToUnsignedLong(it, bytesPerLong), key) page = it.page address = it.address } if (cmp < 0) { searchPolicy.moveRight(); low = mid + 1; if (loaded) { leftAddress = it.address; leftPage = it.page; } } else if (cmp > 0) { //left > right searchPolicy.moveLeft(); high = mid - 1; if (loaded) { rightAddress = address; rightPage = page; } } else { searchPolicy.hit(); return mid; // key found } } return -(low + 1); // key not found. }

private static class BinarySearchIterator implements ByteIterator { private byte[] page; private byte[] nextPage; private int offset; private long address;

private final int pageSize;

private BinarySearchIterator(int pageSize) { this.pageSize = pageSize; }

private CompoundByteIteratorBase asCompound() { return new CompoundByteIteratorBase(this) { @Override protected ByteIterator nextIterator() { page = nextPage; address += pageSize; offset = 0; return BinarySearchIterator.this; } }; }

@Override public boolean hasNext() { return offset < pageSize; }

@Override public byte next() { return page[offset++]; }

@Override public long skip(long length) { throw new UnsupportedOperationException(); } }

exports.RandomAccessByteIterable = RandomAccessByteIterable