//-----------------------------------------------------------------------------
// Copyright (c) 2019, Oracle and/or its affiliates. All rights reserved.
// This program is free software: you can modify it and/or redistribute it
// under the terms of:
//
// (i)  the Universal Permissive License v 1.0 or at your option, any
//      later version (http://oss.oracle.com/licenses/upl); and/or
//
// (ii) the Apache License v 2.0. (http://www.apache.org/licenses/LICENSE-2.0)
//-----------------------------------------------------------------------------

//-----------------------------------------------------------------------------
// dpiQueue.c
//   Implementation of AQ queues.
//-----------------------------------------------------------------------------

#include "dpiImpl.h"

// forward declarations of internal functions only used in this file
static int dpiQueue__allocateBuffer(dpiQueue *queue, uint32_t numElements,
        dpiError *error);
static int dpiQueue__deq(dpiQueue *queue, uint32_t *numProps,
        dpiMsgProps **props, dpiError *error);
static void dpiQueue__freeBuffer(dpiQueue *queue, dpiError *error);
static int dpiQueue__getPayloadTDO(dpiQueue *queue, void **tdo,
        dpiError *error);


//-----------------------------------------------------------------------------
// dpiQueue__allocate() [INTERNAL]
//   Create a new queue handle bound to the supplied connection. A reference
// is taken on the connection and, when one is supplied, on the payload object
// type; a NULL payload type means the queue works with RAW payloads. The
// first nameLength bytes of name are copied into a newly allocated,
// NULL-terminated buffer owned by the queue. On success the new queue is
// stored in *queue and DPI_SUCCESS is returned; on failure DPI_FAILURE is
// returned and *queue is not written.
//-----------------------------------------------------------------------------
int dpiQueue__allocate(dpiConn *conn, const char *name, uint32_t nameLength,
        dpiObjectType *payloadType, dpiQueue **queue, dpiError *error)
{
    dpiQueue *newQueue;
    char *nameCopy;

    // create the handle and have it hold a reference to its connection
    if (dpiGen__allocate(DPI_HTYPE_QUEUE, conn->env, (void**) &newQueue,
            error) < 0)
        return DPI_FAILURE;
    dpiGen__setRefCount(conn, error, 1);
    newQueue->conn = conn;

    // retain the object type used for payloads, if there is one; without it
    // RAW payloads are enqueued and dequeued
    if (payloadType) {
        dpiGen__setRefCount(payloadType, error, 1);
        newQueue->payloadType = payloadType;
    }

    // OCI requires a NULL-terminated queue name, so the copy gets one extra
    // byte for the terminator; UTF-16 encoded strings are not currently
    // supported
    if (dpiUtils__allocateMemory(1, nameLength + 1, 0, "queue name",
            (void**) &nameCopy, error) >= 0) {
        memcpy(nameCopy, name, nameLength);
        nameCopy[nameLength] = '\0';
        newQueue->name = nameCopy;
        *queue = newQueue;
        return DPI_SUCCESS;
    }

    // the name buffer could not be allocated; dpiQueue__free() drops the
    // references taken above and releases the handle itself
    dpiQueue__free(newQueue, error);
    return DPI_FAILURE;
}


//-----------------------------------------------------------------------------
// dpiQueue__allocateBuffer() [INTERNAL]
//   Replace the queue's buffer with one that has room for the specified
// number of elements. Any existing arrays are released first (along with the
// references held on buffered message properties) and new arrays are then
// allocated for the message properties, OCI handles, instances, indicators
// and message ids; the raw indicators array is only allocated when the queue
// has no payload type (RAW payloads). Returns DPI_SUCCESS or DPI_FAILURE.
//
//   The recorded capacity (buffer.numElements) is only published once every
// array has been allocated. If an allocation fails part way through, the
// capacity stays at zero so that a later enqueue or dequeue reallocates the
// buffer instead of indexing arrays that were never created.
//-----------------------------------------------------------------------------
static int dpiQueue__allocateBuffer(dpiQueue *queue, uint32_t numElements,
        dpiError *error)
{
    // discard the previous arrays; the capacity must be reset only after
    // this call because dpiQueue__freeBuffer() uses it to walk the props
    // array
    dpiQueue__freeBuffer(queue, error);
    queue->buffer.numElements = 0;

    // NOTE(review): the third argument (1) is presumed to request zeroed
    // memory, which the "if (!prop)" test in dpiQueue__deq() relies upon --
    // confirm against dpiUtils__allocateMemory()
    if (dpiUtils__allocateMemory(numElements, sizeof(dpiMsgProps*), 1,
            "allocate msg props array", (void**) &queue->buffer.props,
            error) < 0)
        return DPI_FAILURE;
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate OCI handles array", (void**) &queue->buffer.handles,
            error) < 0)
        return DPI_FAILURE;
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate OCI instances array", (void**) &queue->buffer.instances,
            error) < 0)
        return DPI_FAILURE;
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate OCI indicators array",
            (void**) &queue->buffer.indicators, error) < 0)
        return DPI_FAILURE;

    // raw indicators are only needed when RAW payloads are in use
    if (!queue->payloadType) {
        if (dpiUtils__allocateMemory(numElements, sizeof(int16_t), 1,
                "allocate OCI raw indicators array",
                (void**) &queue->buffer.rawIndicators, error) < 0)
            return DPI_FAILURE;
    }
    if (dpiUtils__allocateMemory(numElements, sizeof(void*), 1,
            "allocate message ids array", (void**) &queue->buffer.msgIds,
            error) < 0)
        return DPI_FAILURE;

    // all arrays exist; the new capacity can now be advertised
    queue->buffer.numElements = numElements;
    return DPI_SUCCESS;
}


//-----------------------------------------------------------------------------
// dpiQueue__check() [INTERNAL]
//   Begin a public function call on the queue and verify that the queue can
// be used: the public function start-up for a DPI_HTYPE_QUEUE handle must
// succeed, and the queue's connection must have a handle and not be in the
// process of closing. Returns DPI_SUCCESS when the queue is usable; otherwise
// returns DPI_FAILURE (with DPI_ERR_NOT_CONNECTED set when the connection is
// the problem).
//-----------------------------------------------------------------------------
static int dpiQueue__check(dpiQueue *queue, const char *fnName,
        dpiError *error)
{
    if (dpiGen__startPublicFn(queue, DPI_HTYPE_QUEUE, fnName, error) < 0)
        return DPI_FAILURE;

    // a usable connection has a handle and is not being closed
    if (queue->conn->handle && !queue->conn->closing)
        return DPI_SUCCESS;

    return dpiError__set(error, "check connection", DPI_ERR_NOT_CONNECTED);
}


//-----------------------------------------------------------------------------
// dpiQueue__createDeqOptions() [INTERNAL]
//   Allocate and create the dequeue options handle used when dequeuing from
// this queue and store it in queue->deqOptions. If creation fails, the
// partially built handle is freed and queue->deqOptions is left unchanged.
// Returns DPI_SUCCESS or DPI_FAILURE.
//-----------------------------------------------------------------------------
static int dpiQueue__createDeqOptions(dpiQueue *queue, dpiError *error)
{
    dpiDeqOptions *options;

    if (dpiGen__allocate(DPI_HTYPE_DEQ_OPTIONS, queue->env,
            (void**) &options, error) < 0)
        return DPI_FAILURE;

    // only attach the options to the queue once they are fully created
    if (dpiDeqOptions__create(options, queue->conn, error) >= 0) {
        queue->deqOptions = options;
        return DPI_SUCCESS;
    }

    dpiDeqOptions__free(options, error);
    return DPI_FAILURE;
}


//-----------------------------------------------------------------------------
// dpiQueue__createEnqOptions() [INTERNAL]
//   Allocate and create the enqueue options handle used when enqueuing to
// this queue and store it in queue->enqOptions. If creation fails, the
// partially built handle is freed and queue->enqOptions is left unchanged.
// Returns DPI_SUCCESS or DPI_FAILURE.
//-----------------------------------------------------------------------------
static int dpiQueue__createEnqOptions(dpiQueue *queue, dpiError *error)
{
    dpiEnqOptions *options;

    if (dpiGen__allocate(DPI_HTYPE_ENQ_OPTIONS, queue->env,
            (void**) &options, error) < 0)
        return DPI_FAILURE;

    // only attach the options to the queue once they are fully created
    if (dpiEnqOptions__create(options, queue->conn, error) >= 0) {
        queue->enqOptions = options;
        return DPI_SUCCESS;
    }

    dpiEnqOptions__free(options, error);
    return DPI_FAILURE;
}


//-----------------------------------------------------------------------------
// dpiQueue__deq() [INTERNAL]
//   Dequeue up to *numProps messages. Dequeue options and the internal buffer
// are created or enlarged on demand. Each buffer slot is given a message
// properties handle (and a payload object when the queue has a payload type)
// and the OCI arrays are pointed at it. A count of exactly one uses the
// single message dequeue call; any other count uses the array call, which
// receives numProps so that it can report how many messages were dequeued.
// OCI error 25228 is not treated as a failure: the count is stored in the
// error buffer's offset and the messages obtained so far are returned. On
// success the first *numProps message properties are handed over to props
// and removed from the buffer. Returns DPI_SUCCESS or DPI_FAILURE.
//-----------------------------------------------------------------------------
static int dpiQueue__deq(dpiQueue *queue, uint32_t *numProps,
        dpiMsgProps **props, dpiError *error)
{
    dpiQueueBuffer *buf = &queue->buffer;
    dpiMsgProps *msg;
    uint32_t pos;
    void *tdo;
    int rc;

    // dequeue options are created lazily on first use
    if (!queue->deqOptions) {
        if (dpiQueue__createDeqOptions(queue, error) < 0)
            return DPI_FAILURE;
    }

    // grow the buffer when it cannot hold the requested number of messages
    if (buf->numElements < *numProps) {
        if (dpiQueue__allocateBuffer(queue, *numProps, error) < 0)
            return DPI_FAILURE;
    }

    // prepare one buffer slot per requested message
    for (pos = 0; pos < *numProps; pos++) {

        // reuse message properties left over from an earlier call, or
        // allocate new ones and park them in the buffer
        msg = buf->props[pos];
        if (!msg) {
            if (dpiMsgProps__allocate(queue->conn, &msg, error) < 0)
                return DPI_FAILURE;
            buf->props[pos] = msg;
        }

        // object payloads need an object to receive the dequeued value
        if (queue->payloadType) {
            if (!msg->payloadObj && dpiObject__allocate(queue->payloadType,
                    NULL, NULL, NULL, &msg->payloadObj, error) < 0)
                return DPI_FAILURE;
        }

        // point the OCI arrays at this message
        buf->handles[pos] = msg->handle;
        if (!queue->payloadType) {
            buf->instances[pos] = msg->payloadRaw;
            buf->indicators[pos] = &buf->rawIndicators[pos];
        } else {
            buf->instances[pos] = msg->payloadObj->instance;
            buf->indicators[pos] = msg->payloadObj->indicator;
        }
        buf->msgIds[pos] = msg->msgIdRaw;
    }

    // perform the dequeue; the single message call has no count to update,
    // so a failure there is reported as zero messages
    if (dpiQueue__getPayloadTDO(queue, &tdo, error) < 0)
        return DPI_FAILURE;
    if (*numProps != 1) {
        rc = dpiOci__aqDeqArray(queue->conn, queue->name,
                queue->deqOptions->handle, numProps, buf->handles, tdo,
                buf->instances, buf->indicators, buf->msgIds, error);
    } else {
        rc = dpiOci__aqDeq(queue->conn, queue->name,
                queue->deqOptions->handle, buf->handles[0], tdo,
                buf->instances, buf->indicators, buf->msgIds, error);
        if (rc < 0)
            *numProps = 0;
    }

    // ORA-25228 (documented by Oracle as a timeout or end-of-fetch during
    // message dequeue) is tolerated; anything else is a real failure
    if (rc < 0) {
        if (error->buffer->code != 25228)
            return DPI_FAILURE;
        error->buffer->offset = *numProps;
    }

    // hand the dequeued message properties to the caller; the buffer slots
    // are cleared so that the buffer no longer refers to them
    for (pos = 0; pos < *numProps; pos++) {
        msg = buf->props[pos];
        props[pos] = msg;
        buf->props[pos] = NULL;
        if (!queue->payloadType)
            msg->payloadRaw = buf->instances[pos];
        msg->msgIdRaw = buf->msgIds[pos];
    }

    return DPI_SUCCESS;
}


//-----------------------------------------------------------------------------
// dpiQueue__enq() [INTERNAL]
//   Enqueue the specified message properties. Nothing is done when numProps
// is zero. Enqueue options and the internal buffer are created or enlarged
// on demand. Every message must carry a payload of the kind the queue
// expects (an object of the queue's payload type, or RAW when the queue has
// no payload type); otherwise an error is set and DPI_FAILURE is returned
// before anything is enqueued. A single message uses the single enqueue
// call; several messages use the array call and, if that fails, the count it
// reported is stored in the error buffer's offset. After a successful
// enqueue the message ids are copied back to the message properties.
// Returns DPI_SUCCESS or DPI_FAILURE.
//-----------------------------------------------------------------------------
static int dpiQueue__enq(dpiQueue *queue, uint32_t numProps,
        dpiMsgProps **props, dpiError *error)
{
    dpiQueueBuffer *buf = &queue->buffer;
    dpiMsgProps *msg;
    uint32_t pos;
    void *tdo;

    // nothing to enqueue
    if (numProps == 0)
        return DPI_SUCCESS;

    // enqueue options are created lazily on first use
    if (!queue->enqOptions) {
        if (dpiQueue__createEnqOptions(queue, error) < 0)
            return DPI_FAILURE;
    }

    // grow the buffer when it cannot hold all of the messages
    if (buf->numElements < numProps) {
        if (dpiQueue__allocateBuffer(queue, numProps, error) < 0)
            return DPI_FAILURE;
    }

    // validate each message and point the OCI arrays at it
    for (pos = 0; pos < numProps; pos++) {
        msg = props[pos];

        // a payload of some kind is mandatory
        if (!msg->payloadObj && !msg->payloadRaw)
            return dpiError__set(error, "check payload",
                    DPI_ERR_QUEUE_NO_PAYLOAD);

        // object queues require object payloads and RAW queues must not be
        // given an object payload
        if ((queue->payloadType != NULL) != (msg->payloadObj != NULL))
            return dpiError__set(error, "check payload",
                    DPI_ERR_QUEUE_WRONG_PAYLOAD_TYPE);

        // the previous check guarantees that an object payload is present
        // whenever the queue has a payload type; the types must also match
        if (queue->payloadType &&
                queue->payloadType->tdo != msg->payloadObj->type->tdo)
            return dpiError__set(error, "check payload",
                    DPI_ERR_WRONG_TYPE,
                    msg->payloadObj->type->schemaLength,
                    msg->payloadObj->type->schema,
                    msg->payloadObj->type->nameLength,
                    msg->payloadObj->type->name,
                    queue->payloadType->schemaLength,
                    queue->payloadType->schema,
                    queue->payloadType->nameLength,
                    queue->payloadType->name);

        // set OCI arrays
        buf->handles[pos] = msg->handle;
        if (!queue->payloadType) {
            buf->instances[pos] = msg->payloadRaw;
            buf->indicators[pos] = &buf->rawIndicators[pos];
        } else {
            buf->instances[pos] = msg->payloadObj->instance;
            buf->indicators[pos] = msg->payloadObj->indicator;
        }
        buf->msgIds[pos] = msg->msgIdRaw;
    }

    // perform the enqueue; the array call receives the address of numProps
    // and the resulting value is recorded in the error when it fails
    if (dpiQueue__getPayloadTDO(queue, &tdo, error) < 0)
        return DPI_FAILURE;
    if (numProps != 1) {
        if (dpiOci__aqEnqArray(queue->conn, queue->name,
                queue->enqOptions->handle, &numProps, buf->handles, tdo,
                buf->instances, buf->indicators, buf->msgIds, error) < 0) {
            error->buffer->offset = numProps;
            return DPI_FAILURE;
        }
    } else if (dpiOci__aqEnq(queue->conn, queue->name,
            queue->enqOptions->handle, buf->handles[0], tdo, buf->instances,
            buf->indicators, buf->msgIds, error) < 0) {
        return DPI_FAILURE;
    }

    // copy the message ids back to the message properties
    for (pos = 0; pos < numProps; pos++)
        props[pos]->msgIdRaw = buf->msgIds[pos];

    return DPI_SUCCESS;
}


//-----------------------------------------------------------------------------
// dpiQueue__free() [INTERNAL]
//   Release everything owned by the queue and then the queue itself: the
// references held on the connection, the payload type and the dequeue and
// enqueue options are dropped, the copy of the queue name is freed and the
// internal buffer is released. Members that were never set are skipped, so
// this can be used on a partially initialized queue.
//-----------------------------------------------------------------------------
void dpiQueue__free(dpiQueue *queue, dpiError *error)
{
    // drop the references taken when the queue was allocated
    if (queue->conn != NULL) {
        dpiGen__setRefCount(queue->conn, error, -1);
        queue->conn = NULL;
    }
    if (queue->payloadType != NULL) {
        dpiGen__setRefCount(queue->payloadType, error, -1);
        queue->payloadType = NULL;
    }

    // the name is a private copy made when the queue was allocated
    if (queue->name != NULL) {
        dpiUtils__freeMemory((void*) queue->name);
        queue->name = NULL;
    }

    // drop the references on the options, if they were ever created
    if (queue->deqOptions != NULL) {
        dpiGen__setRefCount(queue->deqOptions, error, -1);
        queue->deqOptions = NULL;
    }
    if (queue->enqOptions != NULL) {
        dpiGen__setRefCount(queue->enqOptions, error, -1);
        queue->enqOptions = NULL;
    }

    // finally release the buffer arrays and the queue structure itself
    dpiQueue__freeBuffer(queue, error);
    dpiUtils__freeMemory(queue);
}


//-----------------------------------------------------------------------------
// dpiQueue__freeBuffer() [INTERNAL]
//   Release the arrays that make up the queue buffer. The reference held on
// each message properties handle still stored in the buffer is dropped
// before the props array is freed. Every array pointer is reset to NULL after
// being freed; the recorded number of elements is not modified.
//-----------------------------------------------------------------------------
static void dpiQueue__freeBuffer(dpiQueue *queue, dpiError *error)
{
    dpiQueueBuffer *buf = &queue->buffer;
    uint32_t pos;

    // message properties remaining in the buffer hold a reference that must
    // be dropped before the array holding them goes away
    if (buf->props != NULL) {
        for (pos = 0; pos < buf->numElements; pos++) {
            if (buf->props[pos] == NULL)
                continue;
            dpiGen__setRefCount(buf->props[pos], error, -1);
            buf->props[pos] = NULL;
        }
        dpiUtils__freeMemory(buf->props);
        buf->props = NULL;
    }

    // the remaining arrays only hold borrowed pointers or plain values
    if (buf->handles != NULL) {
        dpiUtils__freeMemory(buf->handles);
        buf->handles = NULL;
    }
    if (buf->instances != NULL) {
        dpiUtils__freeMemory(buf->instances);
        buf->instances = NULL;
    }
    if (buf->indicators != NULL) {
        dpiUtils__freeMemory(buf->indicators);
        buf->indicators = NULL;
    }
    if (buf->rawIndicators != NULL) {
        dpiUtils__freeMemory(buf->rawIndicators);
        buf->rawIndicators = NULL;
    }
    if (buf->msgIds != NULL) {
        dpiUtils__freeMemory(buf->msgIds);
        buf->msgIds = NULL;
    }
}


//-----------------------------------------------------------------------------
// dpiQueue__getPayloadTDO() [INTERNAL]
//   Store in *tdo the TDO to use for the queue's payloads. When the queue has
// a payload type, that type's TDO is used. Otherwise the RAW TDO is acquired
// through the connection and the connection's rawTDO is used. Returns
// DPI_SUCCESS, or DPI_FAILURE if the RAW TDO could not be acquired (in which
// case *tdo is not written).
//-----------------------------------------------------------------------------
static int dpiQueue__getPayloadTDO(dpiQueue *queue, void **tdo,
        dpiError *error)
{
    // RAW payloads: use the TDO held by the connection
    if (!queue->payloadType) {
        if (dpiConn__getRawTDO(queue->conn, error) < 0)
            return DPI_FAILURE;
        *tdo = queue->conn->rawTDO;
        return DPI_SUCCESS;
    }

    // object payloads: use the TDO of the queue's payload type
    *tdo = queue->payloadType->tdo;
    return DPI_SUCCESS;
}


//-----------------------------------------------------------------------------
// dpiQueue_addRef() [PUBLIC]
//   Add a reference to the queue. The work is delegated to dpiGen__addRef()
// for a handle of type DPI_HTYPE_QUEUE and its result is returned unchanged.
//-----------------------------------------------------------------------------
int dpiQueue_addRef(dpiQueue *queue)
{
    int status;

    status = dpiGen__addRef(queue, DPI_HTYPE_QUEUE, __func__);
    return status;
}


//-----------------------------------------------------------------------------
// dpiQueue_deqMany() [PUBLIC]
//   Dequeue multiple messages from the queue. On input *numProps is the
// maximum number of messages to dequeue; the dequeued message properties are
// stored in props. The queue must be usable and both numProps and props must
// not be NULL. Returns the status of the dequeue (DPI_SUCCESS or
// DPI_FAILURE).
//-----------------------------------------------------------------------------
int dpiQueue_deqMany(dpiQueue *queue, uint32_t *numProps, dpiMsgProps **props)
{
    dpiError error;

    // validate the queue and the output parameters
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, numProps)
    DPI_CHECK_PTR_NOT_NULL(queue, props)

    // the dequeue status is passed straight through as the public result
    return dpiGen__endPublicFn(queue,
            dpiQueue__deq(queue, numProps, props, &error), &error);
}


//-----------------------------------------------------------------------------
// dpiQueue_deqOne() [PUBLIC]
//   Dequeue a single message from the queue. On success *props refers to the
// dequeued message properties, or is set to NULL if no message was dequeued.
// The queue must be usable and props must not be NULL. Returns DPI_SUCCESS
// or DPI_FAILURE.
//-----------------------------------------------------------------------------
int dpiQueue_deqOne(dpiQueue *queue, dpiMsgProps **props)
{
    uint32_t count = 1;
    dpiError error;
    int status;

    // validate the queue and the output parameter
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, props)

    // a successful dequeue that produced nothing is reported as NULL
    status = dpiQueue__deq(queue, &count, props, &error);
    if (status >= 0 && count == 0)
        *props = NULL;
    return dpiGen__endPublicFn(queue,
            (status < 0) ? DPI_FAILURE : DPI_SUCCESS, &error);
}


//-----------------------------------------------------------------------------
// dpiQueue_enqMany() [PUBLIC]
//   Enqueue multiple messages to the queue. The queue must be usable, props
// must not be NULL and each of the numProps entries must be a valid message
// properties handle; these checks are all made before anything is enqueued.
// Returns the status of the enqueue (DPI_SUCCESS or DPI_FAILURE).
//-----------------------------------------------------------------------------
int dpiQueue_enqMany(dpiQueue *queue, uint32_t numProps, dpiMsgProps **props)
{
    dpiError error;
    uint32_t pos = 0;

    // validate the queue and the array of message properties
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, props)

    // every entry must be a message properties handle
    while (pos < numProps) {
        if (dpiGen__checkHandle(props[pos], DPI_HTYPE_MSG_PROPS,
                "check message properties", &error) < 0)
            return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
        pos++;
    }

    // the enqueue status is passed straight through as the public result
    return dpiGen__endPublicFn(queue,
            dpiQueue__enq(queue, numProps, props, &error), &error);
}


//-----------------------------------------------------------------------------
// dpiQueue_enqOne() [PUBLIC]
//   Enqueue a single message to the queue. The queue must be usable and props
// must be a valid message properties handle. Returns the status of the
// enqueue (DPI_SUCCESS or DPI_FAILURE).
//-----------------------------------------------------------------------------
int dpiQueue_enqOne(dpiQueue *queue, dpiMsgProps *props)
{
    dpiError error;

    // validate the queue and the message properties handle
    if (dpiQueue__check(queue, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    if (dpiGen__checkHandle(props, DPI_HTYPE_MSG_PROPS,
            "check message properties", &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);

    // enqueue it as an array of exactly one element
    return dpiGen__endPublicFn(queue,
            dpiQueue__enq(queue, 1, &props, &error), &error);
}


//-----------------------------------------------------------------------------
// dpiQueue_getDeqOptions() [PUBLIC]
//   Return the dequeue options associated with the queue. If no dequeue
// options are currently associated with the queue, create them first. The
// options parameter must not be NULL; on success *options refers to the
// queue's dequeue options. No additional reference is taken here on the
// returned options. Returns DPI_SUCCESS or DPI_FAILURE.
//-----------------------------------------------------------------------------
int dpiQueue_getDeqOptions(dpiQueue *queue, dpiDeqOptions **options)
{
    dpiError error;

    // a failure to start the public function is routed through
    // dpiGen__endPublicFn(), as is done by every other public function in
    // this file, so that all exit paths end the call in the same way
    if (dpiGen__startPublicFn(queue, DPI_HTYPE_QUEUE, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, options)

    // dequeue options are created lazily on first request
    if (!queue->deqOptions && dpiQueue__createDeqOptions(queue, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    *options = queue->deqOptions;
    return dpiGen__endPublicFn(queue, DPI_SUCCESS, &error);
}


//-----------------------------------------------------------------------------
// dpiQueue_getEnqOptions() [PUBLIC]
//   Return the enqueue options associated with the queue. If no enqueue
// options are currently associated with the queue, create them first. The
// options parameter must not be NULL; on success *options refers to the
// queue's enqueue options. No additional reference is taken here on the
// returned options. Returns DPI_SUCCESS or DPI_FAILURE.
//-----------------------------------------------------------------------------
int dpiQueue_getEnqOptions(dpiQueue *queue, dpiEnqOptions **options)
{
    dpiError error;

    // a failure to start the public function is routed through
    // dpiGen__endPublicFn(), as is done by every other public function in
    // this file, so that all exit paths end the call in the same way
    if (dpiGen__startPublicFn(queue, DPI_HTYPE_QUEUE, __func__, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    DPI_CHECK_PTR_NOT_NULL(queue, options)

    // enqueue options are created lazily on first request
    if (!queue->enqOptions && dpiQueue__createEnqOptions(queue, &error) < 0)
        return dpiGen__endPublicFn(queue, DPI_FAILURE, &error);
    *options = queue->enqOptions;
    return dpiGen__endPublicFn(queue, DPI_SUCCESS, &error);
}


//-----------------------------------------------------------------------------
// dpiQueue_release() [PUBLIC]
//   Release a reference to the queue. The work is delegated to
// dpiGen__release() for a handle of type DPI_HTYPE_QUEUE and its result is
// returned unchanged.
//-----------------------------------------------------------------------------
int dpiQueue_release(dpiQueue *queue)
{
    int status;

    status = dpiGen__release(queue, DPI_HTYPE_QUEUE, __func__);
    return status;
}
