/*
 * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
 * SPDX-License-Identifier: Apache-2.0.
 */


import * as subscription_manager from "./subscription_manager";
import {once} from "events";
import {newLiftedPromise} from "../../common/promise";
import {CrtError} from "../error";
import * as protocol_adapter_mock from "./protocol_adapter_mock";

jest.setTimeout(10000);


function createBasicSubscriptionManagerConfig() : subscription_manager.SubscriptionManagerConfig {
    return {
        maxRequestResponseSubscriptions: 2,
        maxStreamingSubscriptions: 1,
        operationTimeoutInSeconds: 30,
    };
}

test('Subscription Manager - Acquire Subscribing Success', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let filter3 = "a/b/events";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter3,
                timeoutInSeconds: 30
            }
        }
    );

    let subscribeSuccessPromise1 = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS);
    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    let subscribeSuccess1 = (await subscribeSuccessPromise1)[0];
    expect(subscribeSuccess1.topicFilter).toEqual(filter1);
    expect(subscribeSuccess1.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    let subscribeSuccessPromise2 = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS);
    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter2);

    let subscribeSuccess2 = (await subscribeSuccessPromise2)[0];
    expect(subscribeSuccess2.topicFilter).toEqual(filter2);
    expect(subscribeSuccess2.operationId).toEqual(2);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 2));

    let streamingSubscriptionEstablishedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED);

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter3);

    let streamingSubscriptionEstablished = (await streamingSubscriptionEstablishedPromise)[0];
    expect(streamingSubscriptionEstablished.topicFilter).toEqual(filter3);
    expect(streamingSubscriptionEstablished.operationId).toEqual(3);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Multiple Subscribing Success', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/accepted";
    let filter2 = "a/b/rejected";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    );

    let allPromise = newLiftedPromise<void>();
    let subscribeSuccesses = new Array<subscription_manager.SubscribeSuccessEvent>();
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS, (event) => {
        subscribeSuccesses.push(event);
        if (subscribeSuccesses.length == 2) {
            allPromise.resolve();
        }
    });

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);
    adapter.completeSubscribe(filter2);

    await allPromise.promise;

    let expectedSubscribeSuccesses = new Array<subscription_manager.SubscribeSuccessEvent>(
        {
            topicFilter: filter1,
            operationId: 1,
        },
        {
            topicFilter: filter2,
            operationId: 1,
        }
    );
    expect(subscribeSuccesses).toEqual(expectedSubscribeSuccesses);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Existing Subscribing', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        }
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 4,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Multi Existing Subscribing', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        }
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Multi Partially Subscribed', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        }
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Subscribed Success', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    );

    let subscribeSuccessPromise1 = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS);
    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    let subscribeSuccess1 = (await subscribeSuccessPromise1)[0];
    expect(subscribeSuccess1.topicFilter).toEqual(filter1);
    expect(subscribeSuccess1.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    let streamingSubscriptionEstablishedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter2);

    let streamingSubscriptionEstablished = (await streamingSubscriptionEstablishedPromise)[0];
    expect(streamingSubscriptionEstablished.topicFilter).toEqual(filter2);
    expect(streamingSubscriptionEstablished.operationId).toEqual(2);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribed);

    expect(subscriptionManager.acquireSubscription({
        operationId: 4,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribed);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Multi Subscribed Success', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    );

    let subscribeSuccessPromise1 = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS);
    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    let subscribeSuccess1 = (await subscribeSuccessPromise1)[0];
    expect(subscribeSuccess1.topicFilter).toEqual(filter1);
    expect(subscribeSuccess1.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    let subscribeSuccessPromise2 = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter2);

    let subscribeSuccess2 = (await subscribeSuccessPromise2)[0];
    expect(subscribeSuccess2.topicFilter).toEqual(filter2);
    expect(subscribeSuccess2.operationId).toEqual(2);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribed);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Request-Response Blocked', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let filter3 = "fail/ure";

    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Multi Request-Response Partial Blocked', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let filter3 = "fail/ure";

    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter2, filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Streaming Blocked', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world";

    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    let streamingSubscriptionEstablishedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED);

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    let streamingSubscriptionEstablished = (await streamingSubscriptionEstablishedPromise)[0];
    expect(streamingSubscriptionEstablished.topicFilter).toEqual(filter1);
    expect(streamingSubscriptionEstablished.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    let subscriptionOrphanedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIPTION_ORPHANED);
    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1]
    });

    let subscriptionOrphaned = (await subscriptionOrphanedPromise)[0];
    expect(subscriptionOrphaned.topicFilter).toEqual(filter1);

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Multi Streaming Blocked', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 2;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let filter3 = "foo/bar";

    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    let streamingSubscriptionEstablishedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED);

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    let streamingSubscriptionEstablished = (await streamingSubscriptionEstablishedPromise)[0];
    expect(streamingSubscriptionEstablished.topicFilter).toEqual(filter1);
    expect(streamingSubscriptionEstablished.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    let subscriptionOrphanedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIPTION_ORPHANED);
    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1]
    });

    let subscriptionOrphaned = (await subscriptionOrphanedPromise)[0];
    expect(subscriptionOrphaned.topicFilter).toEqual(filter1);

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2, filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Streaming NoCapacity, None Allowed', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 0;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";

    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>();

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.NoCapacity);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Streaming NoCapacity, Too Many', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 4;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    for (let i = 0; i < 4; i++) {
        let filter = `a/b/${i}`;
        expect(subscriptionManager.acquireSubscription({
            operationId: i + 1,
            type: subscription_manager.SubscriptionType.EventStream,
            topicFilters: [filter]
        })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);
    }

    let filter1 = "hello/world";

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.NoCapacity);
});

test('Subscription Manager - Acquire Multi Streaming NoCapacity', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 2;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let filter3 = "foo/bar";

    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    let streamingSubscriptionEstablishedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED);

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    let streamingSubscriptionEstablished = (await streamingSubscriptionEstablishedPromise)[0];
    expect(streamingSubscriptionEstablished.topicFilter).toEqual(filter1);
    expect(streamingSubscriptionEstablished.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter2, filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.NoCapacity);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Acquire Failure Mixed Subscription Types', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 2;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Failure);
});

test('Subscription Manager - Acquire Multi Failure Mixed Subscription Types', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 2;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";
    let filter2 = "c/d";

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Failure);
});

test('Subscription Manager - Acquire Failure Poisoned', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 2;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    let subscriptionHaltedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_HALTED);

    adapter.completeSubscribe(filter1, new CrtError("Unrecoverable Error"));

    let subscriptionHalted = (await subscriptionHaltedPromise)[0];
    expect(subscriptionHalted.topicFilter).toEqual(filter1);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Failure);
});




test('Subscription Manager - RequestResponse Multi Acquire/Release triggers Unsubscribe', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/accepted";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    let allPromise = newLiftedPromise<void>();
    let events = new Array<protocol_adapter_mock.SubscriptionManagerEvent>();
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: event
        });
        if (events.length == 2) {
            allPromise.resolve();
        }
    });

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    await allPromise.promise;

    let expectedSubscribeSuccesses : protocol_adapter_mock.SubscriptionManagerEvent[] = [
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter1,
                operationId: 1,
            }
        },
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter1,
                operationId: 2,
            }
        }
    ];

    expect(protocol_adapter_mock.subscriptionManagerEventSequenceContainsEvents(events, expectedSubscribeSuccesses)).toBeTruthy();
    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1]
    });

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    subscriptionManager.releaseSubscription({
        operationId: 2,
        topicFilters: [filter1]
    });

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Multi Acquire/Release Multi triggers Unsubscribes', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/accepted";
    let filter2 = "a/b/rejected";
    let expectedSubscribes : protocol_adapter_mock.ProtocolAdapterApiCall[] = [
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    ];

    let expectedUnsubscribes : protocol_adapter_mock.ProtocolAdapterApiCall[] = [
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    ];

    let allSubscribedPromise = newLiftedPromise<void>();
    let events = new Array<protocol_adapter_mock.SubscriptionManagerEvent>();
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: event
        });
        if (events.length == 4) {
            allSubscribedPromise.resolve();
        }
    });

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1, filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);
    adapter.completeSubscribe(filter2);

    await allSubscribedPromise.promise;

    let expectedSubscribeSuccesses : protocol_adapter_mock.SubscriptionManagerEvent[] = [
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter1,
                operationId: 1,
            }
        },
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter1,
                operationId: 2,
            }
        },
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter2,
                operationId: 1,
            }
        },
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter2,
                operationId: 2,
            }
        },
    ];

    expect(protocol_adapter_mock.subscriptionManagerEventSequenceContainsEvents(events, expectedSubscribeSuccesses)).toBeTruthy();
    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedSubscribes)).toBeTruthy();

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1, filter2]
    });

    subscriptionManager.purge();

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedUnsubscribes)).toBeFalsy();

    subscriptionManager.releaseSubscription({
        operationId: 2,
        topicFilters: [filter1, filter2]
    });

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedUnsubscribes)).toBeFalsy();

    subscriptionManager.purge();

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedUnsubscribes)).toBeTruthy();
});

test('Subscription Manager - Streaming Multi Acquire/Release triggers Unsubscribe', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/accepted";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    let allPromise = newLiftedPromise<void>();
    let events = new Array<protocol_adapter_mock.SubscriptionManagerEvent>();
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.StreamingSubscriptionEstablished,
            data: event
        });
        if (events.length == 2) {
            allPromise.resolve();
        }
    });

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    adapter.completeSubscribe(filter1);

    await allPromise.promise;

    let expectedStreamingSubscriptionEstablishments : protocol_adapter_mock.SubscriptionManagerEvent[] = [
        {
            type: subscription_manager.SubscriptionEventType.StreamingSubscriptionEstablished,
            data: {
                topicFilter: filter1,
                operationId: 1,
            }
        },
        {
            type: subscription_manager.SubscriptionEventType.StreamingSubscriptionEstablished,
            data: {
                topicFilter: filter1,
                operationId: 2,
            }
        }
    ];

    expect(protocol_adapter_mock.subscriptionManagerEventSequenceContainsEvents(events, expectedStreamingSubscriptionEstablishments)).toBeTruthy();
    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1]
    });

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    subscriptionManager.releaseSubscription({
        operationId: 2,
        topicFilters: [filter1]
    });

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

async function doUnsubscribeMakesRoomTest(shouldUnsubscribeSucceed: boolean) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/accepted";
    let filter2 = "a/b/rejected";
    let filter3 = "hello/world";
    let expectedSubscribes : protocol_adapter_mock.ProtocolAdapterApiCall[] = [
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    ];

    let blockedSubscribe = {
        methodName: 'subscribe',
        args: {
            topicFilter: filter3,
            timeoutInSeconds: 30
        }
    };

    let expectedUnsubscribes : protocol_adapter_mock.ProtocolAdapterApiCall[] = [
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    ];

    let allSubscribedPromise = newLiftedPromise<void>();
    let events = new Array<protocol_adapter_mock.SubscriptionManagerEvent>();
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: event
        });
        if (events.length == 2) {
            allSubscribedPromise.resolve();
        }
    });

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    adapter.completeSubscribe(filter1);
    adapter.completeSubscribe(filter2);

    await allSubscribedPromise.promise;

    let expectedSubscribeSuccesses : protocol_adapter_mock.SubscriptionManagerEvent[] = [
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter1,
                operationId: 1,
            }
        },
        {
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: {
                topicFilter: filter2,
                operationId: 2,
            }
        },
    ];

    expect(protocol_adapter_mock.subscriptionManagerEventSequenceContainsEvents(events, expectedSubscribeSuccesses)).toBeTruthy();
    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedSubscribes)).toBeTruthy();

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1]
    });

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedUnsubscribes)).toBeFalsy();

    subscriptionManager.purge();

    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter3]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedUnsubscribes)).toBeTruthy();

    if (shouldUnsubscribeSucceed) {
        adapter.completeUnsubscribe(filter1);
    } else {
        adapter.completeUnsubscribe(filter1, new CrtError("Help"));
    }

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCall(adapter.getApiCalls(), blockedSubscribe)).toBeFalsy();

    subscriptionManager.purge();

    let expectedAcquireResult = shouldUnsubscribeSucceed ? subscription_manager.AcquireSubscriptionResult.Subscribing : subscription_manager.AcquireSubscriptionResult.Blocked;
    expect(subscriptionManager.acquireSubscription({
        operationId: 3,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter3]
    })).toEqual(expectedAcquireResult);

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCall(adapter.getApiCalls(), blockedSubscribe)).toEqual(shouldUnsubscribeSucceed);
}

test('Subscription Manager - Successful Unsubscribe Frees Subscription Space', async () => {
    await doUnsubscribeMakesRoomTest(true);
});

test('Subscription Manager - Unsuccessful Unsubscribe Does Not Free Subscription Space', async () => {
    await doUnsubscribeMakesRoomTest(false);
});

test('Subscription Manager - Synchronous RequestResponse Subscribe Failure causes acquire failure', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter({
        subscribeHandler: (subscribeOptions) => { throw new CrtError("Bad"); }
    });
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Failure);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Synchronous Streaming Subscribe Failure causes acquire failure and poisons future acquires', async () => {
    let attemptNumber = 0;

    let adapter = new protocol_adapter_mock.MockProtocolAdapter({
        subscribeHandler: (subscribeOptions) => {
            attemptNumber++;
            if (attemptNumber == 1) {
                throw new CrtError("Bad");
            }
        }
    });
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Failure);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    subscriptionManager.purge();

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Failure);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - RequestResponse Acquire Subscribe with error emits SubscribeFailed', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    let subscribeFailedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIBE_FAILURE);
    adapter.completeSubscribe(filter1, new CrtError("Derp"));

    let subscribeFailed = (await subscribeFailedPromise)[0];
    expect(subscribeFailed.topicFilter).toEqual(filter1);
    expect(subscribeFailed.operationId).toEqual(1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

test('Subscription Manager - Streaming Acquire Subscribe with retryable error triggers resubscribe', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0, 1));

    adapter.completeSubscribe(filter1, new CrtError("Derp"), true);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

function getExpectedEventTypeForOfflineAcquireOnlineTest(subscriptionType: subscription_manager.SubscriptionType, shouldSubscribeSucceed: boolean) : subscription_manager.SubscriptionEventType {
    if (subscriptionType == subscription_manager.SubscriptionType.RequestResponse) {
        if (shouldSubscribeSucceed) {
            return subscription_manager.SubscriptionEventType.SubscribeSuccess;
        } else {
            return subscription_manager.SubscriptionEventType.SubscribeFailure;
        }
    } else {
        if (shouldSubscribeSucceed) {
            return subscription_manager.SubscriptionEventType.StreamingSubscriptionEstablished;
        } else {
            return subscription_manager.SubscriptionEventType.StreamingSubscriptionHalted;
        }
    }
}

async function offlineAcquireOnlineTest(subscriptionType: subscription_manager.SubscriptionType, shouldSubscribeSucceed: boolean) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscriptionType,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual([]);

    adapter.connect();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);

    let anyPromise = newLiftedPromise<void>();
    let events = new Array<protocol_adapter_mock.SubscriptionManagerEvent>();
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.SUBSCRIBE_SUCCESS, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.SubscribeSuccess,
            data: event
        });
        anyPromise.resolve();
    });
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.SUBSCRIBE_FAILURE, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.SubscribeFailure,
            data: event
        });
        anyPromise.resolve();
    });
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_ESTABLISHED, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.StreamingSubscriptionEstablished,
            data: event
        });
        anyPromise.resolve();
    });
    subscriptionManager.addListener(subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_HALTED, (event) => {
        events.push({
            type: subscription_manager.SubscriptionEventType.StreamingSubscriptionHalted,
            data: event
        });
        anyPromise.resolve();
    });

    if (shouldSubscribeSucceed) {
        adapter.completeSubscribe(filter1);
    } else {
        adapter.completeSubscribe(filter1, new CrtError("Argh"));
    }

    await anyPromise.promise;

    expect(events.length).toEqual(1);
    let event = events[0];
    expect(event.type).toEqual(getExpectedEventTypeForOfflineAcquireOnlineTest(subscriptionType, shouldSubscribeSucceed));
    expect(event.data.topicFilter).toEqual(filter1);
}

test('Subscription Manager - RequestResponse Acquire While Offline, Going online triggers Subscribe, Subscribe Success', async () => {
    await offlineAcquireOnlineTest(subscription_manager.SubscriptionType.RequestResponse, true);
});

test('Subscription Manager - RequestResponse Acquire While Offline, Going online triggers Subscribe, Subscribe Failure', async () => {
    await offlineAcquireOnlineTest(subscription_manager.SubscriptionType.RequestResponse, false);
});

test('Subscription Manager - Streaming Acquire While Offline, Going online triggers Subscribe, Subscribe Success', async () => {
    await offlineAcquireOnlineTest(subscription_manager.SubscriptionType.EventStream, true);
});

test('Subscription Manager - Streaming Acquire While Offline, Going online triggers Subscribe, Subscribe Failure', async () => {
    await offlineAcquireOnlineTest(subscription_manager.SubscriptionType.EventStream, false);
});

async function offlineAcquireReleaseOnlineTest(subscriptionType: subscription_manager.SubscriptionType) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let filter2 = "hello/world"
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscriptionType,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual([]);

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1],
    });

    expect(adapter.getApiCalls()).toEqual([]);

    adapter.connect();

    expect(adapter.getApiCalls()).toEqual([]);

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscriptionType,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
}

test('Subscription Manager - RequestResponse Acquire-Release While Offline, Going online triggers nothing', async () => {
    await offlineAcquireReleaseOnlineTest(subscription_manager.SubscriptionType.RequestResponse);
});

test('Subscription Manager - Streaming Acquire-Release While Offline, Going online triggers nothing', async () => {
    await offlineAcquireReleaseOnlineTest(subscription_manager.SubscriptionType.EventStream);
});

async function acquireOfflineReleaseAcquireOnlineTest(subscriptionType: subscription_manager.SubscriptionType) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    let config = createBasicSubscriptionManagerConfig();
    config.maxStreamingSubscriptions = 2;

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, config);

    let filter1 = "a/b/+";
    let filter2 = "hello/world";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter2,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscriptionType,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    adapter.completeSubscribe(filter1);

    adapter.disconnect();

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1],
    });

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    expect(subscriptionManager.acquireSubscription({
        operationId: 2,
        type: subscriptionType,
        topicFilters: [filter2]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedApiCalls)).toBeFalsy();

    adapter.connect(true);

    expect(protocol_adapter_mock.protocolAdapterApiCallSequenceContainsApiCalls(adapter.getApiCalls(), expectedApiCalls)).toBeTruthy();
}

test('Subscription Manager - RequestResponse Release-Acquire2 while offline, Going online triggers Unsubscribe and Subscribe', async () => {
    await acquireOfflineReleaseAcquireOnlineTest(subscription_manager.SubscriptionType.RequestResponse);
});

test('Subscription Manager - Streaming Release-Acquire2 while offline, Going online triggers Unsubscribe and Subscribe', async () => {
    await acquireOfflineReleaseAcquireOnlineTest(subscription_manager.SubscriptionType.EventStream);
});

async function closeTest(subscriptionType: subscription_manager.SubscriptionType, completeSubscribe: boolean, closeWhileConnected: boolean) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscriptionType,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    if (completeSubscribe) {
        adapter.completeSubscribe(filter1);
    }

    if (!closeWhileConnected) {
        adapter.disconnect();
    }

    subscriptionManager.close();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
}

test('Subscription Manager - Close while request-response subscribed and online triggers unsubscribe', async () => {
    await closeTest(subscription_manager.SubscriptionType.RequestResponse, true, true);
});

test('Subscription Manager - Close while streaming subscribed and online triggers unsubscribe', async () => {
    await closeTest(subscription_manager.SubscriptionType.EventStream, true, true);
});

test('Subscription Manager - Close while request-response subscribing and online triggers unsubscribe', async () => {
    await closeTest(subscription_manager.SubscriptionType.RequestResponse, false, true);
});

test('Subscription Manager - Close while streaming subscribing and online triggers unsubscribe', async () => {
    await closeTest(subscription_manager.SubscriptionType.EventStream, false, true);
});

test('Subscription Manager - Close while request-response subscribing and offline triggers unsubscribe', async () => {
    await closeTest(subscription_manager.SubscriptionType.RequestResponse, false, false);
});

test('Subscription Manager - Close while streaming subscribing and offline triggers unsubscribe', async () => {
    await closeTest(subscription_manager.SubscriptionType.EventStream, false, false);
});

async function noSessionSubscriptionEndedTest(offlineWhileUnsubscribing: boolean) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    adapter.completeSubscribe(filter1);

    if (offlineWhileUnsubscribing) {
        subscriptionManager.releaseSubscription({
            operationId: 1,
            topicFilters: [filter1]
        });

        subscriptionManager.purge();

        expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
    }

    let subscriptionEndedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIPTION_ENDED);

    adapter.disconnect();
    adapter.connect();

    if (!offlineWhileUnsubscribing) {
        let subscriptionEnded = (await subscriptionEndedPromise)[0];
        expect(subscriptionEnded.topicFilter).toEqual(filter1);
    }

    let reaquire: subscription_manager.AcquireSubscriptionConfig = {
        operationId: 2,
        type: subscription_manager.SubscriptionType.RequestResponse,
        topicFilters : [filter1],
    };

    if (offlineWhileUnsubscribing) {
        expect(subscriptionManager.acquireSubscription(reaquire)).toEqual(subscription_manager.AcquireSubscriptionResult.Blocked);

        adapter.completeUnsubscribe(filter1, new CrtError("timeout"));
    }

    expect(subscriptionManager.acquireSubscription(reaquire)).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);
}

test('Subscription Manager - Subscribed Session Rejoin Failure triggers subscription ended', async () => {
    await noSessionSubscriptionEndedTest(false);
});

test('Subscription Manager - Subscribed Session Rejoin Failure while unsubscribing triggers subscription ended', async () => {
    await noSessionSubscriptionEndedTest(true);
});

test('Subscription Manager - Subscribed Streaming Session Rejoin Failure triggers resubscribe and emits SubscriptionLost', async () => {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    adapter.completeSubscribe(filter1);

    let subscriptionLostPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.STREAMING_SUBSCRIPTION_LOST);

    adapter.disconnect();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    adapter.connect();

    let subscriptionLost = (await subscriptionLostPromise)[0];
    expect(subscriptionLost.topicFilter).toEqual(filter1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
});

async function doPurgeTest(subscriptionType: subscription_manager.SubscriptionType) {
    let adapter = new protocol_adapter_mock.MockProtocolAdapter();
    adapter.connect();

    // @ts-ignore
    let subscriptionManager = new subscription_manager.SubscriptionManager(adapter, createBasicSubscriptionManagerConfig());

    let filter1 = "a/b/+";
    let expectedApiCalls : Array<protocol_adapter_mock.ProtocolAdapterApiCall> = new Array<protocol_adapter_mock.ProtocolAdapterApiCall>(
        {
            methodName: 'subscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
        {
            methodName: 'unsubscribe',
            args: {
                topicFilter: filter1,
                timeoutInSeconds: 30
            }
        },
    );

    expect(subscriptionManager.acquireSubscription({
        operationId: 1,
        type: subscription_manager.SubscriptionType.EventStream,
        topicFilters: [filter1]
    })).toEqual(subscription_manager.AcquireSubscriptionResult.Subscribing);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    adapter.completeSubscribe(filter1);

    let subscriptionOrphanedPromise = once(subscriptionManager, subscription_manager.SubscriptionManager.SUBSCRIPTION_ORPHANED);

    subscriptionManager.releaseSubscription({
        operationId: 1,
        topicFilters: [filter1],
    });

    let subscriptionOrphaned = (await subscriptionOrphanedPromise)[0];
    expect(subscriptionOrphaned.topicFilter).toEqual(filter1);

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls.slice(0,1));

    subscriptionManager.purge();

    expect(adapter.getApiCalls()).toEqual(expectedApiCalls);
}

test('Subscription Manager - Subscribed RequestResponse emits orphaned event on release', async () => {
    await doPurgeTest(subscription_manager.SubscriptionType.RequestResponse);
});

test('Subscription Manager - Subscribed Streaming emits orphaned event on release', async () => {
    await doPurgeTest(subscription_manager.SubscriptionType.EventStream);
});