// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef PARQUET_UTIL_MEMORY_H
#define PARQUET_UTIL_MEMORY_H

#include <atomic>
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <memory>
#include <string>
#include <vector>

#include "arrow/buffer.h"
#include "arrow/io/interfaces.h"
#include "arrow/io/memory.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "arrow/util/compression.h"

#include "parquet/exception.h"
#include "parquet/types.h"
#include "parquet/util/macros.h"
#include "parquet/util/visibility.h"

namespace parquet {

/// Create the Arrow codec that corresponds to a Parquet compression type.
///
/// \param codec the Parquet compression type to translate
/// \return the codec filled in by ::arrow::Codec::Create, or an empty pointer when
///   `codec` is UNCOMPRESSED or is not one of the values listed in the switch
///
/// The status returned by ::arrow::Codec::Create is passed through
/// PARQUET_THROW_NOT_OK.
static inline std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::type codec) {
  // Step 1: translate the Parquet enum value into its Arrow counterpart.
  ::arrow::Compression::type arrow_kind;
  switch (codec) {
    case Compression::SNAPPY:
      arrow_kind = ::arrow::Compression::SNAPPY;
      break;
    case Compression::GZIP:
      arrow_kind = ::arrow::Compression::GZIP;
      break;
    case Compression::LZO:
      arrow_kind = ::arrow::Compression::LZO;
      break;
    case Compression::BROTLI:
      arrow_kind = ::arrow::Compression::BROTLI;
      break;
    case Compression::LZ4:
      arrow_kind = ::arrow::Compression::LZ4;
      break;
    case Compression::ZSTD:
      arrow_kind = ::arrow::Compression::ZSTD;
      break;
    case Compression::UNCOMPRESSED:
    default:
      // No codec is created for these values; the caller receives an empty pointer.
      return std::unique_ptr<::arrow::Codec>();
  }

  // Step 2: a single creation call shared by every compressed type.
  std::unique_ptr<::arrow::Codec> created;
  PARQUET_THROW_NOT_OK(::arrow::Codec::Create(arrow_kind, &created));
  return created;
}

/// Default value of InMemoryOutputStream's `initial_capacity` constructor argument.
/// NOTE(review): the unit is presumably bytes -- confirm against the implementation.
static constexpr int64_t kInMemoryDefaultCapacity = 1024;

// Aliases so that code in the parquet namespace can name the Arrow buffer types
// without the ::arrow:: qualification.
using Buffer = ::arrow::Buffer;
using MutableBuffer = ::arrow::MutableBuffer;
using ResizableBuffer = ::arrow::ResizableBuffer;
using PoolBuffer = ::arrow::PoolBuffer;

/// A typed, non-copyable array container that owns a PoolBuffer (buffer_) and exposes
/// a raw element pointer (data_).
///
/// Only operator[] and data() are defined in this header; the constructor, Resize,
/// Reserve, Assign and Swap are declared here and defined elsewhere, so their exact
/// behavior is not visible from this file.
template <class T>
class PARQUET_EXPORT Vector {
 public:
  /// \param size requested element count (interpretation is up to the out-of-line
  ///   definition)
  /// \param pool memory pool handed to the implementation
  explicit Vector(int64_t size, ::arrow::MemoryPool* pool);
  void Resize(int64_t new_size);
  void Reserve(int64_t new_capacity);
  void Assign(int64_t size, const T val);
  void Swap(Vector<T>& v);
  /// Unchecked element access: indexes data_ directly with no bounds test.
  /// Note that this is const-qualified yet returns a mutable reference, so elements
  /// can be modified through a const Vector.
  inline T& operator[](int64_t i) const { return data_[i]; }

  /// Read-only pointer to the first element.
  const T* data() const { return data_; }

 private:
  std::unique_ptr<PoolBuffer> buffer_;
  int64_t size_;
  int64_t capacity_;
  // NOTE(review): presumably points into buffer_'s storage -- confirm in the
  // implementation file.
  T* data_;

  // Expected to disable copy construction and copy assignment (macro comes from the
  // project's macros header).
  DISALLOW_COPY_AND_ASSIGN(Vector);
};

/// A ChunkedAllocator maintains a list of memory chunks from which it
/// allocates memory in response to Allocate() calls; Chunks stay around for
/// the lifetime of the allocator or until they are passed on to another
/// allocator.
//
/// An Allocate() call will attempt to allocate memory from the chunk that was most
/// recently added; if that chunk doesn't have enough memory to satisfy the
/// allocation request, the free chunks are searched for one that is big enough;
/// otherwise, a new chunk is added to the list.
/// The current_chunk_idx_ always points to the last chunk with allocated memory.
/// In order to keep allocation overhead low, chunk sizes double with each new one
/// added, until they hit a maximum size.
//
///     Example:
///     ChunkedAllocator* p = new ChunkedAllocator();
///     for (int i = 0; i < 1024; ++i) {
/// returns 8-byte aligned memory (effectively 24 bytes):
///       .. = p->Allocate(17);
///     }
/// at this point, 17K have been handed out in response to Allocate() calls and
/// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
/// We track total and peak allocated bytes. At this point they would be the same:
/// 28K bytes. A call to Clear() will return the allocated memory, so
/// total_allocated_bytes_ becomes 0 while peak_allocated_bytes_ remains at 28K.
///     p->Clear();
/// the entire 1st chunk is returned:
///     .. = p->Allocate(4 * 1024);
/// 4K of the 2nd chunk are returned:
///     .. = p->Allocate(4 * 1024);
/// a new 20K chunk is created
///     .. = p->Allocate(20 * 1024);
//
///      ChunkedAllocator* p2 = new ChunkedAllocator();
/// the new ChunkedAllocator receives all chunks containing data from p
///      p2->AcquireData(p, false);
/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
/// remains unchanged.
/// The one remaining (empty) chunk is released:
///    delete p;

/// Allocator that hands out memory from a list of chunks (chunks_) and holds a
/// ::arrow::MemoryPool* (pool_). All non-trivial members are defined out of line.
class PARQUET_EXPORT ChunkedAllocator {
 public:
  /// \param pool memory pool to use; defaults to ::arrow::default_memory_pool()
  explicit ChunkedAllocator(::arrow::MemoryPool* pool = ::arrow::default_memory_pool());

  /// Frees all chunks of memory and subtracts the total allocated bytes
  /// from the registered limits.
  ~ChunkedAllocator();

  /// Allocates 8-byte aligned section of memory of 'size' bytes at the end
  /// of the current chunk. Creates a new chunk if there aren't any chunks
  /// with enough capacity.
  uint8_t* Allocate(int size);

  /// Returns 'byte_size' to the current chunk back to the mem pool. This can
  /// only be used to return either all or part of the previous allocation returned
  /// by Allocate().
  void ReturnPartialAllocation(int byte_size);

  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
  void Clear();

  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
  /// each mem pool
  void FreeAll();

  /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
  /// to its last allocated chunk that contains data.
  /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
  /// NOTE(review): no GetCurrentOffset() is declared in this class; the sentence above
  /// may be stale (the only offset helper visible here is the private GetFreeOffset()).
  void AcquireData(ChunkedAllocator* src, bool keep_current);

  /// Returns a human-readable description of the allocator (format defined by the
  /// out-of-line implementation).
  std::string DebugString();

  /// Accessors for the byte counters documented on the corresponding members below.
  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }

  /// Return sum of chunk_sizes_.
  /// NOTE(review): there is no chunk_sizes_ member; presumably this sums
  /// ChunkInfo::size over chunks_ -- confirm in the implementation.
  int64_t GetTotalChunkSizes() const;

 private:
  friend class ChunkedAllocatorTest;
  /// Chunk size constant of 4 KiB; per the class overview, chunk sizes start at 4K and
  /// double with each new chunk.
  static const int INITIAL_CHUNK_SIZE = 4 * 1024;

  /// The maximum size of chunk that should be allocated. Allocations larger than this
  /// size will get their own individual chunk.
  static const int MAX_CHUNK_SIZE = 1024 * 1024;

  /// Bookkeeping record for a single chunk.
  struct ChunkInfo {
    uint8_t* data;  // Owned by the ChunkInfo.
    int64_t size;   // in bytes

    /// bytes allocated via Allocate() in this chunk
    int64_t allocated_bytes;

    /// Defined out of line; takes the chunk size and the buffer for the chunk.
    explicit ChunkInfo(int64_t size, uint8_t* buf);

    /// Creates an empty record: null data pointer, zero size, nothing allocated.
    ChunkInfo() : data(nullptr), size(0), allocated_bytes(0) {}
  };

  /// chunk from which we served the last Allocate() call;
  /// always points to the last chunk that contains allocated data;
  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
  /// -1 if no chunks present
  int current_chunk_idx_;

  /// The size of the next chunk to allocate.
  int64_t next_chunk_size_;

  /// sum of allocated_bytes_
  /// NOTE(review): presumably the sum of ChunkInfo::allocated_bytes over chunks_.
  int64_t total_allocated_bytes_;

  /// Maximum number of bytes allocated from this pool at one time.
  int64_t peak_allocated_bytes_;

  /// sum of all bytes allocated in chunks_
  int64_t total_reserved_bytes_;

  /// All chunks known to this allocator; indexed by current_chunk_idx_.
  std::vector<ChunkInfo> chunks_;

  /// Memory pool supplied at construction.
  ::arrow::MemoryPool* pool_;

  /// Find or allocate a chunk with at least min_size spare capacity and update
  /// current_chunk_idx_. Also updates chunks_, chunk_sizes_ and allocated_bytes_
  /// if a new chunk needs to be created.
  /// NOTE(review): chunk_sizes_ and allocated_bytes_ are not members declared in this
  /// class; they presumably correspond to ChunkInfo::size and
  /// ChunkInfo::allocated_bytes. The meaning of the bool result is not visible here.
  bool FindChunk(int64_t min_size);

  /// Check integrity of the supporting data structures; always returns true but DCHECKs
  /// all invariants.
  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
  bool CheckIntegrity(bool current_chunk_empty);

  /// Return offset to unoccupied space in current chunk: 0 when there is no current
  /// chunk (current_chunk_idx_ == -1), otherwise the current chunk's allocated_bytes
  /// narrowed to int.
  int GetFreeOffset() const {
    if (current_chunk_idx_ == -1) return 0;
    return static_cast<int>(chunks_[current_chunk_idx_].allocated_bytes);
  }

  /// Private templated counterpart of the public Allocate(int); defined out of line.
  /// NOTE(review): the effect of CHECK_LIMIT_FIRST is not visible from this header.
  template <bool CHECK_LIMIT_FIRST>
  uint8_t* Allocate(int size);
};

// File input and output interfaces that translate arrow::Status to exceptions

/// Abstract base shared by the file-like interfaces in this header
/// (RandomAccessSource, OutputStream and ArrowFileMethods all derive from it
/// virtually). It declares only closing and position reporting.
class PARQUET_EXPORT FileInterface {
 public:
  // Virtual so that implementations can be destroyed through a FileInterface pointer.
  virtual ~FileInterface() = default;

  // Close the file
  virtual void Close() = 0;

  // Return the current position in the file relative to the start
  virtual int64_t Tell() = 0;
};

/// Abstract readable source offering both sequential reads (Read) and reads at an
/// explicit position (ReadAt).
///
/// It is the responsibility of implementations to mind threadsafety of shared
/// resources
class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface {
 public:
  virtual ~RandomAccessSource() = default;

  /// Size of the source. NOTE(review): presumably the total size in bytes -- the unit
  /// is not stated in this header.
  virtual int64_t Size() const = 0;

  /// Reads up to nbytes into the caller-provided `out`.
  /// Returns bytes read
  virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;

  /// Reads up to nbytes and returns them as a Buffer.
  virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;

  /// Like Read(nbytes) but takes an explicit `position` argument.
  virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0;

  /// Like Read(nbytes, out) but takes an explicit `position` argument.
  /// Returns bytes read
  virtual int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) = 0;
};

/// Abstract writable sink: adds Write() to the Close()/Tell() pair inherited from
/// FileInterface.
class PARQUET_EXPORT OutputStream : virtual public FileInterface {
 public:
  virtual ~OutputStream() = default;

  // Copy bytes into the output stream
  // `data` points at the bytes to write and `length` is their count.
  virtual void Write(const uint8_t* data, int64_t length) = 0;
};

/// Shared Close()/Tell() implementation for the Arrow-backed adapters in this header
/// (ArrowInputFile and ArrowOutputStream). Subclasses supply the underlying Arrow
/// handle through file_interface().
class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface {
 public:
  // No-op. Closing the file is the responsibility of the owner of the handle
  void Close() override;

  // Defined out of line. NOTE(review): presumably forwards to file_interface() --
  // confirm in the implementation file.
  int64_t Tell() override;

 protected:
  // Hook returning the Arrow file handle that the methods above operate on.
  virtual ::arrow::io::FileInterface* file_interface() = 0;
};

/// RandomAccessSource implementation that wraps an
/// ::arrow::io::ReadableFileInterface handle.
///
/// This interface depends on the threadsafety of the underlying Arrow file interface
class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource {
 public:
  /// \param file the Arrow file handle to wrap
  explicit ArrowInputFile(
      const std::shared_ptr<::arrow::io::ReadableFileInterface>& file);

  int64_t Size() const override;

  // Returns bytes read
  int64_t Read(int64_t nbytes, uint8_t* out) override;

  std::shared_ptr<Buffer> Read(int64_t nbytes) override;

  std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override;

  /// Returns bytes read
  int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) override;

  /// Returns the wrapped Arrow file handle (a copy of the shared_ptr).
  std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; }

  // Diamond inheritance: both ArrowFileMethods and RandomAccessSource derive virtually
  // from FileInterface, so name ArrowFileMethods' Close()/Tell() explicitly here.
  using ArrowFileMethods::Close;
  using ArrowFileMethods::Tell;

 private:
  // Supplies the handle required by ArrowFileMethods.
  ::arrow::io::FileInterface* file_interface() override;
  std::shared_ptr<::arrow::io::ReadableFileInterface> file_;
};

/// OutputStream implementation that wraps an ::arrow::io::OutputStream handle.
class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputStream {
 public:
  /// \param file the Arrow output stream to wrap
  // The parameter is taken by (const) value; it is left that way so the declaration
  // keeps matching the out-of-line definition.
  explicit ArrowOutputStream(const std::shared_ptr<::arrow::io::OutputStream> file);

  // Copy bytes into the output stream
  void Write(const uint8_t* data, int64_t length) override;

  /// Returns the wrapped Arrow output stream (a copy of the shared_ptr).
  /// Const-qualified so it can also be called on a const ArrowOutputStream, in line
  /// with ArrowInputFile::file().
  std::shared_ptr<::arrow::io::OutputStream> file() const { return file_; }

  // Diamond inheritance: both ArrowFileMethods and OutputStream derive virtually from
  // FileInterface, so name ArrowFileMethods' Close()/Tell() explicitly here.
  using ArrowFileMethods::Close;
  using ArrowFileMethods::Tell;

 private:
  // Supplies the handle required by ArrowFileMethods.
  ::arrow::io::FileInterface* file_interface() override;
  std::shared_ptr<::arrow::io::OutputStream> file_;
};

/// OutputStream implementation that keeps everything written to it in an in-memory
/// ResizableBuffer (buffer_).
class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
 public:
  /// \param pool memory pool to use; defaults to ::arrow::default_memory_pool()
  /// \param initial_capacity starting capacity; defaults to kInMemoryDefaultCapacity
  explicit InMemoryOutputStream(
      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
      int64_t initial_capacity = kInMemoryDefaultCapacity);

  ~InMemoryOutputStream() override;

  // The overriding members below are marked `override` (rather than a bare `virtual`)
  // so that any drift from the FileInterface/OutputStream signatures is a compile
  // error, matching the style used by the Arrow adapter classes in this header.

  // Close is currently a no-op with the in-memory stream
  void Close() override {}

  int64_t Tell() override;

  void Write(const uint8_t* data, int64_t length) override;

  // Clears the stream
  // Only the logical size is reset; buffer_ and capacity_ are left untouched.
  void Clear() { size_ = 0; }

  // Get pointer to the underlying buffer
  const Buffer& GetBufferRef() const { return *buffer_; }

  // Return complete stream as Buffer
  std::shared_ptr<Buffer> GetBuffer();

 private:
  // Mutable pointer to the current write position in the stream
  uint8_t* Head();

  std::shared_ptr<ResizableBuffer> buffer_;
  int64_t size_;
  int64_t capacity_;

  // Expected to disable copy construction and copy assignment (macro comes from the
  // project's macros header).
  DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
};

// ----------------------------------------------------------------------
// Streaming input interfaces

// Interface for the column reader to get the bytes. The interface is a stream
// interface, meaning the bytes are consumed in order and once a byte is read, it
// does not need to be read again.
class InputStream {
 public:
  // Returns the next 'num_to_peek' without advancing the current position.
  // *num_bytes will contain the number of bytes returned which can only be
  // less than num_to_peek at end of stream cases.
  // Since the position is not advanced, calls to this function are idempotent.
  // The buffer returned to the caller is still owned by the input stream and must
  // stay valid until the next call to Peek() or Read().
  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;

  // Identical to Peek(), except the current position in the stream is advanced by
  // *num_bytes.
  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;

  // Advance the stream without reading
  virtual void Advance(int64_t num_bytes) = 0;

  // Virtual so that implementations can be destroyed through an InputStream pointer.
  virtual ~InputStream() {}

 protected:
  // Protected: the interface is only meant to be constructed by subclasses.
  InputStream() {}
};

// Implementation of an InputStream when all the bytes are in memory.
class PARQUET_EXPORT InMemoryInputStream : public InputStream {
 public:
  // Builds the stream from a source plus a [start, end) style pair of positions.
  // NOTE(review): whether `end` is inclusive or exclusive is not visible in this
  // header -- confirm in the implementation file.
  InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t end);
  // Builds the stream over an existing buffer.
  explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);

  // InputStream interface. Marked `override` (rather than a bare `virtual`) so that
  // any drift from the InputStream signatures is a compile error.
  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;

  void Advance(int64_t num_bytes) override;

 private:
  std::shared_ptr<Buffer> buffer_;
  int64_t len_;
  int64_t offset_;
};

// Implementation of an InputStream when only some of the bytes are in memory.
class PARQUET_EXPORT BufferedInputStream : public InputStream {
 public:
  // `pool` and `buffer_size` configure the internal buffer; `source`, `start` and
  // `end` identify the region of the source to stream.
  // NOTE(review): `source` is stored as a raw pointer (source_), so it presumably has
  // to outlive this stream -- confirm with the callers.
  BufferedInputStream(::arrow::MemoryPool* pool, int64_t buffer_size,
                      RandomAccessSource* source, int64_t start, int64_t end);

  // InputStream interface. Marked `override` (rather than a bare `virtual`) so that
  // any drift from the InputStream signatures is a compile error.
  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;

  void Advance(int64_t num_bytes) override;

 private:
  std::shared_ptr<PoolBuffer> buffer_;
  RandomAccessSource* source_;
  int64_t stream_offset_;
  int64_t stream_end_;
  int64_t buffer_offset_;
  int64_t buffer_size_;
};

/// Returns a shared PoolBuffer for the given pool; `size` defaults to 0.
/// Defined out of line. NOTE(review): presumably the buffer is sized to `size` bytes
/// and allocation failures surface as exceptions -- confirm in the implementation.
std::shared_ptr<PoolBuffer> PARQUET_EXPORT AllocateBuffer(::arrow::MemoryPool* pool,
                                                          int64_t size = 0);

/// Returns a uniquely-owned PoolBuffer for the given pool; `size` defaults to 0.
/// Defined out of line. NOTE(review): presumably the buffer is sized to `size` bytes
/// and allocation failures surface as exceptions -- confirm in the implementation.
std::unique_ptr<PoolBuffer> PARQUET_EXPORT AllocateUniqueBuffer(::arrow::MemoryPool* pool,
                                                                int64_t size = 0);

}  // namespace parquet

#endif  // PARQUET_UTIL_MEMORY_H
