/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */

/**
 *
 * @packageDocumentation
 * @module mqtt_request_response
 * @mergeTarget
 *
 */

import * as protocol_client_adapter from "./mqtt_request_response/protocol_adapter";
import * as subscription_manager from "./mqtt_request_response/subscription_manager";
import {MqttClientConnection} from "./mqtt";
import {Mqtt5Client} from "./mqtt5";
import * as mqtt_request_response from "../common/mqtt_request_response";
import * as mqtt_request_response_internal from "../common/mqtt_request_response_internal";
import {BufferedEventEmitter} from "../common/event";
import {CrtError} from "./error";
import {LiftedPromise, newLiftedPromise} from "../common/promise";
import * as io from "../common/io";
import * as mqtt_shared from "../common/mqtt_shared";

export * from "../common/mqtt_request_response";

/**
 * Lifecycle states for an operation tracked by the request-response client.  The comment on each member describes
 * the interval that the state covers, written as "what starts the state -> what ends the state".
 */
enum OperationState {
    /* creation -> in event loop enqueue */
    None,

    /* in event loop queue -> non blocked response from subscription manager */
    Queued,

    /* subscribing response from sub manager -> subscription success/failure event */
    PendingSubscription,

    /* (request only) subscription success -> (publish failure OR correlated response received) */
    PendingResponse,

    /* (streaming only) subscription success -> (operation finished OR subscription ended event) */
    Subscribed,

    /* (streaming only) (subscription failure OR subscription ended) -> operation close/terminate */
    Terminal,
}

/**
 * Produces the display name of an operation state, primarily for use in log messages.
 *
 * @param state operation state to name
 * @returns the name of the matching OperationState member, or "Unknown" if the value matches no member
 */
function operationStateToString(state: OperationState) {
    // built per call so that the lookup does not depend on module initialization order
    const stateNames = new Map<OperationState, string>([
        [OperationState.None, "None"],
        [OperationState.Queued, "Queued"],
        [OperationState.PendingSubscription, "PendingSubscription"],
        [OperationState.PendingResponse, "PendingResponse"],
        [OperationState.Subscribed, "Subscribed"],
        [OperationState.Terminal, "Terminal"],
    ]);

    return stateNames.get(state) ?? "Unknown";
}

/**
 * Discriminates the two kinds of operation the client tracks; used to decide whether an Operation should be
 * treated as a RequestResponseOperation or a StreamingOperation.
 */
enum OperationType {
    RequestResponse,
    Streaming
}

/**
 * State shared by every operation (request-response or streaming) tracked by the client.
 */
interface Operation {
    // client-unique identifier, taken from the client's nextOperationId counter
    id: number,
    // which kind of operation this is; determines the concrete interface to cast to
    type: OperationType,
    // current lifecycle state
    state: OperationState,
    // initialized to the number of subscription topic filters for requests and to 1 for streams
    pendingSubscriptionCount: number,
    // true once the operation has been entered into the client's lookup tables; checked on removal
    inClientTables: boolean
}

/**
 * Tracking state for a single request-response operation.
 */
interface RequestResponseOperation extends Operation {
    // the caller-supplied description of the request
    options: mqtt_request_response.RequestResponseOperationOptions,
    // settled when the operation completes: resolved with the response or rejected with an error
    resultPromise: LiftedPromise<mqtt_request_response.Response>
}

/**
 * Tracking state for a single streaming operation.
 */
interface StreamingOperation extends Operation {
    // the caller-supplied description of the stream
    options: mqtt_request_response.StreamingOperationOptions,
    // the operation object handed back to the caller; events are emitted through it
    operation: StreamingOperationInternal,
}

/**
 * Table entry describing a response topic that one or more in-progress requests are listening on.
 */
interface ResponsePathEntry {
    // number of in-progress operations that reference this response topic; the entry is removed when it drops below 1
    refCount: number,
    // the response path's correlation token JSON path, split on '.'; unset when the path has no correlation token
    correlationTokenPath?: string[],
}

/**
 * Bookkeeping for the client's pending service timer.
 */
interface ServiceTaskWrapper {
    // timer handle returned by setTimeout; needed to cancel the task
    serviceTask : ReturnType<typeof setTimeout>;
    // the service time the task was scheduled for; compared against later scheduling requests
    nextServiceTime : number;
}

/**
 * Checks whether a set of client options can be used to construct a request-response client.
 *
 * Requirements:
 *  - maxRequestResponseSubscriptions must be an integer >= 2
 *  - maxStreamingSubscriptions must be an integer >= 1
 *  - operationTimeoutInSeconds is optional; when supplied it must be an integer >= 1
 *
 * @param options client configuration to validate
 * @returns true if the options are usable, false otherwise
 */
function areClientOptionsValid(options: mqtt_request_response.RequestResponseClientOptions) : boolean {
    if (!options) {
        return false;
    }

    // Number.isInteger is false for undefined, null, NaN, and non-numbers, so it doubles as the presence check
    const maxRequestResponseSubscriptions = options.maxRequestResponseSubscriptions;
    if (!Number.isInteger(maxRequestResponseSubscriptions) || maxRequestResponseSubscriptions < 2) {
        return false;
    }

    // a lower bound is required here; a truthiness test alone lets negative counts through
    const maxStreamingSubscriptions = options.maxStreamingSubscriptions;
    if (!Number.isInteger(maxStreamingSubscriptions) || maxStreamingSubscriptions < 1) {
        return false;
    }

    // Test for presence explicitly rather than by truthiness: 0 and NaN are falsy, so a truthiness guard would skip
    // the range check for exactly the values it is meant to reject.
    const operationTimeoutInSeconds = options.operationTimeoutInSeconds;
    if (operationTimeoutInSeconds !== undefined && operationTimeoutInSeconds !== null) {
        if (!Number.isInteger(operationTimeoutInSeconds) || operationTimeoutInSeconds <= 0) {
            return false;
        }
    }

    return true;
}

/**
 * Callbacks that connect a streaming operation object to the client that created it.
 */
interface StreamingOperationInternalOptions {
    // invoked when the streaming operation transitions to closed
    close: () => void,
    // invoked the first time the streaming operation is opened
    open: () => void
}

/**
 * A streaming operation for an AWS MQTT service.  Once opened, the operation listens for messages on a particular
 * topic and surfaces them, along with subscription status changes, as Javascript events.
 */
export class StreamingOperationBase extends BufferedEventEmitter implements mqtt_request_response.IStreamingOperation {

    /**
     * Event emitted when the stream's subscription status changes.
     *
     * Listener type: {@link SubscriptionStatusListener}
     *
     * @event
     */
    static SUBSCRIPTION_STATUS : string = 'subscriptionStatus';

    /**
     * Event emitted when a stream message is received
     *
     * Listener type: {@link IncomingPublishListener}
     *
     * @event
     */
    static INCOMING_PUBLISH : string = 'incomingPublish';

    private internalOptions: StreamingOperationInternalOptions;
    private state = mqtt_request_response_internal.StreamingOperationState.None;

    /**
     * @param options callbacks invoked when the operation is opened and closed
     */
    constructor(options: StreamingOperationInternalOptions) {
        super();
        this.internalOptions = options;
    }

    /**
     * Starts listening to the configured stream of events.  Opening an operation that is already open does nothing.
     *
     * @throws CrtError if the operation has already been closed; a closed operation cannot be re-opened
     */
    open() : void {
        switch (this.state) {
            case mqtt_request_response_internal.StreamingOperationState.None:
                // the state only advances if the open callback returns normally
                this.internalOptions.open();
                this.state = mqtt_request_response_internal.StreamingOperationState.Open;
                break;

            case mqtt_request_response_internal.StreamingOperationState.Closed:
                throw new CrtError("MQTT streaming operation already closed");

            default:
                break;
        }
    }

    /**
     * Stops listening to the configured stream of events.  Closing an already-closed operation does nothing.
     */
    close(): void {
        if (this.state == mqtt_request_response_internal.StreamingOperationState.Closed) {
            return;
        }

        // flip the state before invoking the callback so that a re-entrant close() is a no-op
        this.state = mqtt_request_response_internal.StreamingOperationState.Closed;
        this.internalOptions.close();
    }

    on(event: 'subscriptionStatus', listener: mqtt_request_response.SubscriptionStatusListener): this;

    on(event: 'incomingPublish', listener: mqtt_request_response.IncomingPublishListener): this;

    on(event: string | symbol, listener: (...args: any[]) => void): this {
        super.on(event, listener);
        return this;
    }
}

/**
 * Module-private streaming operation type.  It adds the means for the client to construct operations and to push
 * events to the operation's listeners; callers only ever see it as a StreamingOperationBase.
 */
class StreamingOperationInternal extends StreamingOperationBase {

    private constructor(options: StreamingOperationInternalOptions) {
        super(options);
    }

    /**
     * Factory for streaming operations (the constructor is private).
     *
     * @param options callbacks invoked when the operation is opened and closed
     */
    static newInternal(options: StreamingOperationInternalOptions) : StreamingOperationInternal {
        return new StreamingOperationInternal(options);
    }

    /**
     * Emits an incoming publish event to listeners on the next tick rather than synchronously.
     *
     * @param publishEvent event data to emit
     */
    triggerIncomingPublishEvent(publishEvent: mqtt_request_response.IncomingPublishEvent) : void {
        const deliver = () : void => {
            this.emit(StreamingOperationBase.INCOMING_PUBLISH, publishEvent);
        };

        process.nextTick(deliver);
    }

    /**
     * Emits a subscription status event to listeners on the next tick rather than synchronously.
     *
     * @param statusEvent event data to emit
     */
    triggerSubscriptionStatusUpdateEvent(statusEvent: mqtt_request_response.SubscriptionStatusEvent) : void {
        const deliver = () : void => {
            this.emit(StreamingOperationBase.SUBSCRIPTION_STATUS, statusEvent);
        };

        process.nextTick(deliver);
    }
}

/**
 * Native implementation of an MQTT-based request-response client tuned for AWS MQTT services.
 *
 * Supports streaming operations (listen to a stream of modeled events from an MQTT topic) and request-response
 * operations (performs the subscribes, publish, and incoming publish correlation and error checking needed to
 * perform simple request-response operations over MQTT).
 */
export class RequestResponseClient extends BufferedEventEmitter implements mqtt_request_response.IRequestResponseClient {

    // subject string passed to every io.log* call made by this class
    private static logSubject = "RequestResponseClient";

    // how long a submitted request may stay incomplete before it is failed; 60 when not configured
    private readonly operationTimeoutInSeconds: number;
    // id handed to the next created operation; incremented on every submitRequest/createStream
    private nextOperationId : number = 1;
    private protocolClientAdapter : protocol_client_adapter.ProtocolClientAdapter;
    private subscriptionManager : subscription_manager.SubscriptionManager;
    // Ready until close() is called, Closed afterwards
    private state : mqtt_request_response_internal.RequestResponseClientState = mqtt_request_response_internal.RequestResponseClientState.Ready;
    // pending timer that will invoke service(); undefined when nothing is scheduled
    private serviceTask? : ServiceTaskWrapper;

    // every operation the client currently tracks, keyed by operation id
    private operations : Map<number, Operation> = new Map<number, Operation>();
    private streamingOperationsByTopicFilter : Map<string, Set<number>> = new Map<string, Set<number>>(); // topic filter -> set of operation ids
    private correlationTokenPathsByResponsePaths : Map<string, ResponsePathEntry> = new Map<string, ResponsePathEntry>(); // response topic -> response path entry
    private operationsByCorrelationToken : Map<string, number> = new Map<string, number>(); // correlation token -> operation id

    // ids of operations waiting for service() to acquire their subscriptions; appended at the back, consumed from the front
    private operationQueue : Array<number> = new Array<number>;

    /**
     * Builds a request-response client on top of a protocol adapter and wires up the adapter and subscription
     * manager event handlers.
     *
     * @param protocolClientAdapter adapter wrapping the underlying MQTT client
     * @param options configuration options for the client
     *
     * @throws CrtError if the options fail validation
     */
    constructor(protocolClientAdapter: protocol_client_adapter.ProtocolClientAdapter, options: mqtt_request_response.RequestResponseClientOptions) {
        if (!areClientOptionsValid(options)) {
            throw new CrtError("Invalid client options passed to RequestResponseClient constructor");
        }

        super();

        this.operationTimeoutInSeconds = options.operationTimeoutInSeconds ?? 60;
        this.protocolClientAdapter = protocolClientAdapter;

        const Adapter = protocol_client_adapter.ProtocolClientAdapter;
        protocolClientAdapter.addListener(Adapter.PUBLISH_COMPLETION, this.handlePublishCompletionEvent.bind(this));
        protocolClientAdapter.addListener(Adapter.CONNECTION_STATUS, this.handleConnectionStatusEvent.bind(this));
        protocolClientAdapter.addListener(Adapter.INCOMING_PUBLISH, this.handleIncomingPublishEvent.bind(this));

        const managerConfig : subscription_manager.SubscriptionManagerConfig = {
            maxRequestResponseSubscriptions: options.maxRequestResponseSubscriptions,
            maxStreamingSubscriptions: options.maxStreamingSubscriptions,
            operationTimeoutInSeconds: this.operationTimeoutInSeconds,
        };

        const Manager = subscription_manager.SubscriptionManager;
        this.subscriptionManager = new Manager(protocolClientAdapter, managerConfig);

        const manager = this.subscriptionManager;
        manager.addListener(Manager.SUBSCRIBE_SUCCESS, this.handleSubscribeSuccessEvent.bind(this));
        manager.addListener(Manager.SUBSCRIBE_FAILURE, this.handleSubscribeFailureEvent.bind(this));
        manager.addListener(Manager.SUBSCRIPTION_ENDED, this.handleSubscriptionEndedEvent.bind(this));
        manager.addListener(Manager.STREAMING_SUBSCRIPTION_ESTABLISHED, this.handleStreamingSubscriptionEstablishedEvent.bind(this));
        manager.addListener(Manager.STREAMING_SUBSCRIPTION_LOST, this.handleStreamingSubscriptionLostEvent.bind(this));
        manager.addListener(Manager.STREAMING_SUBSCRIPTION_HALTED, this.handleStreamingSubscriptionHaltedEvent.bind(this));
        manager.addListener(Manager.SUBSCRIPTION_ORPHANED, this.handleSubscriptionOrphanedEvent.bind(this));
        manager.addListener(Manager.UNSUBSCRIBE_COMPLETE, this.handleUnsubscribeCompleteEvent.bind(this));
    }

    /**
     * Factory for an MQTT service request-response client whose protocol implementation is an MQTT5 client.
     *
     * @param protocolClient protocol client to use for all operations
     * @param options configuration options for the desired request-response client
     *
     * @returns a new request-response client
     * @throws CrtError if no protocol client is supplied or the options are invalid
     */
    static newFromMqtt5(protocolClient: Mqtt5Client, options: mqtt_request_response.RequestResponseClientOptions): RequestResponseClient {
        if (!protocolClient) {
            throw new CrtError("protocol client is null");
        }

        const mqtt5Adapter = protocol_client_adapter.ProtocolClientAdapter.newFrom5(protocolClient);

        return new RequestResponseClient(mqtt5Adapter, options);
    }

    /**
     * Factory for an MQTT service request-response client whose protocol implementation is an MQTT311 client.
     *
     * @param protocolClient protocol client to use for all operations
     * @param options configuration options for the desired request-response client
     *
     * @returns a new request-response client
     * @throws CrtError if no protocol client is supplied or the options are invalid
     */
    static newFromMqtt311(protocolClient: MqttClientConnection, options: mqtt_request_response.RequestResponseClientOptions) : RequestResponseClient {
        if (!protocolClient) {
            throw new CrtError("protocol client is null");
        }

        const mqtt311Adapter = protocol_client_adapter.ProtocolClientAdapter.newFrom311(protocolClient);

        return new RequestResponseClient(mqtt311Adapter, options);
    }

    /**
     * Shuts the client down and releases the resources associated with it.  All incomplete requests are failed and
     * all outstanding streaming operations are closed.  Calling close on an already-closed client does nothing.
     *
     * This must be called when finished with a client; otherwise, native resources will leak.
     */
    close(): void {
        if (this.state == mqtt_request_response_internal.RequestResponseClientState.Closed) {
            return;
        }

        io.logInfo(RequestResponseClient.logSubject, `closing MQTT RequestResponseClient`);

        // mark the client closed before tearing anything down so that event handlers triggered along the way bail out
        this.state = mqtt_request_response_internal.RequestResponseClientState.Closed;
        this.closeAllOperations();

        this.protocolClientAdapter.close();
        this.subscriptionManager.close();
    }

    /**
     * Submits a request to the request-response client.
     *
     * @param requestOptions description of the request to perform
     *
     * Returns a promise that resolves to a response to the request or an error describing how the request attempt
     * failed.  The promise is rejected if the client is closed, the options fail validation, or no correlated
     * response arrives within the client's operation timeout.
     *
     * A "successful" request-response execution flow is defined as "the service sent a response payload that
     * correlates with the request payload."  Upon deserialization (which is the responsibility of the service model
     * client, one layer up), such a payload may actually indicate a failure.
     */
    async submitRequest(requestOptions: mqtt_request_response.RequestResponseOperationOptions): Promise<mqtt_request_response.Response> {
        let resultPromise : LiftedPromise<mqtt_request_response.Response> = newLiftedPromise();

        if (this.state == mqtt_request_response_internal.RequestResponseClientState.Closed) {
            resultPromise.reject(new CrtError("MQTT request-response client has already been closed"));
            return resultPromise.promise;
        }

        try {
            validateRequestOptions(requestOptions);
        } catch (err) {
            resultPromise.reject(err);
            return resultPromise.promise;
        }

        let id = this.nextOperationId;
        this.nextOperationId++;

        let operation : RequestResponseOperation = {
            id: id,
            type: OperationType.RequestResponse,
            state: OperationState.Queued,
            pendingSubscriptionCount: requestOptions.subscriptionTopicFilters.length,
            inClientTables: false,
            options: requestOptions,
            resultPromise: resultPromise,
        };

        this.operations.set(id, operation);
        this.operationQueue.push(id);

        let timeoutTask = setTimeout(() => {
            try {
                // a no-op if the operation has already completed and been removed
                this.completeRequestResponseOperationWithError(id, new CrtError("Operation timeout"));
            } catch (err) {
                // timing out is best-effort; record the failure rather than letting it escape the timer callback
                io.logError(RequestResponseClient.logSubject, `request-response operation ${id} failed during timeout handling: ${err}`);
            }
        }, this.operationTimeoutInSeconds * 1000);

        // Once the operation settles, for any reason, the timeout has no purpose.  Cancel it so that the timer does
        // not stay armed (and keep the event loop alive) for the rest of the timeout interval.  Handling both
        // outcomes here does not hide a rejection from the caller, who observes the promise returned below.
        let cancelTimeout = () => { clearTimeout(timeoutTask); };
        void resultPromise.promise.then(cancelTimeout, cancelTimeout);

        this.wakeServiceTask();

        io.logInfo(RequestResponseClient.logSubject, `request-response operation with id "${id}" submitted to operation queue`);

        return resultPromise.promise;
    }

    /**
     * Builds a streaming operation from a set of configuration options.  A streaming operation is the mechanism for
     * listening to a specific event stream from an AWS MQTT-based service.  The returned operation is tracked by the
     * client but does nothing until it is opened.
     *
     * @param streamOptions configuration options for the streaming operation
     *
     * browser/node implementers are covariant by returning an implementation of IStreamingOperation.  This split
     * is necessary because event listening (which streaming operations need) cannot be modeled on an interface.
     */
    createStream(streamOptions: mqtt_request_response.StreamingOperationOptions) : StreamingOperationBase {
        if (this.state == mqtt_request_response_internal.RequestResponseClientState.Closed) {
            throw new CrtError("MQTT request-response client has already been closed");
        }

        validateStreamingOptions(streamOptions);

        const id = this.nextOperationId++;

        // the user-facing object reaches back into the client through these callbacks, identified by operation id
        const streamHandle = StreamingOperationInternal.newInternal({
            open: () => { this.openStreamingOperation(id); },
            close: () => { this.closeStreamingOperation(id); },
        });

        const trackedOperation : StreamingOperation = {
            id: id,
            type: OperationType.Streaming,
            state: OperationState.None,
            pendingSubscriptionCount: 1,
            inClientTables: false,
            options: streamOptions,
            operation: streamHandle
        };

        this.operations.set(id, trackedOperation);

        return streamHandle;
    }

    /**
     * Decides whether a queued operation may leave the queue.  A request is held back while another in-progress
     * request is registered under the same correlation token; streaming operations are never held back.
     *
     * @param operation queued operation to check
     */
    private canOperationDequeue(operation: Operation) : boolean {
        if (operation.type == OperationType.RequestResponse) {
            // requests without a correlation token all share the empty-string token
            const token = (operation as RequestResponseOperation).options.correlationToken ?? "";

            return !this.operationsByCorrelationToken.has(token);
        }

        return true;
    }

    /**
     * Lists the topic filters an operation needs subscriptions for: all of a request's subscription topic filters,
     * or a streaming operation's single topic filter.
     *
     * @param operation operation to list topic filters for
     */
    private static buildSuscriptionListFromOperation(operation : Operation) : string[] {
        if (operation.type != OperationType.RequestResponse) {
            const streamFilter = (operation as StreamingOperation).options.subscriptionTopicFilter;
            return [streamFilter];
        }

        return (operation as RequestResponseOperation).options.subscriptionTopicFilters;
    }

    /**
     * Registers an operation in the lookup tables used to route incoming publishes.  Streaming operations are indexed
     * by topic filter; requests are indexed by correlation token and add a reference to each of their response paths.
     *
     * @param operation operation to register
     */
    private addOperationToInProgressTables(operation: Operation) {
        if (operation.type != OperationType.Streaming) {
            const request = operation as RequestResponseOperation;

            const token = request.options.correlationToken ?? "";
            this.operationsByCorrelationToken.set(token, operation.id);

            io.logDebug(RequestResponseClient.logSubject, `operation ${operation.id} registered with correlation token "${token}"`);

            for (const responsePath of request.options.responsePaths) {
                const topic = responsePath.topic;

                let entry = this.correlationTokenPathsByResponsePaths.get(topic);
                if (!entry) {
                    // first operation to reference this response topic creates the entry
                    entry = { refCount: 0 };

                    if (responsePath.correlationTokenJsonPath) {
                        entry.correlationTokenPath = responsePath.correlationTokenJsonPath.split('.');
                    }

                    this.correlationTokenPathsByResponsePaths.set(topic, entry);

                    io.logDebug(RequestResponseClient.logSubject, `adding response path "${topic}" to response path table`);
                }

                entry.refCount++;
                io.logDebug(RequestResponseClient.logSubject, `operation ${operation.id} adding reference to response path "${topic}"`);
            }
        } else {
            const topicFilter = (operation as StreamingOperation).options.subscriptionTopicFilter;

            let listeners = this.streamingOperationsByTopicFilter.get(topicFilter);
            if (!listeners) {
                listeners = new Set<number>();
                this.streamingOperationsByTopicFilter.set(topicFilter, listeners);

                io.logDebug(RequestResponseClient.logSubject, `adding topic filter "${topicFilter}" to streaming subscriptions table`);
            }

            listeners.add(operation.id);
            io.logDebug(RequestResponseClient.logSubject, `adding operation ${operation.id} to streaming subscriptions table under topic filter "${topicFilter}"`);
        }

        // recorded last so that removal only unwinds table entries that were actually made
        operation.inClientTables = true;
    }

    /**
     * Advances an operation based on the outcome of its subscription acquisition attempt.
     *
     *  - failure or no capacity: the operation is completed with an error
     *  - subscribing: the operation waits in the PendingSubscription state
     *  - otherwise: a streaming operation becomes Subscribed and reports the subscription as established, while a
     *    request proceeds to its publish step
     *
     * @param operation operation that attempted to acquire subscriptions
     * @param result result reported by the subscription manager
     */
    private handleAcquireSubscriptionResult(operation: Operation, result: subscription_manager.AcquireSubscriptionResult) {
        switch (result) {
            case subscription_manager.AcquireSubscriptionResult.Failure:
            case subscription_manager.AcquireSubscriptionResult.NoCapacity:
                this.completeOperationWithError(operation.id, new CrtError(`Acquire subscription error: ${subscription_manager.acquireSubscriptionResultToString(result)}`));
                return;

            default:
                break;
        }

        this.addOperationToInProgressTables(operation);

        if (result == subscription_manager.AcquireSubscriptionResult.Subscribing) {
            this.changeOperationState(operation, OperationState.PendingSubscription);
            return;
        }

        if (operation.type != OperationType.Streaming) {
            this.applyRequestResponsePublish(operation as RequestResponseOperation);
            return;
        }

        this.changeOperationState(operation, OperationState.Subscribed);

        const stream = operation as StreamingOperation;
        stream.operation.triggerSubscriptionStatusUpdateEvent({
            type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionEstablished
        });
    }

    /**
     * Body of the service task.  Drains the operation queue from the front, asking the subscription manager to
     * acquire the subscriptions each queued operation needs.  Processing stops at the first operation that cannot
     * proceed yet, so operations leave the queue in the order they entered it.
     */
    private service() {
        // the scheduled task is being consumed by this call; clearing the field allows a new one to be scheduled
        this.serviceTask = undefined;

        // nothing is serviced unless the client is in the Ready state (e.g. after close())
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        // NOTE(review): purge() is implemented by the subscription manager and is not visible here; presumably it
        // discards subscriptions that are no longer in use to make room for new ones - confirm
        this.subscriptionManager.purge();

        io.logDebug(RequestResponseClient.logSubject, `servicing operation queue with ${this.operationQueue.length} entries`);
        while (this.operationQueue.length > 0) {
            let headId = this.operationQueue[0];
            let operation = this.operations.get(headId);
            if (!operation) {
                // the operation was removed (completed, timed out, closed) while it was queued; drop the stale id
                this.operationQueue.shift();
                continue;
            }

            // the head stays in the queue, blocking everything behind it, until it is allowed to dequeue
            if (!this.canOperationDequeue(operation)) {
                io.logDebug(RequestResponseClient.logSubject, `operation ${headId} cannot be dequeued`);
                break;
            }

            let acquireOptions : subscription_manager.AcquireSubscriptionConfig = {
                topicFilters: RequestResponseClient.buildSuscriptionListFromOperation(operation),
                operationId: headId,
                type: (operation.type == OperationType.RequestResponse) ? subscription_manager.SubscriptionType.RequestResponse : subscription_manager.SubscriptionType.EventStream,
            };

            let acquireResult = this.subscriptionManager.acquireSubscription(acquireOptions);
            io.logDebug(RequestResponseClient.logSubject, `servicing queued operation ${operation.id} yielded acquire subscription result of "${subscription_manager.acquireSubscriptionResultToString(acquireResult)}"`);
            if (acquireResult == subscription_manager.AcquireSubscriptionResult.Blocked) {
                // leave the operation at the head of the queue; it is retried the next time the service task runs
                break;
            }

            // any other result (success, subscribing, or failure) takes the operation out of the queue
            this.operationQueue.shift();
            this.handleAcquireSubscriptionResult(operation, acquireResult);
        }
    }

    /**
     * Cancels the pending service task, if there is one.
     */
    private clearServiceTask() {
        const pendingTask = this.serviceTask;
        if (!pendingTask) {
            return;
        }

        clearTimeout(pendingTask.serviceTask);
        this.serviceTask = undefined;
    }

    /**
     * Ensures that the service task will run no later than the requested time.  If a task is already scheduled to
     * run at or before that time, nothing changes; otherwise any existing task is replaced by one scheduled for the
     * requested time.
     *
     * @param serviceTime desired execution time, in milliseconds on the same clock as Date.now()
     */
    private tryScheduleServiceTask(serviceTime: number) {
        if (this.serviceTask) {
            // the task that is already scheduled will run soon enough to satisfy this request
            if (serviceTime >= this.serviceTask.nextServiceTime) {
                return;
            }

            this.clearServiceTask();
        }

        // The timer delay is the distance from now to the target time.  Subtracting in the other order would run a
        // future-dated task immediately and delay a past-due one.  Past-due targets clamp to an immediate run.
        let futureMs = Math.max(0, serviceTime - Date.now());
        this.serviceTask = {
            serviceTask: setTimeout(() => { this.service(); }, futureMs),
            nextServiceTime: serviceTime,
        }

        io.logDebug(RequestResponseClient.logSubject, `service task scheduled for execution in ${futureMs} MS`);
    }

    /**
     * Requests that the service task run as soon as possible.
     */
    private wakeServiceTask() : void {
        const now = Date.now();

        this.tryScheduleServiceTask(now);
    }

    /**
     * Completes every tracked operation with a "client closed" error.
     */
    private closeAllOperations() : void {
        // completing an operation removes it from the map, so iterate over a snapshot of the ids
        const operationIds = Array.from(this.operations.keys());

        operationIds.forEach((operationId) => {
            this.completeOperationWithError(operationId, new CrtError("Request-response client closed"));
        });
    }

    /**
     * Removes a streaming operation from the set of operations listening on a topic filter, dropping the table
     * entry for the topic filter once no operations remain in it.
     *
     * @param topicFilter topic filter the operation was registered under
     * @param id id of the operation to remove
     */
    private removeStreamingOperationFromTopicFilterSet(topicFilter: string, id: number) {
        const listeners = this.streamingOperationsByTopicFilter.get(topicFilter);
        if (listeners) {
            listeners.delete(id);
            io.logDebug(RequestResponseClient.logSubject, `removed operation ${id} from streaming topic filter table entry for "${topicFilter}"`);

            if (listeners.size <= 0) {
                this.streamingOperationsByTopicFilter.delete(topicFilter);
                io.logDebug(RequestResponseClient.logSubject, `removed streaming topic filter table entry for "${topicFilter}"`);
            }
        }
    }

    /**
     * Drops one reference to a response path table entry, removing the entry when no references remain.
     *
     * @param topic response topic whose entry should lose a reference
     */
    private decRefResponsePaths(topic: string) {
        const entry = this.correlationTokenPathsByResponsePaths.get(topic);
        if (entry) {
            entry.refCount -= 1;
            io.logDebug(RequestResponseClient.logSubject, `dec-refing response path entry for "${topic}", ${entry.refCount} references left`);

            const stillReferenced = entry.refCount >= 1;
            if (!stillReferenced) {
                io.logDebug(RequestResponseClient.logSubject, `removing response path entry for "${topic}"`);
                this.correlationTokenPathsByResponsePaths.delete(topic);
            }
        }
    }

    /**
     * Stops tracking a request-response operation: removes it from the operation map, unwinds its lookup table
     * entries if it had any, and releases its subscriptions.
     *
     * @param operation operation to remove
     */
    private removeRequestResponseOperation(operation: RequestResponseOperation) {
        io.logDebug(RequestResponseClient.logSubject, `removing request-response operation ${operation.id} from client state`);
        this.operations.delete(operation.id);

        const requestOptions = operation.options;

        if (operation.inClientTables) {
            for (const path of requestOptions.responsePaths) {
                this.decRefResponsePaths(path.topic);
            }

            this.operationsByCorrelationToken.delete(requestOptions.correlationToken ?? "");
        }

        // the release is issued whether or not the operation ever got as far as acquiring subscriptions
        this.subscriptionManager.releaseSubscription({
            topicFilters: requestOptions.subscriptionTopicFilters,
            operationId: operation.id,
        });
    }

    /**
     * Stops tracking a streaming operation: removes it from the operation map, unwinds its topic filter table entry
     * if it had one, and releases its subscription.
     *
     * @param operation operation to remove
     */
    private removeStreamingOperation(operation: StreamingOperation) {
        io.logDebug(RequestResponseClient.logSubject, `removing streaming operation ${operation.id} from client state`);
        this.operations.delete(operation.id);

        const topicFilter = operation.options.subscriptionTopicFilter;

        if (operation.inClientTables) {
            this.removeStreamingOperationFromTopicFilterSet(topicFilter, operation.id);
        }

        // the release is issued whether or not the operation ever got as far as acquiring a subscription
        this.subscriptionManager.releaseSubscription({
            topicFilters: [topicFilter],
            operationId: operation.id,
        });
    }

    /**
     * Stops tracking an operation, dispatching to the removal routine for its type.  Unknown ids are ignored.
     *
     * @param id id of the operation to remove
     */
    private removeOperation(id: number) {
        const target = this.operations.get(id);
        if (!target) {
            return;
        }

        if (target.type != OperationType.RequestResponse) {
            this.removeStreamingOperation(target as StreamingOperation);
            return;
        }

        this.removeRequestResponseOperation(target as RequestResponseOperation);
    }

    /**
     * Fails a request-response operation: the operation is removed from the client and its result promise is
     * rejected with the supplied error.  Unknown ids are ignored.
     *
     * @param id id of the operation to fail
     * @param err error to reject the operation's promise with
     */
    private completeRequestResponseOperationWithError(id: number, err: CrtError) {
        const target = this.operations.get(id);
        if (!target) {
            return;
        }

        io.logInfo(RequestResponseClient.logSubject, `request-response operation ${id} completed with error: "${JSON.stringify(err)}"`);

        this.removeOperation(id);

        // only request-response operations carry a promise to reject
        if (target.type == OperationType.RequestResponse) {
            (target as RequestResponseOperation).resultPromise.reject(err);
        }
    }

    /**
     * Terminates a streaming operation: the operation is removed from the client, a "subscription halted" status
     * event is emitted if appropriate, and the operation is moved to the Terminal state and closed.  Unknown ids are
     * ignored.
     *
     * @param id id of the operation to halt
     * @param err error describing why the operation was halted; attached to the emitted status event
     */
    private haltStreamingOperationWithError(id: number, err: CrtError) {
        let operation = this.operations.get(id);
        if (!operation) {
            return;
        }

        io.logInfo(RequestResponseClient.logSubject, `streaming operation ${id} halted with error: "${JSON.stringify(err)}"`);

        // removal happens before the event/close below, so this function finds nothing if it is re-entered for the same id
        this.removeOperation(id);

        if (operation.type != OperationType.Streaming) {
            return;
        }

        let streamingOperation = operation as StreamingOperation;
        // no halted event for an operation still in its initial state or one that is already terminal
        if (operation.state != OperationState.Terminal && operation.state != OperationState.None) {
            streamingOperation.operation.triggerSubscriptionStatusUpdateEvent({
                type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionHalted,
                error: err
            });
        }

        this.changeOperationState(operation, OperationState.Terminal);

        // this is mostly a no-op except it's the only way we can guarantee that the streaming operation state also gets
        // flipped to closed
        streamingOperation.operation.close();
    }

    /**
     * Fails an operation of either type: requests have their promise rejected, streaming operations are halted.
     * Unknown ids are ignored.
     *
     * @param id id of the operation to fail
     * @param err error describing the failure
     */
    private completeOperationWithError(id: number, err: CrtError) {
        const target = this.operations.get(id);
        if (!target) {
            return;
        }

        if (target.type != OperationType.RequestResponse) {
            this.haltStreamingOperationWithError(id, err);
            return;
        }

        this.completeRequestResponseOperationWithError(id, err);
    }

    /**
     * Successfully completes a request-response operation: the operation is removed from the client and its result
     * promise is resolved with the response.  Unknown ids are ignored.
     *
     * @param id id of the operation to complete
     * @param responseTopic topic the response arrived on
     * @param payload payload of the response
     */
    private completeRequestResponseOperationWithResponse(id: number, responseTopic: string, payload: ArrayBuffer) {
        const target = this.operations.get(id);
        if (!target) {
            return;
        }

        io.logInfo(RequestResponseClient.logSubject, `request-response operation ${id} successfully completed with response"`);

        this.removeOperation(id);

        // only request-response operations carry a promise to resolve
        if (target.type == OperationType.RequestResponse) {
            const response = {
                topic: responseTopic,
                payload: payload
            };

            (target as RequestResponseOperation).resultPromise.resolve(response);
        }
    }

    /**
     * Handler for the protocol adapter's publish completion event.  A failed publish fails the associated request;
     * a successful publish is only logged.
     *
     * @param event publish completion event; its completion data is the id of the operation that published
     */
    private handlePublishCompletionEvent(event: protocol_client_adapter.PublishCompletionEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const operationId = event.completionData as number;
        if (!event.err) {
            io.logDebug(RequestResponseClient.logSubject, `request-response operation ${operationId} successfully published request payload"`);
            return;
        }

        this.completeRequestResponseOperationWithError(operationId, event.err as CrtError);
    }

    /**
     * Handler for the protocol adapter's connection status event.  When a connection is established while operations
     * are queued, the service task is woken so that the queue gets processed.
     *
     * @param event connection status event
     */
    private handleConnectionStatusEvent(event: protocol_client_adapter.ConnectionStatusEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const isConnected = event.status == protocol_client_adapter.ConnectionState.Connected;
        if (!isConnected) {
            return;
        }

        if (this.operationQueue.length > 0) {
            this.wakeServiceTask();
        }
    }

    /**
     * Delivers an incoming publish to each of a set of streaming operations.  Ids that no longer map to a tracked
     * streaming operation are skipped.
     *
     * @param event incoming publish to deliver
     * @param operations ids of the operations that should receive the publish
     */
    private handleIncomingPublishEventStreaming(event: protocol_client_adapter.IncomingPublishEvent, operations: Set<number>) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        for (const operationId of operations) {
            const candidate = this.operations.get(operationId);
            if (!candidate || candidate.type != OperationType.Streaming) {
                continue;
            }

            // each recipient gets its own event object
            (candidate as StreamingOperation).operation.triggerIncomingPublishEvent({
                topic: event.topic,
                payload: event.payload
            });
        }
    }

    /**
     * Matches an incoming publish on a response path topic to the request-response operation that
     * is waiting for it, and completes that operation with the response.
     *
     * The correlation token is determined as follows:
     *  - if the response path entry has no correlation token path, the empty string is used;
     *  - otherwise the payload is decoded as text, parsed as JSON, and walked segment by segment
     *    along the correlation token path.  Only a non-empty string found at the end of the
     *    complete path is accepted as the token.
     *
     * Publishes without a payload, without a usable correlation token, or whose token does not map
     * to a tracked operation are logged and dropped.  Exceptions raised while processing (for
     * example from JSON parsing) are caught and logged rather than propagated.
     *
     * @param event incoming publish event
     * @param responsePathEntry response path entry registered for the event's topic
     */
    private handleIncomingPublishEventRequestResponse(event: protocol_client_adapter.IncomingPublishEvent, responsePathEntry: ResponsePathEntry) {

        io.logDebug(RequestResponseClient.logSubject, `processing incoming publish event on response path topic "${event.topic}"`);
        if (!event.payload) {
            io.logError(RequestResponseClient.logSubject, `incoming publish on response path topic "${event.topic}" has no payload`);
            return;
        }

        try {
            let correlationToken : string | undefined = undefined;

            if (!responsePathEntry.correlationTokenPath) {
                correlationToken = "";
            } else {
                const payloadAsString = new TextDecoder().decode(new Uint8Array(event.payload));
                let segmentValue : any = JSON.parse(payloadAsString);
                for (const segment of responsePathEntry.correlationTokenPath) {
                    const segmentPropertyValue = segmentValue[segment];
                    if (!segmentPropertyValue) {
                        io.logError(RequestResponseClient.logSubject, `incoming publish on response path topic "${event.topic}" does not have a correlation token at the expected JSON path`);
                        // The path could not be followed to its end.  Discard the partially-resolved value so
                        // that a string found at an intermediate segment is never mistaken for the token.
                        segmentValue = undefined;
                        break;
                    }

                    segmentValue = segmentPropertyValue;
                }

                if (segmentValue && typeof(segmentValue) === "string") {
                    correlationToken = segmentValue;
                }
            }

            if (correlationToken === undefined) {
                io.logError(RequestResponseClient.logSubject, `A valid correlation token could not be inferred for incoming publish on response path topic "${event.topic}"`);
                return;
            }

            const id = this.operationsByCorrelationToken.get(correlationToken);
            // compare against undefined explicitly so that an operation id of 0 is not treated as missing
            if (id === undefined) {
                io.logDebug(RequestResponseClient.logSubject, `incoming publish on response path topic "${event.topic}" with correlation token "${correlationToken}" does not have an originating request entry`);
                return;
            }

            this.completeRequestResponseOperationWithResponse(id, event.topic, event.payload);
        } catch (err) {
            // JSON.stringify on a standard Error yields "{}" (message/stack are not enumerable), so
            // format Error instances through toString() to keep the failure reason in the log.
            const errorDescription = (err instanceof Error) ? err.toString() : JSON.stringify(err);
            io.logError(RequestResponseClient.logSubject, `incoming publish on response path topic "${event.topic}" triggered exception: ${errorDescription}`);
        }
    }

    /**
     * Handles an incoming publish event.  The event's topic is looked up first among the registered
     * response paths and then among the streaming operation topic filters; each lookup that hits
     * forwards the event to the corresponding specialized handler.  Ignored unless the client is
     * Ready.
     *
     * @param event incoming publish event
     */
    private handleIncomingPublishEvent(event: protocol_client_adapter.IncomingPublishEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const incomingTopic = event.topic;

        // request-response routing
        const pathEntry = this.correlationTokenPathsByResponsePaths.get(incomingTopic);
        if (pathEntry) {
            this.handleIncomingPublishEventRequestResponse(event, pathEntry);
        }

        // streaming routing
        const streamingIds = this.streamingOperationsByTopicFilter.get(incomingTopic);
        if (streamingIds) {
            this.handleIncomingPublishEventStreaming(event, streamingIds);
        }
    }

    /**
     * Handles a successful subscribe for one of a request-response operation's topic filters.
     *
     * Decrements the operation's pending subscription count; once it reaches zero the request is
     * published via applyRequestResponsePublish.  Ignored unless the client is Ready, and for
     * operations that are untracked or are not request-response operations.
     *
     * @param event subscribe success event
     */
    private handleSubscribeSuccessEvent(event: subscription_manager.SubscribeSuccessEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        io.logDebug(RequestResponseClient.logSubject, `subscribe success event received for operation ${event.operationId} using topic filter "${event.topicFilter}"`);
        const operation = this.operations.get(event.operationId);
        if (!operation) {
            return;
        }

        // pendingSubscriptionCount is accessed through a RequestResponseOperation cast below; verify the
        // operation type first, mirroring the type guards used by the streaming event handlers.
        if (operation.type != OperationType.RequestResponse) {
            return;
        }

        const rrOperation = operation as RequestResponseOperation;
        rrOperation.pendingSubscriptionCount--;
        if (rrOperation.pendingSubscriptionCount === 0) {
            this.applyRequestResponsePublish(rrOperation);
        } else {
            io.logDebug(RequestResponseClient.logSubject, `operation ${event.operationId} has ${rrOperation.pendingSubscriptionCount} pending subscriptions left`);
        }
    }

    /**
     * Handles a failed subscribe: logs the event and completes the associated operation with a
     * "Subscribe failure" error.  Ignored unless the client is Ready.
     *
     * @param event subscribe failure event
     */
    private handleSubscribeFailureEvent(event: subscription_manager.SubscribeFailureEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const { operationId, topicFilter } = event;

        io.logDebug(RequestResponseClient.logSubject, `subscribe failure event received for operation ${operationId} using topic filter "${topicFilter}"`);

        const failure = new CrtError("Subscribe failure");
        this.completeRequestResponseOperationWithError(operationId, failure);
    }

    /**
     * Handles a subscription-ended event: logs the event and completes the associated operation
     * with a "Subscription Ended Early" error.  Ignored unless the client is Ready.
     *
     * @param event subscription ended event
     */
    private handleSubscriptionEndedEvent(event: subscription_manager.SubscriptionEndedEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const { operationId, topicFilter } = event;

        io.logDebug(RequestResponseClient.logSubject, `subscription ended event received for operation ${operationId} using topic filter "${topicFilter}"`);

        const failure = new CrtError("Subscription Ended Early");
        this.completeRequestResponseOperationWithError(operationId, failure);
    }

    /**
     * Handles establishment of a streaming subscription: emits a SubscriptionEstablished status
     * update on the streaming operation and then moves the operation to the Subscribed state.
     *
     * Ignored unless the client is Ready, and for operations that are untracked, already Terminal,
     * or not streaming operations.
     *
     * @param event streaming subscription established event
     */
    private handleStreamingSubscriptionEstablishedEvent(event: subscription_manager.StreamingSubscriptionEstablishedEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const target = this.operations.get(event.operationId);
        if (!target || target.state == OperationState.Terminal || target.type != OperationType.Streaming) {
            return;
        }

        const statusUpdate = {
            type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionEstablished
        };
        (target as StreamingOperation).operation.triggerSubscriptionStatusUpdateEvent(statusUpdate);

        this.changeOperationState(target, OperationState.Subscribed);
    }

    /**
     * Handles loss of a streaming subscription by emitting a SubscriptionLost status update on the
     * streaming operation.  The operation's state is left unchanged.
     *
     * Ignored unless the client is Ready, and for operations that are untracked, already Terminal,
     * or not streaming operations.
     *
     * @param event streaming subscription lost event
     */
    private handleStreamingSubscriptionLostEvent(event: subscription_manager.StreamingSubscriptionLostEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const target = this.operations.get(event.operationId);
        if (!target || target.state == OperationState.Terminal || target.type != OperationType.Streaming) {
            return;
        }

        const statusUpdate = {
            type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionLost,
        };
        (target as StreamingOperation).operation.triggerSubscriptionStatusUpdateEvent(statusUpdate);
    }

    /**
     * Handles a halted streaming subscription: emits a SubscriptionHalted status update (carrying
     * an error that names the topic filter) on the streaming operation and then moves the operation
     * to the Terminal state.
     *
     * Ignored unless the client is Ready, and for operations that are untracked, already Terminal,
     * or not streaming operations.
     *
     * @param event streaming subscription halted event
     */
    private handleStreamingSubscriptionHaltedEvent(event: subscription_manager.StreamingSubscriptionHaltedEvent) {
        if (this.state != mqtt_request_response_internal.RequestResponseClientState.Ready) {
            return;
        }

        const target = this.operations.get(event.operationId);
        if (!target || target.state == OperationState.Terminal || target.type != OperationType.Streaming) {
            return;
        }

        const statusUpdate = {
            type: mqtt_request_response.SubscriptionStatusEventType.SubscriptionHalted,
            error: new CrtError(`Subscription Failure for topic filter "${event.topicFilter}"`)
        };
        (target as StreamingOperation).operation.triggerSubscriptionStatusUpdateEvent(statusUpdate);

        this.changeOperationState(target, OperationState.Terminal);
    }

    /**
     * Handles a subscription-orphaned event by logging it and waking the service task.  Does
     * nothing unless the client is Ready.
     *
     * @param event subscription orphaned event
     */
    private handleSubscriptionOrphanedEvent(event: subscription_manager.SubscriptionOrphanedEvent) {
        const isReady = this.state == mqtt_request_response_internal.RequestResponseClientState.Ready;
        if (isReady) {
            io.logDebug(RequestResponseClient.logSubject, `subscription orphaned event received for topic filter "${event.topicFilter}"`);
            this.wakeServiceTask();
        }
    }

    /**
     * Handles an unsubscribe-complete event by logging it and waking the service task.  Does
     * nothing unless the client is Ready.
     *
     * @param event unsubscribe complete event
     */
    private handleUnsubscribeCompleteEvent(event: subscription_manager.UnsubscribeCompleteEvent) {
        const isReady = this.state == mqtt_request_response_internal.RequestResponseClientState.Ready;
        if (isReady) {
            io.logDebug(RequestResponseClient.logSubject, `unsubscribe completion event received for topic filter "${event.topicFilter}"`);
            this.wakeServiceTask();
        }
    }

    /**
     * Moves an operation to a new state, logging the transition.  A request to "change" to the
     * state the operation is already in is a silent no-op.
     *
     * @param operation operation to update
     * @param state state to move the operation to
     */
    private changeOperationState(operation: Operation, state: OperationState) {
        const previousState = operation.state;
        if (previousState == state) {
            return;
        }

        const fromName = operationStateToString(previousState);
        const toName = operationStateToString(state);
        io.logDebug(RequestResponseClient.logSubject, `operation ${operation.id} changing state from "${fromName}" to "${toName}"`);

        operation.state = state;
    }

    /**
     * Submits the request publish for a request-response operation through the protocol client
     * adapter and, if submission does not throw, moves the operation to the PendingResponse state.
     *
     * The operation id is passed as the publish's completionData.  If the publish call throws
     * synchronously, the operation is completed with a "Publish error" CrtError and the failure is
     * logged.
     *
     * @param operation request-response operation whose request should be published
     */
    private applyRequestResponsePublish(operation: RequestResponseOperation) {
        const publishOptions = {
            topic: operation.options.publishTopic,
            payload: operation.options.payload,
            timeoutInSeconds: this.operationTimeoutInSeconds,
            completionData: operation.id
        };

        try {
            io.logDebug(RequestResponseClient.logSubject, `submitting publish for request-response operation ${operation.id}`);
            this.protocolClientAdapter.publish(publishOptions);
            this.changeOperationState(operation, OperationState.PendingResponse);
        } catch (err) {
            // JSON.stringify yields "{}" for standard Error objects and can itself throw (e.g. on circular
            // values).  Build the description defensively so the operation is always completed.
            let errorStringified: string;
            if (err instanceof Error) {
                errorStringified = err.toString();
            } else {
                try {
                    errorStringified = JSON.stringify(err);
                } catch (stringifyError) {
                    errorStringified = String(err);
                }
            }

            this.completeRequestResponseOperationWithError(operation.id, new CrtError(`Publish error: "${errorStringified}"`));
            io.logError(RequestResponseClient.logSubject, `request-response operation ${operation.id} synchronously failed publish step due to error: ${errorStringified}`);
        }
    }

    /**
     * Opens a streaming operation: marks it Queued, appends it to the operation queue, and wakes
     * the service task.
     *
     * @param id id of the streaming operation to open
     *
     * @throws CrtError if the client is not Ready, if the id is not tracked, or if the operation is
     * not in the None state (i.e. it has already been opened)
     */
    private openStreamingOperation(id: number) {
        const clientReady = this.state == mqtt_request_response_internal.RequestResponseClientState.Ready;
        if (!clientReady) {
            throw new CrtError(`Attempt to open streaming operation with id "${id}" after client closed`);
        }

        const target = this.operations.get(id);
        if (!target) {
            throw new CrtError(`Attempt to open untracked streaming operation with id "${id}"`);
        }

        const alreadyOpened = target.state != OperationState.None;
        if (alreadyOpened) {
            throw new CrtError(`Attempt to open already-opened streaming operation with id "${id}"`);
        }

        // enqueue, then prompt the service task
        target.state = OperationState.Queued;
        this.operationQueue.push(id);
        this.wakeServiceTask();

        io.logInfo(RequestResponseClient.logSubject, `streaming operation with id "${id}" submitted to operation queue`);
    }

    /**
     * Closes a streaming operation by halting it with a "Streaming operation closed" error.
     *
     * @param id id of the streaming operation to close
     */
    private closeStreamingOperation(id: number) {
        // An untracked id is deliberately tolerated instead of throwing: closing streaming operations
        // involves a bit of a recursive tangle.
        if (this.operations.get(id)) {
            this.haltStreamingOperationWithError(id, new CrtError("Streaming operation closed"));
        }
    }
}

/**
 * Validates a single response path.
 *
 * The path's topic must be accepted by mqtt_shared.isValidTopic.  If a correlation token JSON path is
 * present (truthy), it must be a string.
 *
 * @param responsePath response path to validate
 *
 * @throws CrtError describing the first validation failure encountered
 */
function validateResponsePath(responsePath: mqtt_request_response.ResponsePath) {
    if (!mqtt_shared.isValidTopic(responsePath.topic)) {
        throw new CrtError(`"${JSON.stringify(responsePath.topic)}" is not a valid topic`);
    }

    if (responsePath.correlationTokenJsonPath) {
        if (typeof(responsePath.correlationTokenJsonPath) !== 'string') {
            throw new CrtError(`"${JSON.stringify(responsePath.correlationTokenJsonPath)}" is not a valid correlation token path`);
        }
    }
}

/**
 * Validates the options for a request-response operation, throwing on the first problem found.
 *
 * Checks are performed in this order:
 *  1. options object is present
 *  2. subscriptionTopicFilters is a non-empty array of valid topic filters
 *  3. responsePaths is a non-empty array in which every entry passes validateResponsePath
 *  4. publishTopic is present and a valid topic
 *  5. payload is present and has a non-zero byteLength
 *  6. correlationToken, when truthy, is a string; an explicit null is rejected
 *
 * @param requestOptions options to validate
 *
 * @throws CrtError describing the first validation failure encountered
 */
function validateRequestOptions(requestOptions: mqtt_request_response.RequestResponseOperationOptions) {
    if (!requestOptions) {
        throw new CrtError("Invalid request options - null options");
    }

    // --- subscription topic filters ---
    const topicFilters = requestOptions.subscriptionTopicFilters;
    if (!topicFilters) {
        throw new CrtError("Invalid request options - null subscriptionTopicFilters");
    }

    if (!Array.isArray(topicFilters)) {
        throw new CrtError("Invalid request options - subscriptionTopicFilters is not an array");
    }

    if (topicFilters.length === 0) {
        throw new CrtError("Invalid request options - subscriptionTopicFilters is empty");
    }

    const firstInvalidFilterIndex = topicFilters.findIndex((candidate) => !mqtt_shared.isValidTopicFilter(candidate));
    if (firstInvalidFilterIndex >= 0) {
        throw new CrtError(`Invalid request options - "${JSON.stringify(topicFilters[firstInvalidFilterIndex])}" is not a valid topic filter`);
    }

    // --- response paths ---
    const responsePaths = requestOptions.responsePaths;
    if (!responsePaths) {
        throw new CrtError("Invalid request options - null responsePaths");
    }

    if (!Array.isArray(responsePaths)) {
        throw new CrtError("Invalid request options - responsePaths is not an array");
    }

    if (responsePaths.length === 0) {
        throw new CrtError("Invalid request options - responsePaths is empty");
    }

    for (const candidatePath of responsePaths) {
        try {
            validateResponsePath(candidatePath);
        } catch (err) {
            throw new CrtError(`Invalid request options - invalid response path: ${JSON.stringify(err)}`);
        }
    }

    // --- publish topic ---
    const publishTopic = requestOptions.publishTopic;
    if (!publishTopic) {
        throw new CrtError("Invalid request options - null publishTopic");
    }

    if (!mqtt_shared.isValidTopic(publishTopic)) {
        throw new CrtError(`Invalid request options - "${JSON.stringify(publishTopic)}" is not a valid topic`);
    }

    // --- payload ---
    const payload = requestOptions.payload;
    if (!payload) {
        throw new CrtError("Invalid request options - null payload");
    }

    if (payload.byteLength == 0) {
        throw new CrtError("Invalid request options - empty payload");
    }

    // --- correlation token ---
    const correlationToken = requestOptions.correlationToken;
    if (correlationToken === null) {
        throw new CrtError("Invalid request options - correlationToken null");
    }

    if (correlationToken && typeof(correlationToken) !== 'string') {
        throw new CrtError("Invalid request options - correlationToken is not a string");
    }
}

/**
 * Validates the options for a streaming operation, throwing on the first problem found.  The
 * options object must be present and its subscriptionTopicFilter must be a truthy string that
 * mqtt_shared.isValidTopicFilter accepts.
 *
 * @param streamOptions options to validate
 *
 * @throws CrtError describing the first validation failure encountered
 */
function validateStreamingOptions(streamOptions: mqtt_request_response.StreamingOperationOptions) {
    if (!streamOptions) {
        throw new CrtError("Invalid streaming options - null options");
    }

    const topicFilter = streamOptions.subscriptionTopicFilter;
    if (!topicFilter) {
        throw new CrtError("Invalid streaming options - null subscriptionTopicFilter");
    }

    if (typeof(topicFilter) !== 'string') {
        throw new CrtError("Invalid streaming options - subscriptionTopicFilter not a string");
    }

    const filterIsValid = mqtt_shared.isValidTopicFilter(topicFilter);
    if (!filterIsValid) {
        throw new CrtError("Invalid streaming options - subscriptionTopicFilter not a valid topic filter");
    }
}
