// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

syntax = "proto3";

package google.cloud.pubsublite.v1;

import "google/api/annotations.proto";
import "google/api/client.proto";
import "google/api/field_behavior.proto";
import "google/api/resource.proto";
import "google/cloud/pubsublite/v1/common.proto";

option cc_enable_arenas = true;
option csharp_namespace = "Google.Cloud.PubSubLite.V1";
option go_package = "cloud.google.com/go/pubsublite/apiv1/pubsublitepb;pubsublitepb";
option java_multiple_files = true;
option java_outer_classname = "CursorProto";
option java_package = "com.google.cloud.pubsublite.proto";
option php_namespace = "Google\\Cloud\\PubSubLite\\V1";
option ruby_package = "Google::Cloud::PubSubLite::V1";

// The service that a subscriber client application uses to manage committed
// cursors while receiving messsages. A cursor represents a subscriber's
// progress within a topic partition for a given subscription.
service CursorService {
  option (google.api.default_host) = "pubsublite.googleapis.com";
  option (google.api.oauth_scopes) =
      "https://www.googleapis.com/auth/cloud-platform";

  // Establishes a stream with the server for managing committed cursors.
  rpc StreamingCommitCursor(stream StreamingCommitCursorRequest)
      returns (stream StreamingCommitCursorResponse) {}

  // Updates the committed cursor.
  rpc CommitCursor(CommitCursorRequest) returns (CommitCursorResponse) {
    option (google.api.http) = {
      post: "/v1/cursor/{subscription=projects/*/locations/*/subscriptions/*}:commitCursor"
      body: "*"
    };
  }

  // Returns all committed cursor information for a subscription.
  rpc ListPartitionCursors(ListPartitionCursorsRequest)
      returns (ListPartitionCursorsResponse) {
    option (google.api.http) = {
      get: "/v1/cursor/{parent=projects/*/locations/*/subscriptions/*}/cursors"
    };
    option (google.api.method_signature) = "parent";
  }
}

// The first streaming request that must be sent on a newly-opened stream. The
// client must wait for the response before sending subsequent requests on the
// stream.
message InitialCommitCursorRequest {
  // The subscription for which to manage committed cursors.
  string subscription = 1;

  // The partition for which to manage committed cursors. Partitions are zero
  // indexed, so `partition` must be in the range [0, topic.num_partitions).
  int64 partition = 2;
}

// Response to an InitialCommitCursorRequest.
message InitialCommitCursorResponse {}

// Streaming request to update the committed cursor. Subsequent
// SequencedCommitCursorRequests override outstanding ones.
message SequencedCommitCursorRequest {
  // The new value for the committed cursor.
  Cursor cursor = 1;
}

// Response to a SequencedCommitCursorRequest.
message SequencedCommitCursorResponse {
  // The number of outstanding SequencedCommitCursorRequests acknowledged by
  // this response. Note that SequencedCommitCursorRequests are acknowledged in
  // the order that they are received.
  int64 acknowledged_commits = 1;
}

// A request sent from the client to the server on a stream.
message StreamingCommitCursorRequest {
  // The type of request this is.
  oneof request {
    // Initial request on the stream.
    InitialCommitCursorRequest initial = 1;

    // Request to commit a new cursor value.
    SequencedCommitCursorRequest commit = 2;
  }
}

// Response to a StreamingCommitCursorRequest.
message StreamingCommitCursorResponse {
  // The type of request this is.
  oneof request {
    // Initial response on the stream.
    InitialCommitCursorResponse initial = 1;

    // Response to committing a new cursor value.
    SequencedCommitCursorResponse commit = 2;
  }
}

// Request for CommitCursor.
message CommitCursorRequest {
  // The subscription for which to update the cursor.
  string subscription = 1;

  // The partition for which to update the cursor. Partitions are zero indexed,
  // so `partition` must be in the range [0, topic.num_partitions).
  int64 partition = 2;

  // The new value for the committed cursor.
  Cursor cursor = 3;
}

// Response for CommitCursor.
message CommitCursorResponse {}

// Request for ListPartitionCursors.
message ListPartitionCursorsRequest {
  // Required. The subscription for which to retrieve cursors.
  // Structured like
  // `projects/{project_number}/locations/{location}/subscriptions/{subscription_id}`.
  string parent = 1 [
    (google.api.field_behavior) = REQUIRED,
    (google.api.resource_reference) = {
      type: "pubsublite.googleapis.com/Subscription"
    }
  ];

  // The maximum number of cursors to return. The service may return fewer than
  // this value.
  // If unset or zero, all cursors for the parent will be returned.
  int32 page_size = 2;

  // A page token, received from a previous `ListPartitionCursors` call.
  // Provide this to retrieve the subsequent page.
  //
  // When paginating, all other parameters provided to `ListPartitionCursors`
  // must match the call that provided the page token.
  string page_token = 3;
}

// A pair of a Cursor and the partition it is for.
message PartitionCursor {
  // The partition this is for.
  int64 partition = 1;

  // The value of the cursor.
  Cursor cursor = 2;
}

// Response for ListPartitionCursors
message ListPartitionCursorsResponse {
  // The partition cursors from this request.
  repeated PartitionCursor partition_cursors = 1;

  // A token, which can be sent as `page_token` to retrieve the next page.
  // If this field is omitted, there are no subsequent pages.
  string next_page_token = 2;
}
