/*
 *
 * Copyright 2015 gRPC authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#ifndef GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H
#define GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H

#include <grpc/support/port_platform.h>

#include <grpc/slice_buffer.h>
#include "src/core/lib/gprpp/abstract.h"
#include "src/core/lib/gprpp/orphanable.h"
#include "src/core/lib/iomgr/closure.h"

/** Internal bit flag for grpc_begin_message's \a flags signaling the use of
 * compression for the message */
#define GRPC_WRITE_INTERNAL_COMPRESS (0x80000000u)
/** Mask of all valid internal flags. */
#define GRPC_WRITE_INTERNAL_USED_MASK (GRPC_WRITE_INTERNAL_COMPRESS)

namespace grpc_core {

class ByteStream : public Orphanable {
 public:
  virtual ~ByteStream() {}

  // Returns true if the bytes are available immediately (in which case
  // on_complete will not be called), or false if the bytes will be available
  // asynchronously (in which case on_complete will be called when they
  // are available).
  //
  // max_size_hint can be set as a hint as to the maximum number
  // of bytes that would be acceptable to read.
  virtual bool Next(size_t max_size_hint,
                    grpc_closure* on_complete) GRPC_ABSTRACT;

  // Returns the next slice in the byte stream when it is available, as
  // indicated by Next().
  //
  // Once a slice is returned into *slice, it is owned by the caller.
  virtual grpc_error* Pull(grpc_slice* slice) GRPC_ABSTRACT;

  // Shuts down the byte stream.
  //
  // If there is a pending call to on_complete from Next(), it will be
  // invoked with the error passed to Shutdown().
  //
  // The next call to Pull() (if any) will return the error passed to
  // Shutdown().
  virtual void Shutdown(grpc_error* error) GRPC_ABSTRACT;

  uint32_t length() const { return length_; }
  uint32_t flags() const { return flags_; }

  void set_flags(uint32_t flags) { flags_ = flags; }

  GRPC_ABSTRACT_BASE_CLASS

 protected:
  ByteStream(uint32_t length, uint32_t flags)
      : length_(length), flags_(flags) {}

 private:
  const uint32_t length_;
  uint32_t flags_;
};

//
// SliceBufferByteStream
//
// A ByteStream that wraps a slice buffer.
//

class SliceBufferByteStream : public ByteStream {
 public:
  // Removes all slices in slice_buffer, leaving it empty.
  SliceBufferByteStream(grpc_slice_buffer* slice_buffer, uint32_t flags);

  ~SliceBufferByteStream();

  void Orphan() override;

  bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
  grpc_error* Pull(grpc_slice* slice) override;
  void Shutdown(grpc_error* error) override;

 private:
  grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
  grpc_slice_buffer backing_buffer_;
};

//
// CachingByteStream
//
// A ByteStream that that wraps an underlying byte stream but caches
// the resulting slices in a slice buffer.  If an initial attempt fails
// without fully draining the underlying stream, a new caching stream
// can be created from the same underlying cache, in which case it will
// return whatever is in the backing buffer before continuing to read the
// underlying stream.
//
// NOTE: No synchronization is done, so it is not safe to have multiple
// CachingByteStreams simultaneously drawing from the same underlying
// ByteStreamCache at the same time.
//

class ByteStreamCache {
 public:
  class CachingByteStream : public ByteStream {
   public:
    explicit CachingByteStream(ByteStreamCache* cache);

    ~CachingByteStream();

    void Orphan() override;

    bool Next(size_t max_size_hint, grpc_closure* on_complete) override;
    grpc_error* Pull(grpc_slice* slice) override;
    void Shutdown(grpc_error* error) override;

    // Resets the byte stream to the start of the underlying stream.
    void Reset();

   private:
    ByteStreamCache* cache_;
    size_t cursor_ = 0;
    size_t offset_ = 0;
    grpc_error* shutdown_error_ = GRPC_ERROR_NONE;
  };

  explicit ByteStreamCache(OrphanablePtr<ByteStream> underlying_stream);

  ~ByteStreamCache();

  // Must not be destroyed while still in use by a CachingByteStream.
  void Destroy();

  grpc_slice_buffer* cache_buffer() { return &cache_buffer_; }

 private:
  OrphanablePtr<ByteStream> underlying_stream_;
  uint32_t length_;
  uint32_t flags_;
  grpc_slice_buffer cache_buffer_;
};

}  // namespace grpc_core

#endif /* GRPC_CORE_LIB_TRANSPORT_BYTE_STREAM_H */
