// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "parquet/arrow/writer.h"

#include <algorithm>
#include <string>
#include <vector>

#include "arrow/api.h"
#include "arrow/compute/api.h"
#include "arrow/util/bit-util.h"
#include "arrow/visitor_inline.h"

#include "parquet/arrow/schema.h"
#include "parquet/util/logging.h"

using arrow::Array;
using arrow::BinaryArray;
using arrow::BooleanArray;
using arrow::ChunkedArray;
using arrow::Decimal128Array;
using arrow::Field;
using arrow::FixedSizeBinaryArray;
using arrow::Int16Array;
using arrow::Int16Builder;
using arrow::ListArray;
using arrow::MemoryPool;
using arrow::NumericArray;
using arrow::PoolBuffer;
using arrow::PrimitiveArray;
using arrow::Status;
using arrow::Table;
using arrow::TimeUnit;

using arrow::compute::Cast;
using arrow::compute::CastOptions;
using arrow::compute::FunctionContext;

using parquet::ParquetFileWriter;
using parquet::ParquetVersion;
using parquet::schema::GroupNode;

namespace parquet {
namespace arrow {

namespace BitUtil = ::arrow::BitUtil;

// Returns the shared default ArrowWriterProperties instance. The instance is
// built once from a default-constructed Builder on first call and the same
// pointer is handed out on every later call.
std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
  static std::shared_ptr<ArrowWriterProperties> shared_defaults(
      ArrowWriterProperties::Builder().build());
  return shared_defaults;
}

namespace {

// Builds definition and repetition levels for an Arrow array that is either
// flat or a chain of single-child lists ending in a flat array.
//
// Usage is driven through GenerateLevels(): it first walks the array from the
// outermost level to the leaf (via the Visit overloads) recording, per nesting
// level, the validity bitmap, null count, array offset and list offsets; it
// then walks the field to record nullability per level; finally it emits the
// levels.
//
// NOTE(review): the per-level vectors are only ever appended to in this class,
// so an instance looks intended for a single GenerateLevels() call — confirm
// against callers.
class LevelBuilder {
 public:
  // Both level builders produce int16 values allocated from `pool`.
  explicit LevelBuilder(MemoryPool* pool)
      : def_levels_(::arrow::int16(), pool), rep_levels_(::arrow::int16(), pool) {}

  // Forwards `array` to VisitArrayInline with this builder as the visitor
  // (defined out of line).
  Status VisitInline(const Array& array);

  // Leaf case: any array type derived from ::arrow::FlatArray. Records the
  // array offset, validity bitmap and null count for this level and keeps a
  // new array of the same concrete type, built from array.data(), as the
  // values array.
  template <typename T>
  typename std::enable_if<std::is_base_of<::arrow::FlatArray, T>::value, Status>::type
  Visit(const T& array) {
    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
    valid_bitmaps_.push_back(array.null_bitmap_data());
    null_counts_.push_back(array.null_count());
    values_array_ = std::make_shared<T>(array.data());
    return Status::OK();
  }

  // List case: records the same per-level information as the flat case plus
  // the raw value offsets, then descends into the child values array.
  Status Visit(const ListArray& array) {
    array_offsets_.push_back(static_cast<int32_t>(array.offset()));
    valid_bitmaps_.push_back(array.null_bitmap_data());
    null_counts_.push_back(array.null_count());
    offsets_.push_back(array.raw_value_offsets());

    // Translate the [min, max) index range of this level into the matching
    // range of the child array by looking both ends up in the value offsets.
    min_offset_idx_ = array.value_offset(min_offset_idx_);
    max_offset_idx_ = array.value_offset(max_offset_idx_);

    return VisitInline(*array.values());
  }

// Generates a Visit overload for ::arrow::<ArrowTypePrefix>Array that always
// returns Status::NotImplemented.
#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                             \
  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) {             \
    return Status::NotImplemented("Level generation for " #ArrowTypePrefix \
                                  " not supported yet");                   \
  }

  NOT_IMPLEMENTED_VISIT(Struct)
  NOT_IMPLEMENTED_VISIT(Union)
  NOT_IMPLEMENTED_VISIT(Dictionary)

  // Computes the levels for `array`.
  //
  // array:              array to generate levels for
  // field:              field describing `array`; its nullability and that of
  //                     each successive single child is recorded per level
  // values_offset:      out, start index of the leaf values that are covered
  // num_values:         out, number of leaf values covered (max - min index)
  // num_levels:         out, number of levels generated
  // def_levels_scratch: buffer that is resized and filled with the definition
  //                     levels in the non-nested, nullable case
  // def_levels_out:     out, definition levels; nullptr for a non-nested,
  //                     non-nullable field
  // rep_levels_out:     out, repetition levels; nullptr for a non-nested field
  // values_array:       out, the leaf array recorded while visiting
  //
  // Returns NotImplemented if any type along the field chain has more than
  // one child, or whatever error visiting / appending produced.
  Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
                        int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
                        const std::shared_ptr<PoolBuffer>& def_levels_scratch,
                        std::shared_ptr<Buffer>* def_levels_out,
                        std::shared_ptr<Buffer>* rep_levels_out,
                        std::shared_ptr<Array>* values_array) {
    // Work downwards to extract bitmaps and offsets
    min_offset_idx_ = 0;
    max_offset_idx_ = array.length();
    RETURN_NOT_OK(VisitInline(array));
    *num_values = max_offset_idx_ - min_offset_idx_;
    *values_offset = min_offset_idx_;
    *values_array = values_array_;

    // Walk downwards to extract nullability
    std::shared_ptr<Field> current_field = field;
    nullable_.push_back(current_field->nullable());
    while (current_field->type()->num_children() > 0) {
      if (current_field->type()->num_children() > 1) {
        return Status::NotImplemented(
            "Fields with more than one child are not supported.");
      } else {
        current_field = current_field->type()->child(0);
      }
      nullable_.push_back(current_field->nullable());
    }

    // Generate the levels.
    if (nullable_.size() == 1) {
      // We have a PrimitiveArray
      *rep_levels_out = nullptr;
      if (nullable_[0]) {
        RETURN_NOT_OK(
            def_levels_scratch->Resize(array.length() * sizeof(int16_t), false));
        auto def_levels_ptr =
            reinterpret_cast<int16_t*>(def_levels_scratch->mutable_data());
        if (array.null_count() == 0) {
          // Every slot is valid: definition level 1 throughout.
          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
        } else if (array.null_count() == array.length()) {
          // Every slot is null: definition level 0 throughout.
          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 0);
        } else {
          // Mixed: derive each level from the validity bit, honouring the
          // array's offset into the bitmap.
          ::arrow::internal::BitmapReader valid_bits_reader(
              array.null_bitmap_data(), array.offset(), array.length());
          for (int i = 0; i < array.length(); i++) {
            def_levels_ptr[i] = valid_bits_reader.IsSet() ? 1 : 0;
            valid_bits_reader.Next();
          }
        }

        *def_levels_out = def_levels_scratch;
      } else {
        *def_levels_out = nullptr;
      }
      *num_levels = array.length();
    } else {
      // Nested case: the very first repetition level is 0; every later one is
      // appended by the Handle* helpers before the entry it belongs to.
      RETURN_NOT_OK(rep_levels_.Append(0));
      RETURN_NOT_OK(HandleListEntries(0, 0, 0, array.length()));

      std::shared_ptr<Array> def_levels_array;
      std::shared_ptr<Array> rep_levels_array;

      RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
      RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));

      // Hand out the value buffers of the finished int16 arrays.
      *def_levels_out = static_cast<PrimitiveArray*>(def_levels_array.get())->values();
      *rep_levels_out = static_cast<PrimitiveArray*>(rep_levels_array.get())->values();
      *num_levels = rep_levels_array->length();
    }

    return Status::OK();
  }

  // Handles the list entry at `index` of nesting level `rep_level` (which is
  // also the index into the per-level vectors). A null entry of a nullable
  // level emits `def_level`; otherwise the entry is expanded by
  // HandleNonNullList, with the definition level raised by one when the level
  // is nullable.
  Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
    if (nullable_[rep_level]) {
      if (null_counts_[rep_level] == 0 ||
          BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) {
        return HandleNonNullList(static_cast<int16_t>(def_level + 1), rep_level, index);
      } else {
        return def_levels_.Append(def_level);
      }
    } else {
      return HandleNonNullList(def_level, rep_level, index);
    }
  }

  // Expands a non-null list entry. An empty list emits a single `def_level`.
  // If a deeper list level was recorded, its entries are handled recursively;
  // otherwise the children are leaf values and one definition level is
  // emitted per child.
  Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
    int32_t inner_offset = offsets_[rep_level][index];
    int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
    int64_t recursion_level = rep_level + 1;
    if (inner_length == 0) {
      return def_levels_.Append(def_level);
    }
    if (recursion_level < static_cast<int64_t>(offsets_.size())) {
      return HandleListEntries(static_cast<int16_t>(def_level + 1),
                               static_cast<int16_t>(rep_level + 1), inner_offset,
                               inner_length);
    } else {
      // We have reached the leaf: primitive list, handle remaining nullables
      for (int64_t i = 0; i < inner_length; i++) {
        // The first child reuses the repetition level already appended by the
        // caller; every further child repeats at the leaf level.
        if (i > 0) {
          RETURN_NOT_OK(rep_levels_.Append(static_cast<int16_t>(rep_level + 1)));
        }
        if (nullable_[recursion_level] &&
            ((null_counts_[recursion_level] == 0) ||
             BitUtil::GetBit(valid_bitmaps_[recursion_level],
                             inner_offset + i + array_offsets_[recursion_level]))) {
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 2)));
        } else {
          // This is reached in two cases:
          //  * elements are nullable and this one is null
          //    (i.e. max_def_level = def_level + 2)
          //  * elements are non-nullable (i.e. max_def_level = def_level + 1)
          RETURN_NOT_OK(def_levels_.Append(static_cast<int16_t>(def_level + 1)));
        }
      }
      return Status::OK();
    }
  }

  // Handles `length` consecutive list entries of level `rep_level`, starting
  // at `offset`. Every entry after the first is preceded by a repetition
  // level of `rep_level`.
  Status HandleListEntries(int16_t def_level, int16_t rep_level, int64_t offset,
                           int64_t length) {
    for (int64_t i = 0; i < length; i++) {
      if (i > 0) {
        RETURN_NOT_OK(rep_levels_.Append(rep_level));
      }
      RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
    }
    return Status::OK();
  }

 private:
  // Accumulators for the nested case.
  Int16Builder def_levels_;
  Int16Builder rep_levels_;

  // Per-nesting-level state, outermost level first. offsets_ only has entries
  // for list levels; the other vectors also have one for the leaf.
  std::vector<int64_t> null_counts_;
  std::vector<const uint8_t*> valid_bitmaps_;
  std::vector<const int32_t*> offsets_;
  std::vector<int32_t> array_offsets_;
  std::vector<bool> nullable_;

  // Index range of the current level, narrowed down to the leaf while visiting.
  int64_t min_offset_idx_;
  int64_t max_offset_idx_;
  // Leaf array recorded by the flat Visit overload.
  std::shared_ptr<Array> values_array_;
};

// Hands `array` to VisitArrayInline with this builder acting as the visitor
// and returns the resulting status unchanged.
Status LevelBuilder::VisitInline(const Array& array) {
  LevelBuilder* const visitor = this;
  return VisitArrayInline(array, visitor);
}

// State shared by the column writers: the memory pool, the Arrow writer
// properties and two pool-backed scratch buffers.
struct ColumnWriterContext {
  // Both scratch buffers are created empty and allocate from `memory_pool`.
  ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
      : memory_pool(memory_pool),
        properties(properties),
        data_buffer(std::make_shared<PoolBuffer>(memory_pool)),
        def_levels_buffer(std::make_shared<PoolBuffer>(memory_pool)) {}

  // Resizes the data scratch buffer to hold `num_values` elements of type T
  // and stores a typed pointer to its start in `*out`. The buffer is shared,
  // so a pointer obtained earlier must not be used after another call.
  template <typename T>
  Status GetScratchData(const int64_t num_values, T** out) {
    const int64_t required_bytes = num_values * sizeof(T);
    RETURN_NOT_OK(data_buffer->Resize(required_bytes, false));
    *out = reinterpret_cast<T*>(data_buffer->mutable_data());
    return Status::OK();
  }

  MemoryPool* memory_pool;
  ArrowWriterProperties* properties;

  // Holds array data after conversion to the physical type that parquet-cpp
  // expects.
  std::shared_ptr<PoolBuffer> data_buffer;

  // Held through a shared_ptr because ownership of this buffer is shared.
  std::shared_ptr<PoolBuffer> def_levels_buffer;
};

// Follows a chain of LIST / STRUCT types down to the first type that is
// neither and stores that type's id in `*leaf_type`.
//
// Returns Status::Invalid if any LIST or STRUCT on the way does not have
// exactly one child; `*leaf_type` is left untouched in that case.
Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
  const ::arrow::DataType* current = &type;
  while (current->id() == ::arrow::Type::LIST ||
         current->id() == ::arrow::Type::STRUCT) {
    if (current->num_children() != 1) {
      return Status::Invalid("Nested column branch had multiple children");
    }
    // The child type stays alive through its parent, so a raw pointer is
    // enough while descending.
    current = current->child(0)->type().get();
  }
  *leaf_type = current->id();
  return Status::OK();
}

// Writes Arrow arrays into a single Parquet column through a ColumnWriter.
//
// The writer does not own `ctx` or `column_writer`; both must outlive it.
class ArrowColumnWriter {
 public:
  ArrowColumnWriter(ColumnWriterContext* ctx, ColumnWriter* column_writer,
                    const std::shared_ptr<Field>& field)
      : ctx_(ctx), writer_(column_writer), field_(field) {}

  // Writes one contiguous array to the column.
  Status Write(const Array& data);

  // Writes `size` values of `data`, starting at logical position `offset`,
  // chunk by chunk.
  //
  // Returns Status::Invalid if `offset` lies at or past the end of a non-empty
  // `data`, or if `size` asks for more values than remain after `offset`.
  // Writing zero values from an empty chunked array succeeds without touching
  // the column.
  Status Write(const ChunkedArray& data, int64_t offset, const int64_t size) {
    // Nothing to write from an empty column. Without this guard the
    // past-the-end check below rejects the request because 0 >= 0.
    if (size == 0 && data.length() == 0) {
      return Status::OK();
    }

    // Locate the chunk that contains `offset`, and the position within it.
    int64_t absolute_position = 0;
    int chunk_index = 0;
    int64_t chunk_offset = 0;
    while (chunk_index < data.num_chunks() && absolute_position < offset) {
      const int64_t chunk_length = data.chunk(chunk_index)->length();
      if (absolute_position + chunk_length > offset) {
        // Relative offset into the chunk to reach the desired start offset for
        // writing
        chunk_offset = offset - absolute_position;
        break;
      } else {
        ++chunk_index;
        absolute_position += chunk_length;
      }
    }

    if (absolute_position >= data.length()) {
      return Status::Invalid("Cannot write data at offset past end of chunked array");
    }

    int64_t values_written = 0;
    while (values_written < size) {
      // Guard against `size` exceeding the values available after `offset`;
      // indexing past the last chunk would be undefined behaviour.
      if (chunk_index >= data.num_chunks()) {
        return Status::Invalid("Cannot write more values than remain in chunked array");
      }

      const Array& chunk = *data.chunk(chunk_index);
      const int64_t available_values = chunk.length() - chunk_offset;
      const int64_t chunk_write_size = std::min(size - values_written, available_values);

      // The chunk offset here will be 0 except for possibly the first chunk
      // because of the advancing logic above
      std::shared_ptr<Array> array_to_write = chunk.Slice(chunk_offset, chunk_write_size);
      RETURN_NOT_OK(Write(*array_to_write));

      // The chunk has been consumed completely: continue at the start of the
      // next one.
      if (chunk_write_size == available_values) {
        chunk_offset = 0;
        ++chunk_index;
      }
      values_written += chunk_write_size;
    }

    return Status::OK();
  }

  // Closes the underlying column writer, converting a thrown Parquet
  // exception into a Status.
  Status Close() {
    PARQUET_CATCH_NOT_OK(writer_->Close());
    return Status::OK();
  }

 private:
  // Writes `data` as ParquetType values, choosing between the nullable and
  // non-nullable batch paths.
  template <typename ParquetType, typename ArrowType>
  Status TypedWriteBatch(const Array& data, int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels);

  // Writes a timestamp array, picking the INT96, coercing or direct INT64 path
  // from the writer properties and the array's unit.
  Status WriteTimestamps(const Array& data, int64_t num_levels, const int16_t* def_levels,
                         const int16_t* rep_levels);

  // Writes a timestamp array after rescaling its values to the target unit.
  Status WriteTimestampsCoerce(const Array& data, int64_t num_levels,
                               const int16_t* def_levels, const int16_t* rep_levels);

  // Converts `num_values` Arrow values to the Parquet C type and writes them
  // with WriteBatch.
  template <typename ParquetType, typename ArrowType>
  Status WriteNonNullableBatch(const ArrowType& type, int64_t num_values,
                               int64_t num_levels, const int16_t* def_levels,
                               const int16_t* rep_levels,
                               const typename ArrowType::c_type* values);

  // Converts `num_values` Arrow values to the Parquet C type and writes them
  // with WriteBatchSpaced, using `valid_bits` / `valid_bits_offset` to mark
  // the null slots.
  template <typename ParquetType, typename ArrowType>
  Status WriteNullableBatch(const ArrowType& type, int64_t num_values, int64_t num_levels,
                            const int16_t* def_levels, const int16_t* rep_levels,
                            const uint8_t* valid_bits, int64_t valid_bits_offset,
                            const typename ArrowType::c_type* values);

  // Forwards to TypedColumnWriter<ParquetType>::WriteBatch, converting a
  // thrown Parquet exception into a Status.
  template <typename ParquetType>
  Status WriteBatch(int64_t num_levels, const int16_t* def_levels,
                    const int16_t* rep_levels,
                    const typename ParquetType::c_type* values) {
    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
    PARQUET_CATCH_NOT_OK(
        typed_writer->WriteBatch(num_levels, def_levels, rep_levels, values));
    return Status::OK();
  }

  // Forwards to TypedColumnWriter<ParquetType>::WriteBatchSpaced, converting a
  // thrown Parquet exception into a Status.
  template <typename ParquetType>
  Status WriteBatchSpaced(int64_t num_levels, const int16_t* def_levels,
                          const int16_t* rep_levels, const uint8_t* valid_bits,
                          int64_t valid_bits_offset,
                          const typename ParquetType::c_type* values) {
    auto typed_writer = static_cast<TypedColumnWriter<ParquetType>*>(writer_);
    PARQUET_CATCH_NOT_OK(typed_writer->WriteBatchSpaced(
        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, values));
    return Status::OK();
  }

  ColumnWriterContext* ctx_;
  ColumnWriter* writer_;
  std::shared_ptr<Field> field_;
};

// Generic path for primitive arrays. Takes the raw value pointer (already
// advanced by the array offset) and writes it through WriteNonNullableBatch
// when the Parquet schema node is required or the array has no nulls;
// otherwise through WriteNullableBatch together with the validity bitmap and
// the array offset.
template <typename ParquetType, typename ArrowType>
Status ArrowColumnWriter::TypedWriteBatch(const Array& array, int64_t num_levels,
                                          const int16_t* def_levels,
                                          const int16_t* rep_levels) {
  using ArrowCType = typename ArrowType::c_type;

  const auto& primitive = static_cast<const PrimitiveArray&>(array);
  const ArrowCType* raw_values =
      reinterpret_cast<const ArrowCType*>(primitive.values()->data()) +
      primitive.offset();

  const bool nothing_is_null =
      writer_->descr()->schema_node()->is_required() || (primitive.null_count() == 0);

  if (nothing_is_null) {
    // No nulls: the values can be handed over densely.
    return WriteNonNullableBatch<ParquetType, ArrowType>(
        static_cast<const ArrowType&>(*array.type()), array.length(), num_levels,
        def_levels, rep_levels, raw_values);
  }

  const uint8_t* validity = primitive.null_bitmap_data();
  return WriteNullableBatch<ParquetType, ArrowType>(
      static_cast<const ArrowType&>(*array.type()), primitive.length(), num_levels,
      def_levels, rep_levels, validity, primitive.offset(), raw_values);
}

// Generic dense path: converts each of the `num_values` Arrow values to the
// Parquet C type in the shared scratch buffer and writes the result with
// WriteBatch.
template <typename ParquetType, typename ArrowType>
Status ArrowColumnWriter::WriteNonNullableBatch(
    const ArrowType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels,
    const typename ArrowType::c_type* values) {
  using ParquetCType = typename ParquetType::c_type;

  ParquetCType* converted;
  RETURN_NOT_OK(ctx_->GetScratchData<ParquetCType>(num_values, &converted));

  for (int64_t idx = 0; idx < num_values; ++idx) {
    converted[idx] = static_cast<ParquetCType>(values[idx]);
  }

  return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, converted);
}

// Dense path for Date64 -> INT32: converts milliseconds since the epoch into
// days since the epoch and writes the result with WriteBatch.
template <>
Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Date64Type>(
    const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
  constexpr int64_t kMillisecondsPerDay = 86400000;

  // The loop below fills `num_values` slots, so the scratch space must cover
  // at least that many; it is never made smaller than the `num_levels` slots
  // that were requested before.
  int32_t* buffer;
  RETURN_NOT_OK(
      ctx_->GetScratchData<int32_t>(std::max(num_values, num_levels), &buffer));

  // 64-bit index: `num_values` is an int64_t and may exceed the range of int.
  for (int64_t i = 0; i < num_values; i++) {
    buffer[i] = static_cast<int32_t>(values[i] / kMillisecondsPerDay);
  }

  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
}

// Dense path for Time32 -> INT32: values in seconds are scaled to
// milliseconds, values in any other unit are copied unchanged; the result is
// written with WriteBatch.
template <>
Status ArrowColumnWriter::WriteNonNullableBatch<Int32Type, ::arrow::Time32Type>(
    const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const int32_t* values) {
  // `num_values` slots are filled below, so the scratch space must cover at
  // least that many; it is never made smaller than the `num_levels` slots that
  // were requested before.
  int32_t* buffer;
  RETURN_NOT_OK(
      ctx_->GetScratchData<int32_t>(std::max(num_values, num_levels), &buffer));
  if (type.unit() == TimeUnit::SECOND) {
    // 64-bit index: `num_values` is an int64_t and may exceed the range of int.
    for (int64_t i = 0; i < num_values; i++) {
      buffer[i] = values[i] * 1000;
    }
  } else {
    std::copy(values, values + num_values, buffer);
  }
  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, buffer);
}

// Defines a WriteNonNullableBatch specialization for an Arrow / Parquet type
// pair whose values are both of type CType. The incoming value pointer is
// passed to WriteBatch as-is, without going through the scratch buffer; the
// `type` and `num_values` parameters are not used.
#define NONNULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                 \
  template <>                                                                      \
  Status ArrowColumnWriter::WriteNonNullableBatch<ParquetType, ArrowType>(         \
      const ArrowType& type, int64_t num_values, int64_t num_levels,               \
      const int16_t* def_levels, const int16_t* rep_levels, const CType* buffer) { \
    return WriteBatch<ParquetType>(num_levels, def_levels, rep_levels, buffer);    \
  }

// Type pairs written through the dense fast path.
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NONNULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NONNULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)

// Generic spaced path: converts each of the `num_values` Arrow values
// (null slots included) to the Parquet C type in the shared scratch buffer and
// writes the result with WriteBatchSpaced, which uses `valid_bits` starting at
// `valid_bits_offset` to tell which slots hold values.
template <typename ParquetType, typename ArrowType>
Status ArrowColumnWriter::WriteNullableBatch(
    const ArrowType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const typename ArrowType::c_type* values) {
  using ParquetCType = typename ParquetType::c_type;

  // The loop below fills `num_values` slots, so the scratch space must cover
  // at least that many; it is never made smaller than the `num_levels` slots
  // that were requested before.
  ParquetCType* buffer;
  RETURN_NOT_OK(
      ctx_->GetScratchData<ParquetCType>(std::max(num_values, num_levels), &buffer));

  // 64-bit index: `num_values` is an int64_t and may exceed the range of int.
  for (int64_t i = 0; i < num_values; i++) {
    buffer[i] = static_cast<ParquetCType>(values[i]);
  }

  return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits,
                                       valid_bits_offset, buffer);
}

// Spaced path for Date64 -> INT32: every slot (null slots included) is
// converted from milliseconds since the epoch into days since the epoch, then
// written with WriteBatchSpaced.
template <>
Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Date64Type>(
    const ::arrow::Date64Type& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const int64_t* values) {
  int32_t* days;
  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &days));

  std::transform(values, values + num_values, days, [](const int64_t millis) {
    return static_cast<int32_t>(millis / 86400000);
  });

  return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
                                     valid_bits_offset, days);
}

// Spaced path for Time32 -> INT32: values in seconds are multiplied by 1000,
// values in any other unit are passed through unchanged; the result is written
// with WriteBatchSpaced.
template <>
Status ArrowColumnWriter::WriteNullableBatch<Int32Type, ::arrow::Time32Type>(
    const ::arrow::Time32Type& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const int32_t* values) {
  int32_t* scaled;
  RETURN_NOT_OK(ctx_->GetScratchData<int32_t>(num_values, &scaled));

  // A factor of 1 leaves the value as it is.
  const int32_t factor = (type.unit() == TimeUnit::SECOND) ? 1000 : 1;
  std::transform(values, values + num_values, scaled,
                 [factor](const int32_t value) { return value * factor; });

  return WriteBatchSpaced<Int32Type>(num_levels, def_levels, rep_levels, valid_bits,
                                     valid_bits_offset, scaled);
}

// Defines a WriteNullableBatch specialization for an Arrow / Parquet type pair
// whose values are both of type CType. The incoming value pointer is passed to
// WriteBatchSpaced as-is, without going through the scratch buffer; the `type`
// and `num_values` parameters are not used.
#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                          \
  template <>                                                                            \
  Status ArrowColumnWriter::WriteNullableBatch<ParquetType, ArrowType>(                  \
      const ArrowType& type, int64_t num_values, int64_t num_levels,                     \
      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,   \
      int64_t valid_bits_offset, const CType* values) {                                  \
    return WriteBatchSpaced<ParquetType>(num_levels, def_levels, rep_levels, valid_bits, \
                                         valid_bits_offset, values);                     \
  }

// Type pairs written through the spaced fast path.
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Date32Type, int32_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Time64Type, int64_t)
NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
// INT64 timestamps get both the spaced and the dense fast path.
NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
NONNULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)

// Spaced path for nanosecond timestamps -> INT96: each slot is converted with
// NanosecondsToImpalaTimestamp and the result is written with
// WriteBatchSpaced. Any unit other than NANO yields NotImplemented (after the
// scratch buffer has been sized).
template <>
Status ArrowColumnWriter::WriteNullableBatch<Int96Type, ::arrow::TimestampType>(
    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
    int64_t valid_bits_offset, const int64_t* values) {
  Int96* converted;
  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &converted));

  if (type.unit() != TimeUnit::NANO) {
    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
  }

  Int96* out = converted;
  for (const int64_t* in = values; in < values + num_values; ++in, ++out) {
    internal::NanosecondsToImpalaTimestamp(*in, out);
  }

  return WriteBatchSpaced<Int96Type>(num_levels, def_levels, rep_levels, valid_bits,
                                     valid_bits_offset, converted);
}

// Dense path for nanosecond timestamps -> INT96: each value is converted with
// NanosecondsToImpalaTimestamp and the result is written with WriteBatch. Any
// unit other than NANO yields NotImplemented (after the scratch buffer has
// been sized).
template <>
Status ArrowColumnWriter::WriteNonNullableBatch<Int96Type, ::arrow::TimestampType>(
    const ::arrow::TimestampType& type, int64_t num_values, int64_t num_levels,
    const int16_t* def_levels, const int16_t* rep_levels, const int64_t* values) {
  Int96* converted;
  RETURN_NOT_OK(ctx_->GetScratchData<Int96>(num_values, &converted));

  if (type.unit() != TimeUnit::NANO) {
    return Status::NotImplemented("Only NANO timestamps are supported for Int96 writing");
  }

  Int96* out = converted;
  for (const int64_t* in = values; in < values + num_values; ++in, ++out) {
    internal::NanosecondsToImpalaTimestamp(*in, out);
  }

  return WriteBatch<Int96Type>(num_levels, def_levels, rep_levels, converted);
}

// Chooses how a timestamp array is written:
//  * nanoseconds with deprecated INT96 support enabled -> INT96
//  * nanoseconds otherwise                             -> coerced INT64
//  * other units, coercion enabled to a different unit -> coerced INT64
//  * everything else                                   -> INT64 as-is
Status ArrowColumnWriter::WriteTimestamps(const Array& values, int64_t num_levels,
                                          const int16_t* def_levels,
                                          const int16_t* rep_levels) {
  const auto& timestamp_type =
      static_cast<const ::arrow::TimestampType&>(*values.type());

  if (timestamp_type.unit() == TimeUnit::NANO) {
    if (ctx_->properties->support_deprecated_int96_timestamps()) {
      return TypedWriteBatch<Int96Type, ::arrow::TimestampType>(values, num_levels,
                                                                def_levels, rep_levels);
    }
    // Nanoseconds that are not written as INT96 always go through a cast.
    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
  }

  if (ctx_->properties->coerce_timestamps_enabled() &&
      (timestamp_type.unit() != ctx_->properties->coerce_timestamps_unit())) {
    // Coercion was requested and the array is not already in the target unit.
    return WriteTimestampsCoerce(values, num_levels, def_levels, rep_levels);
  }

  // No cast needed: write the 64-bit values directly.
  return TypedWriteBatch<Int64Type, ::arrow::TimestampType>(values, num_levels,
                                                            def_levels, rep_levels);
}

// Rescales a timestamp array to the target unit and writes it as INT64.
//
// The target unit is the configured coercion unit when coercion is enabled and
// microseconds otherwise. Scaling down returns Status::Invalid as soon as a
// non-null value is not an exact multiple of the factor; scaling up multiplies
// without an overflow check. The rescaled values are written through the dense
// path when the schema node is required or the array has no nulls, and through
// the spaced path otherwise.
Status ArrowColumnWriter::WriteTimestampsCoerce(const Array& array, int64_t num_levels,
                                                const int16_t* def_levels,
                                                const int16_t* rep_levels) {
  // The scaling loops fill `array.length()` slots, so the scratch space must
  // cover at least that many; it is never made smaller than the `num_levels`
  // slots that were requested before.
  int64_t* buffer;
  RETURN_NOT_OK(
      ctx_->GetScratchData<int64_t>(std::max(num_levels, array.length()), &buffer));

  const auto& data = static_cast<const ::arrow::TimestampArray&>(array);

  auto values = data.raw_values();
  const auto& type = static_cast<const ::arrow::TimestampType&>(*array.type());

  TimeUnit::type target_unit = ctx_->properties->coerce_timestamps_enabled()
                                   ? ctx_->properties->coerce_timestamps_unit()
                                   : TimeUnit::MICRO;
  auto target_type = ::arrow::timestamp(target_unit);

  // Divides every slot by `factor`; fails on the first non-null value that
  // would lose precision.
  auto DivideBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      if (!data.IsNull(i) && (values[i] % factor != 0)) {
        std::stringstream ss;
        ss << "Casting from " << type.ToString() << " to " << target_type->ToString()
           << " would lose data: " << values[i];
        return Status::Invalid(ss.str());
      }
      buffer[i] = values[i] / factor;
    }
    return Status::OK();
  };

  // Multiplies every slot by `factor`.
  auto MultiplyBy = [&](const int64_t factor) {
    for (int64_t i = 0; i < array.length(); i++) {
      buffer[i] = values[i] * factor;
    }
    return Status::OK();
  };

  if (type.unit() == TimeUnit::NANO) {
    if (target_unit == TimeUnit::MICRO) {
      RETURN_NOT_OK(DivideBy(1000));
    } else {
      DCHECK_EQ(TimeUnit::MILLI, target_unit);
      RETURN_NOT_OK(DivideBy(1000000));
    }
  } else if (type.unit() == TimeUnit::SECOND) {
    RETURN_NOT_OK(MultiplyBy(target_unit == TimeUnit::MICRO ? 1000000 : 1000));
  } else if (type.unit() == TimeUnit::MILLI) {
    DCHECK_EQ(TimeUnit::MICRO, target_unit);
    RETURN_NOT_OK(MultiplyBy(1000));
  } else {
    // Remaining source unit: microseconds, which can only be coerced down to
    // milliseconds here.
    DCHECK_EQ(TimeUnit::MILLI, target_unit);
    RETURN_NOT_OK(DivideBy(1000));
  }

  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    RETURN_NOT_OK((WriteNonNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, buffer)));
  } else {
    // `buffer` starts at slot 0 of the array, while the validity bitmap still
    // needs the array offset applied.
    const uint8_t* valid_bits = data.null_bitmap_data();
    RETURN_NOT_OK((WriteNullableBatch<Int64Type, ::arrow::TimestampType>(
        static_cast<const ::arrow::TimestampType&>(*target_type), array.length(),
        num_levels, def_levels, rep_levels, valid_bits, data.offset(), buffer)));
  }
  return Status::OK();
}

// This specialization looks similar to the generic one but differs in two
// significant ways:
// * the offset is applied as late as possible (when each bit is read) because
//   the values are accessed at sub-byte granularity
// * Arrow stores the data bit-packed, so std::copy cannot be used to transform
//   from ArrowType::c_type to ParquetType::c_type

// Boolean path: unpacks the non-null bits of the array into a dense bool
// buffer (null slots are skipped, not left as gaps) and writes it with
// WriteBatch.
template <>
Status ArrowColumnWriter::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  bool* buffer;
  RETURN_NOT_OK(ctx_->GetScratchData<bool>(array.length(), &buffer));

  const auto& data = static_cast<const BooleanArray&>(array);
  auto values = reinterpret_cast<const uint8_t*>(data.values()->data());

  // 64-bit indices: the array length is an int64_t and may exceed the range
  // of int.
  int64_t buffer_idx = 0;
  const int64_t offset = array.offset();
  for (int64_t i = 0; i < data.length(); i++) {
    if (!data.IsNull(i)) {
      // The array offset is applied here, at bit granularity.
      buffer[buffer_idx++] = BitUtil::GetBit(values, offset + i);
    }
  }

  return WriteBatch<BooleanType>(num_levels, def_levels, rep_levels, buffer);
}

// Null-typed arrays carry no values: only the levels are handed to the INT32
// column writer, together with a null value pointer.
template <>
Status ArrowColumnWriter::TypedWriteBatch<Int32Type, ::arrow::NullType>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  const int32_t* const no_values = nullptr;
  return WriteBatch<Int32Type>(num_levels, def_levels, rep_levels, no_values);
}

// Binary path: builds one ByteArray (length + pointer into the Arrow value
// data, no copy) per non-null element in a dense scratch buffer and writes it
// with WriteBatch.
template <>
Status ArrowColumnWriter::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  const auto& data = static_cast<const BinaryArray&>(array);

  // Up to `data.length()` slots are filled below, so the scratch space must
  // cover at least that many; it is never made smaller than the `num_levels`
  // slots that were requested before.
  ByteArray* buffer;
  RETURN_NOT_OK(
      ctx_->GetScratchData<ByteArray>(std::max(num_levels, data.length()), &buffer));

  // For an array consisting only of empty strings or nulls the value data
  // buffer may be absent; dereferencing it would crash, so `values` stays
  // null in that case.
  const uint8_t* values = nullptr;
  if (data.value_data()) {
    values = reinterpret_cast<const uint8_t*>(data.value_data()->data());
    DCHECK(values != nullptr);
  }

  // Slice offset is accounted for in raw_value_offsets
  const int32_t* value_offset = data.raw_value_offsets();

  if (writer_->descr()->schema_node()->is_required() || (data.null_count() == 0)) {
    // no nulls, just dump the data
    for (int64_t i = 0; i < data.length(); i++) {
      buffer[i] =
          ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
    }
  } else {
    // Null slots are skipped so that the buffer is dense. 64-bit write index:
    // the array length is an int64_t and may exceed the range of int.
    int64_t buffer_idx = 0;
    for (int64_t i = 0; i < data.length(); i++) {
      if (!data.IsNull(i)) {
        buffer[buffer_idx++] =
            ByteArray(value_offset[i + 1] - value_offset[i], values + value_offset[i]);
      }
    }
  }

  return WriteBatch<ByteArrayType>(num_levels, def_levels, rep_levels, buffer);
}

// Fixed-size binary path: wraps the value pointer of every non-null element in
// a FixedLenByteArray inside a dense scratch buffer and writes it with
// WriteBatch.
template <>
Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::FixedSizeBinaryType>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  const auto& data = static_cast<const FixedSizeBinaryArray&>(array);
  const int64_t length = data.length();

  // Up to `length` slots are filled below, so the scratch space must cover at
  // least that many; it is never made smaller than the `num_levels` slots that
  // were requested before.
  FLBA* buffer;
  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(std::max(num_levels, length), &buffer));

  if (writer_->descr()->schema_node()->is_required() || data.null_count() == 0) {
    // no nulls, just dump the data
    // todo(advancedxy): use a writeBatch to avoid this step
    for (int64_t i = 0; i < length; i++) {
      buffer[i] = FixedLenByteArray(data.GetValue(i));
    }
  } else {
    // Null slots are skipped so that the buffer is dense. 64-bit write index:
    // `length` is an int64_t and may exceed the range of int.
    int64_t buffer_idx = 0;
    for (int64_t i = 0; i < length; i++) {
      if (!data.IsNull(i)) {
        buffer[buffer_idx++] = FixedLenByteArray(data.GetValue(i));
      }
    }
  }

  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, buffer);
}

// Writes the values of a Decimal128Array through the FLBA column writer.
//
// Every written value is re-encoded into a temporary 16-byte slot: the two
// 64-bit words returned by GetValue() are swapped and each one is passed
// through BitUtil::ToBigEndian. The FLBA handed to the column writer points
// into that slot, skipping the first
// `byte_width() - DecimalSize(precision())` bytes.
template <>
Status ArrowColumnWriter::TypedWriteBatch<FLBAType, ::arrow::Decimal128Type>(
    const Array& array, int64_t num_levels, const int16_t* def_levels,
    const int16_t* rep_levels) {
  const auto& decimals = static_cast<const Decimal128Array&>(array);
  const int64_t num_rows = decimals.length();

  FLBA* flba_out;
  RETURN_NOT_OK(ctx_->GetScratchData<FLBA>(num_levels, &flba_out));

  const auto& decimal_type =
      static_cast<const ::arrow::Decimal128Type&>(*decimals.type());
  // Number of leading bytes of each 16-byte slot that are not written out.
  const int32_t leading_bytes =
      decimal_type.byte_width() - DecimalSize(decimal_type.precision());

  const bool all_slots_written =
      writer_->descr()->schema_node()->is_required() || decimals.null_count() == 0;

  // TODO(phillipc): This is potentially very wasteful if we have a lot of nulls
  // Two 64-bit words per row; must stay alive until WriteBatch has returned
  // because the FLBA entries point into it.
  std::vector<uint64_t> swapped_words(static_cast<size_t>(num_rows) * 2);

  // Re-encodes row `row` into output slot `slot` and records the FLBA for it.
  auto emit = [&](int64_t row, int64_t slot) {
    auto source_words = reinterpret_cast<const uint64_t*>(decimals.GetValue(row));
    uint64_t* dest_words = &swapped_words[static_cast<size_t>(slot) * 2];
    dest_words[0] = ::arrow::BitUtil::ToBigEndian(source_words[1]);
    dest_words[1] = ::arrow::BitUtil::ToBigEndian(source_words[0]);
    flba_out[slot] =
        FixedLenByteArray(reinterpret_cast<const uint8_t*>(dest_words) + leading_bytes);
  };

  // TODO(phillipc): Look into whether our compilers will perform loop unswitching so we
  // don't have to keep writing two loops to handle the case where we know there are no
  // nulls
  if (all_slots_written) {
    // Row i goes to slot i.
    // todo(advancedxy): use a writeBatch to avoid this step
    for (int64_t row = 0; row < num_rows; ++row) {
      emit(row, row);
    }
  } else {
    // Null rows are skipped, so the output is packed densely.
    int64_t next_slot = 0;
    for (int64_t row = 0; row < num_rows; ++row) {
      if (decimals.IsNull(row)) {
        continue;
      }
      emit(row, next_slot);
      ++next_slot;
    }
  }

  return WriteBatch<FLBAType>(num_levels, def_levels, rep_levels, flba_out);
}

// Writes one Arrow array to the underlying Parquet column.
//
// Steps:
//   1. determine the leaf value type of `data` (GetLeafType),
//   2. let LevelBuilder produce the definition/repetition levels together with
//      the leaf values array and the window of it that has to be written,
//   3. dispatch on the leaf type to the matching TypedWriteBatch
//      specialization (timestamps go through WriteTimestamps).
//
// Returns Status::NotImplemented for leaf types without a case below.
Status ArrowColumnWriter::Write(const Array& data) {
  ::arrow::Type::type leaf_type;
  RETURN_NOT_OK(GetLeafType(*data.type(), &leaf_type));

  std::shared_ptr<Array> leaf_array;
  int64_t leaf_offset;
  int64_t num_levels;
  int64_t num_leaf_values;
  LevelBuilder level_builder(ctx_->memory_pool);

  std::shared_ptr<Buffer> def_levels_buffer, rep_levels_buffer;
  RETURN_NOT_OK(level_builder.GenerateLevels(
      data, field_, &leaf_offset, &num_leaf_values, &num_levels,
      ctx_->def_levels_buffer, &def_levels_buffer, &rep_levels_buffer, &leaf_array));

  // Either level buffer may be absent; the raw pointer is then left null.
  const int16_t* def_levels =
      def_levels_buffer ? reinterpret_cast<const int16_t*>(def_levels_buffer->data())
                        : nullptr;
  const int16_t* rep_levels =
      rep_levels_buffer ? reinterpret_cast<const int16_t*>(rep_levels_buffer->data())
                        : nullptr;

  std::shared_ptr<Array> values_array = leaf_array->Slice(leaf_offset, num_leaf_values);

#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)                            \
  case ::arrow::Type::ArrowEnum:                                                       \
    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(*values_array, num_levels, \
                                                            def_levels, rep_levels);

  switch (leaf_type) {
    // Types that need a decision at runtime.
    case ::arrow::Type::UINT32:
      if (writer_->properties()->version() != ParquetVersion::PARQUET_1_0) {
        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(*values_array, num_levels,
                                                               def_levels, rep_levels);
      }
      // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
      // to use the larger Int64Type to store them lossless.
      return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(*values_array, num_levels,
                                                             def_levels, rep_levels);
    case ::arrow::Type::TIMESTAMP:
      return WriteTimestamps(*values_array, num_levels, def_levels, rep_levels);

      // BOOLEAN
      WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)

      // INT32
      WRITE_BATCH_CASE(NA, NullType, Int32Type)
      WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
      WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
      WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
      WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
      WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
      WRITE_BATCH_CASE(DATE32, Date32Type, Int32Type)
      WRITE_BATCH_CASE(DATE64, Date64Type, Int32Type)
      WRITE_BATCH_CASE(TIME32, Time32Type, Int32Type)

      // INT64
      WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
      WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
      WRITE_BATCH_CASE(TIME64, Time64Type, Int64Type)

      // FLOAT / DOUBLE
      WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
      WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)

      // BYTE_ARRAY (strings are written through the binary specialization)
      WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
      WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)

      // FIXED_LEN_BYTE_ARRAY
      WRITE_BATCH_CASE(FIXED_SIZE_BINARY, FixedSizeBinaryType, FLBAType)
      WRITE_BATCH_CASE(DECIMAL, Decimal128Type, FLBAType)
    default:
      break;
  }

  // Only reached for leaf types without a case above.
  return Status::NotImplemented("Data type not supported as list value: " +
                                values_array->type()->ToString());
}

}  // namespace

// ----------------------------------------------------------------------
// FileWriter implementation

// Private implementation behind FileWriter.
//
// Owns the low-level ParquetFileWriter, remembers the row group that is
// currently open and holds the ColumnWriterContext shared by all column writes.
// Calls into the low-level writer are wrapped in PARQUET_CATCH_NOT_OK so that
// the methods below report failures through their Status return value.
class FileWriter::Impl {
 public:
  // Takes ownership of `writer`. `pool` and the raw ArrowWriterProperties
  // pointer are handed to the column write context; `arrow_properties` is also
  // retained as a shared_ptr member.
  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
      : writer_(std::move(writer)),
        row_group_writer_(nullptr),
        column_write_context_(pool, arrow_properties.get()),
        arrow_properties_(arrow_properties),
        closed_(false) {}

  // Closes the row group that is currently open (if any) and appends a new one
  // to the file. `chunk_size` is currently unused.
  Status NewRowGroup(int64_t chunk_size) {
    if (row_group_writer_ != nullptr) {
      PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
    }
    PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup());
    return Status::OK();
  }

  // Closes the open row group (if any) and then the file writer. Calling Close()
  // again afterwards is a no-op that returns OK.
  Status Close() {
    if (!closed_) {
      // Make idempotent
      closed_ = true;
      if (row_group_writer_ != nullptr) {
        PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
      }
      PARQUET_CATCH_NOT_OK(writer_->Close());
    }
    return Status::OK();
  }

  // Writes all of `data` as the next column chunk of the current row group.
  Status WriteColumnChunk(const Array& data) {
    // A bit awkward here since cannot instantiate ChunkedArray from const Array&
    ::arrow::ArrayVector chunks = {::arrow::MakeArray(data.data())};
    auto chunked_array = std::make_shared<::arrow::ChunkedArray>(chunks);
    return WriteColumnChunk(chunked_array, 0, data.length());
  }

  // Writes the rows [offset, offset + size) of `data` as the next column chunk
  // of the current row group.
  //
  // \param data the complete column
  // \param offset index of the first row to write
  // \param size number of rows to write
  Status WriteColumnChunk(const std::shared_ptr<ChunkedArray>& data, int64_t offset,
                          const int64_t size) {
    // DictionaryArrays are not yet handled with a fast path. To still support
    // writing them as a workaround, we convert them back to their non-dictionary
    // representation.
    if (data->type()->id() == ::arrow::Type::DICTIONARY) {
      const ::arrow::DictionaryType& dict_type =
          static_cast<const ::arrow::DictionaryType&>(*data->type());

      // TODO(ARROW-1648): Remove this special handling once we require an Arrow
      // version that has this fixed.
      if (dict_type.dictionary()->type()->id() == ::arrow::Type::NA) {
        // Only the requested window is written: `size` nulls, not
        // data->length() of them, otherwise every row group would receive the
        // whole column.
        auto null_array = std::make_shared<::arrow::NullArray>(size);
        return WriteColumnChunk(*null_array);
      }

      FunctionContext ctx(this->memory_pool());
      ::arrow::compute::Datum cast_input(data);
      ::arrow::compute::Datum cast_output;
      RETURN_NOT_OK(Cast(&ctx, cast_input, dict_type.dictionary()->type(), CastOptions(),
                         &cast_output));
      // The decoded column is written with the caller's window; passing
      // (0, data->length()) here would write the entire column into each row
      // group.
      return WriteColumnChunk(cast_output.chunked_array(), offset, size);
    }

    ColumnWriter* column_writer;
    PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());

    // TODO(wesm): This trick to construct a schema for one Parquet root node
    // will not work for arbitrary nested data
    // NextColumn() has already advanced the writer, hence the "- 1" below.
    int current_column_idx = row_group_writer_->current_column();
    std::shared_ptr<::arrow::Schema> arrow_schema;
    RETURN_NOT_OK(FromParquetSchema(writer_->schema(), {current_column_idx - 1},
                                    writer_->key_value_metadata(), &arrow_schema));

    ArrowColumnWriter arrow_writer(&column_write_context_, column_writer,
                                   arrow_schema->field(0));

    RETURN_NOT_OK(arrow_writer.Write(*data, offset, size));
    return arrow_writer.Close();
  }

  // Writer properties of the underlying ParquetFileWriter.
  const WriterProperties& properties() const { return *writer_->properties(); }

  // Memory pool stored in the column write context.
  ::arrow::MemoryPool* memory_pool() const { return column_write_context_.memory_pool; }

  virtual ~Impl() {}

 private:
  friend class FileWriter;

  std::unique_ptr<ParquetFileWriter> writer_;
  // Row group currently being written; nullptr until NewRowGroup() is called.
  RowGroupWriter* row_group_writer_;
  ColumnWriterContext column_write_context_;
  std::shared_ptr<ArrowWriterProperties> arrow_properties_;
  // Set by the first Close() call.
  bool closed_;
};

// Starts a new row group by forwarding to Impl::NewRowGroup, which closes the
// row group that is currently open (if any) before appending the new one.
// `chunk_size` is passed through unchanged.
Status FileWriter::NewRowGroup(int64_t chunk_size) {
  return impl_->NewRowGroup(chunk_size);
}

// Writes a single array as the next column chunk by forwarding to the
// Impl overload taking a `const Array&`.
Status FileWriter::WriteColumnChunk(const ::arrow::Array& data) {
  return impl_->WriteColumnChunk(data);
}

// Forwards `data`, `offset` and `size` unchanged to the Impl overload taking a
// chunked array plus a row window.
Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data,
                                    const int64_t offset, const int64_t size) {
  return impl_->WriteColumnChunk(data, offset, size);
}

// Convenience overload: writes the complete chunked array, i.e. the window
// starting at row 0 and spanning data->length() rows.
Status FileWriter::WriteColumnChunk(const std::shared_ptr<::arrow::ChunkedArray>& data) {
  const int64_t total_length = data->length();
  return WriteColumnChunk(data, 0, total_length);
}

// Closes the writer by forwarding to Impl::Close.
Status FileWriter::Close() { return impl_->Close(); }

// Returns the memory pool reported by the implementation object.
MemoryPool* FileWriter::memory_pool() const { return impl_->memory_pool(); }

// Empty destructor, defined out-of-line in this translation unit.
// NOTE(review): presumably placed here so that impl_ is destroyed where
// FileWriter::Impl is a complete type -- confirm against the header.
FileWriter::~FileWriter() {}

// Builds the implementation object, handing it the memory pool, ownership of
// the low-level Parquet file writer and the Arrow-specific writer properties.
FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer,
                       const std::shared_ptr<ArrowWriterProperties>& arrow_properties)
    : impl_(new FileWriter::Impl(pool, std::move(writer), arrow_properties)) {}

// Opens a FileWriter on a Parquet output stream using the default Arrow writer
// properties (see default_arrow_writer_properties()).
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        std::unique_ptr<FileWriter>* writer) {
  const std::shared_ptr<ArrowWriterProperties> arrow_properties =
      default_arrow_writer_properties();
  return Open(schema, pool, sink, properties, arrow_properties, writer);
}

// Opens a FileWriter that writes Arrow data with the given schema to a Parquet
// output stream.
//
// \param schema Arrow schema; converted to a Parquet schema with ToParquetSchema
// \param pool memory pool handed to the new FileWriter
// \param sink Parquet output stream the file is written to
// \param properties Parquet writer properties
// \param arrow_properties Arrow-specific writer properties
// \param[out] writer receives the newly created FileWriter on success
// \return the error of the schema conversion or of opening the low-level
//         writer, OK otherwise
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                        std::unique_ptr<FileWriter>* writer) {
  std::shared_ptr<SchemaDescriptor> parquet_schema;
  RETURN_NOT_OK(
      ToParquetSchema(&schema, *properties, *arrow_properties, &parquet_schema));

  auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());

  // Opening the low-level writer goes through PARQUET_CATCH_NOT_OK like every
  // other call into it in this file, so that a failure is reported through the
  // returned Status rather than as an exception escaping this function.
  std::unique_ptr<ParquetFileWriter> base_writer;
  PARQUET_CATCH_NOT_OK(base_writer = ParquetFileWriter::Open(
                           sink, schema_node, properties, schema.metadata()));

  writer->reset(new FileWriter(pool, std::move(base_writer), arrow_properties));
  return Status::OK();
}

// Opens a FileWriter on an Arrow output stream. The stream is wrapped in an
// ArrowOutputStream and the call is forwarded to the overload taking a Parquet
// OutputStream.
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<::arrow::io::OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        std::unique_ptr<FileWriter>* writer) {
  const std::shared_ptr<OutputStream> parquet_sink =
      std::make_shared<ArrowOutputStream>(sink);
  return Open(schema, pool, parquet_sink, properties, writer);
}

// Opens a FileWriter on an Arrow output stream with explicit Arrow writer
// properties. The stream is wrapped in an ArrowOutputStream and the call is
// forwarded to the overload taking a Parquet OutputStream.
Status FileWriter::Open(const ::arrow::Schema& schema, ::arrow::MemoryPool* pool,
                        const std::shared_ptr<::arrow::io::OutputStream>& sink,
                        const std::shared_ptr<WriterProperties>& properties,
                        const std::shared_ptr<ArrowWriterProperties>& arrow_properties,
                        std::unique_ptr<FileWriter>* writer) {
  const std::shared_ptr<OutputStream> parquet_sink =
      std::make_shared<ArrowOutputStream>(sink);
  return Open(schema, pool, parquet_sink, properties, arrow_properties, writer);
}

namespace {}  // namespace

// Writes `table` as a sequence of row groups of at most `chunk_size` rows each.
//
// `chunk_size` must be positive; values above the writer properties'
// max_row_group_length() are clamped to that limit. For every row group each
// column of the table is written for the same row window. If starting a row
// group or writing a column fails, the writer is closed (the status of that
// Close() is ignored) and the original error is returned.
//
// \param table the table to write
// \param chunk_size requested number of rows per row group
// \return Invalid for a non-positive chunk_size, the first write error, or OK
Status FileWriter::WriteTable(const Table& table, int64_t chunk_size) {
  if (chunk_size <= 0) {
    return Status::Invalid("chunk size per row_group must be greater than 0");
  } else if (chunk_size > impl_->properties().max_row_group_length()) {
    chunk_size = impl_->properties().max_row_group_length();
  }

  // The position is tracked as a 64-bit row offset rather than as an `int` row
  // group counter, which could overflow for tables that need more than INT_MAX
  // row groups. Advancing by the number of rows just written keeps `offset`
  // from ever exceeding `num_rows`.
  const int64_t num_rows = table.num_rows();
  int64_t offset = 0;
  while (offset < num_rows) {
    const int64_t size = std::min(chunk_size, num_rows - offset);

    RETURN_NOT_OK_ELSE(NewRowGroup(size), PARQUET_IGNORE_NOT_OK(Close()));
    for (int i = 0; i < table.num_columns(); i++) {
      auto chunked_data = table.column(i)->data();
      RETURN_NOT_OK_ELSE(WriteColumnChunk(chunked_data, offset, size),
                         PARQUET_IGNORE_NOT_OK(Close()));
    }
    offset += size;
  }
  return Status::OK();
}

// Writes `table` to a Parquet output stream in one call: opens a FileWriter for
// the table's schema, writes the table in row groups of `chunk_size` rows and
// closes the writer. The first failing step determines the returned Status.
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                  const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
                  const std::shared_ptr<WriterProperties>& properties,
                  const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
  std::unique_ptr<FileWriter> file_writer;
  RETURN_NOT_OK(FileWriter::Open(*table.schema(), pool, sink, properties,
                                 arrow_properties, &file_writer));

  const Status write_status = file_writer->WriteTable(table, chunk_size);
  if (!write_status.ok()) {
    return write_status;
  }
  return file_writer->Close();
}

// Writes `table` to an Arrow output stream. The stream is wrapped in an
// ArrowOutputStream and the call is forwarded to the overload taking a Parquet
// OutputStream.
Status WriteTable(const ::arrow::Table& table, ::arrow::MemoryPool* pool,
                  const std::shared_ptr<::arrow::io::OutputStream>& sink,
                  int64_t chunk_size, const std::shared_ptr<WriterProperties>& properties,
                  const std::shared_ptr<ArrowWriterProperties>& arrow_properties) {
  const std::shared_ptr<OutputStream> parquet_sink =
      std::make_shared<ArrowOutputStream>(sink);
  return WriteTable(table, pool, parquet_sink, chunk_size, properties, arrow_properties);
}

}  // namespace arrow
}  // namespace parquet
