// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package google.cloud.pubsublite.v1;

import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/cloud/pubsublite/v1/common.proto";

// File-level options that configure the code generated for each target
// language (package/namespace names, Java output layout, and C++ arenas).
option cc_enable_arenas = true;
option csharp_namespace = "Google.Cloud.PubSubLite.V1";
option go_package = "cloud.google.com/go/pubsublite/apiv1/pubsublitepb;pubsublitepb";
option java_multiple_files = true;
option java_outer_classname = "SubscriberProto";
option java_package = "com.google.cloud.pubsublite.proto";
option php_namespace = "Google\\Cloud\\PubSubLite\\V1";
option ruby_package = "Google::Cloud::PubSubLite::V1";

// The service that a subscriber client application uses to receive messages
// from subscriptions.
service SubscriberService {
  option (google.api.default_host) = "pubsublite.googleapis.com";
  option (google.api.oauth_scopes) =
      "https://www.googleapis.com/auth/cloud-platform";

  // Establishes a bidirectional stream with the server for receiving messages.
  //
  // The client sends SubscribeRequests: an initial request first, then seek
  // and flow control requests. The server replies with SubscribeResponses:
  // initial and seek responses, and responses containing messages.
  rpc Subscribe(stream SubscribeRequest) returns (stream SubscribeResponse) {}
}

// The service that a subscriber client application uses to determine which
// partitions it should connect to.
service PartitionAssignmentService {
  option (google.api.default_host) = "pubsublite.googleapis.com";
  option (google.api.oauth_scopes) =
      "https://www.googleapis.com/auth/cloud-platform";

  // Assigns partitions for this client to handle for the specified
  // subscription.
  //
  // The client must send an InitialPartitionAssignmentRequest first.
  // The server will then send PartitionAssignments, keeping at most one
  // unacknowledged PartitionAssignment outstanding on the stream at a time.
  // After updating the partitions it is connected to so that they reflect the
  // new assignment, the client should send a PartitionAssignmentAck.
  rpc AssignPartitions(stream PartitionAssignmentRequest)
      returns (stream PartitionAssignment) {}
}

// The first request that must be sent on a newly-opened stream. The client must
// wait for the response before sending subsequent requests on the stream.
message InitialSubscribeRequest {
  // Field number 3 is not defined by this message. Reserve it so that a future
  // field cannot accidentally reuse the gap.
  reserved 3;

  // The subscription from which to receive messages.
  string subscription = 1;

  // The partition from which to receive messages. Partitions are zero indexed,
  // so `partition` must be in the range [0, topic.num_partitions).
  int64 partition = 2;

  // Optional. Initial target location within the message backlog. If not set,
  // messages will be delivered from the commit cursor for the given
  // subscription and partition.
  SeekRequest initial_location = 4 [(google.api.field_behavior) = OPTIONAL];
}

// Response to an InitialSubscribeRequest. Delivered to the client as the
// `initial` member of a SubscribeResponse.
message InitialSubscribeResponse {
  // The cursor from which the subscriber will start receiving messages once
  // flow control tokens become available.
  Cursor cursor = 1;
}

// Request to update the stream's delivery cursor based on the given target.
// Resets the server available tokens to 0. SeekRequests past head result in
// stream breakage.
//
// SeekRequests may not be sent while another SeekRequest is outstanding (i.e.,
// has not received a SeekResponse) on the same stream.
//
// This message is also used as the optional `initial_location` of an
// InitialSubscribeRequest.
message SeekRequest {
  // A special target in the partition that takes no other parameters.
  enum NamedTarget {
    // Default value. This value is unused.
    NAMED_TARGET_UNSPECIFIED = 0;

    // A target corresponding to the most recently published message in the
    // partition.
    HEAD = 1;

    // A target corresponding to the committed cursor for the given subscription
    // and topic partition.
    COMMITTED_CURSOR = 2;
  }

  // The target to seek to. Must be set; the members are mutually exclusive.
  oneof target {
    // A named target.
    NamedTarget named_target = 1;

    // A target corresponding to the cursor, pointing to anywhere in the
    // topic partition.
    Cursor cursor = 2;
  }
}

// Response to a SeekRequest. Delivered to the client as the `seek` member of a
// SubscribeResponse.
message SeekResponse {
  // The new delivery cursor for the current stream.
  Cursor cursor = 1;
}

// Request to grant tokens to the server, requesting delivery of messages when
// they become available.
//
// Tokens are tracked separately for messages and bytes. The server subtracts
// delivered messages and bytes from its available tokens (see MessageResponse),
// and a SeekRequest resets the available tokens to 0.
message FlowControlRequest {
  // The number of message tokens to grant. Must be greater than or equal to 0.
  int64 allowed_messages = 1;

  // The number of byte tokens to grant. Must be greater than or equal to 0.
  int64 allowed_bytes = 2;
}

// A request sent from the client to the server on a stream.
message SubscribeRequest {
  // The type of request this is.
  oneof request {
    // Initial request on the stream. Must be the first request sent.
    InitialSubscribeRequest initial = 1;

    // Request to update the stream's delivery cursor.
    SeekRequest seek = 2;

    // Request to grant tokens to the server, requesting delivery of messages
    // when they become available.
    FlowControlRequest flow_control = 3;
  }
}

// Response containing a list of messages. Upon delivering a MessageResponse to
// the client, the server:
// *  Updates the stream's delivery cursor to one greater than the cursor of the
//    last message in the list.
// *  Subtracts the total number of bytes and messages from the tokens available
//    to the server.
message MessageResponse {
  // Messages from the topic partition. Per the cursor update described above,
  // the final entry determines the stream's next delivery cursor.
  repeated SequencedMessage messages = 1;
}

// A response sent from the server to the client on a Subscribe stream.
message SubscribeResponse {
  // The type of response this is.
  oneof response {
    // Initial response on the stream, sent in reply to the
    // InitialSubscribeRequest.
    InitialSubscribeResponse initial = 1;

    // Response to a Seek operation.
    SeekResponse seek = 2;

    // Response containing messages from the topic partition.
    MessageResponse messages = 3;
  }
}

// The first request that must be sent on a newly-opened stream. The client must
// wait for the response before sending subsequent requests on the stream.
message InitialPartitionAssignmentRequest {
  // The subscription name. Structured like:
  // projects/<project number>/locations/<zone name>/subscriptions/<subscription
  // id>
  string subscription = 1;

  // An opaque, unique client identifier. This field must be exactly 16 bytes
  // long and is interpreted as an unsigned 128-bit integer. Values of any other
  // size will be rejected and the stream will be failed with a non-retryable
  // error.
  //
  // This field is large enough to fit a UUID from standard UUID algorithms like
  // uuid1 or uuid4, which should be used to generate this number. The same
  // identifier should be reused following disconnections with retryable stream
  // errors.
  bytes client_id = 2;
}

// An assignment of partitions, sent from the server to the client on the
// AssignPartitions stream.
//
// PartitionAssignments should not race with acknowledgements. There
// should be exactly one unacknowledged PartitionAssignment at a time. If not,
// the client must break the stream.
message PartitionAssignment {
  // The list of partition numbers this subscriber is assigned to.
  repeated int64 partitions = 1;
}

// Acknowledges receipt and handling of the previous assignment.
// If not sent within a short period after receiving the assignment,
// partitions may remain unassigned for a period of time until the
// client is known to be inactive, after which time the server will break the
// stream.
//
// This message intentionally carries no fields.
message PartitionAssignmentAck {}

// A request sent from the client to the server on the AssignPartitions stream.
message PartitionAssignmentRequest {
  // The type of request this is.
  oneof request {
    // Initial request on the stream. Must be the first request sent.
    InitialPartitionAssignmentRequest initial = 1;

    // Acknowledgement of a partition assignment.
    PartitionAssignmentAck ack = 2;
  }
}
