/*
 * Copyright 2010-2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
 *
 * Licensed under the Apache License, Version 2.0 (the "License").
 * You may not use this file except in compliance with the License.
 * A copy of the License is located at
 *
 *  http://aws.amazon.com/apache2.0
 *
 * or in the "license" file accompanying this file. This file is distributed
 * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
 * express or implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

#include <aws/io/pipe.h>

#include <aws/io/event_loop.h>

#ifdef __GLIBC__
#    define __USE_GNU
#endif

/* TODO: move this detection to CMAKE and a config header */
#if !defined(COMPAT_MODE) && defined(__GLIBC__) && __GLIBC__ >= 2 && __GLIBC_MINOR__ >= 9
#    define HAVE_PIPE2 1
#else
#    define HAVE_PIPE2 0
#endif

#include <errno.h>
#include <fcntl.h>
#include <unistd.h>

/* O_CLOEXEC isn't defined on ancient Linux distros, which breaks the build.
 * Prebuilt binaries are purposely built on an ancient system, yet we still want them to issue the same kernel calls
 * as a modern build, since a modern system is the likely target of the application calling this code.
 * So define the flag ourselves if it isn't already defined: glibc and the kernel don't care how the flag's value got
 * into the call, only that it is passed.
 */
#ifndef O_CLOEXEC
#    define O_CLOEXEC 02000000
#endif

/* Private state behind aws_pipe_read_end::impl_data.
 * Allocated by aws_pipe_init() and freed by aws_pipe_clean_up_read_end(). */
struct read_end_impl {
    /* Allocator this struct came from; also used to free it. */
    struct aws_allocator *alloc;
    /* Wraps the pipe's read file descriptor (pipe_fds[0] from aws_pipe_init()). */
    struct aws_io_handle handle;
    /* Event loop this end is bound to. Functions that check the calling thread require this loop's thread. */
    struct aws_event_loop *event_loop;
    /* Set by aws_pipe_subscribe_to_readable_events(); reset to NULL when unsubscribing. */
    aws_pipe_on_readable_fn *on_readable_user_callback;
    void *on_readable_user_data;

    /* Used in handshake for detecting whether user callback resulted in read-end being cleaned up.
     * If clean_up() sees that the pointer is set, the bool it points to will get set true. */
    bool *did_user_callback_clean_up_read_end;

    /* True while `handle` is subscribed to readable events on `event_loop`. */
    bool is_subscribed;
};

/* One queued aws_pipe_write() call. It stays in write_end_impl::write_list until it is popped for completion
 * (successful, failed, or cancelled by clean-up). */
struct write_request {
    /* The buffer exactly as the user passed it; handed back to user_callback on completion. */
    struct aws_byte_cursor original_cursor;
    struct aws_byte_cursor cursor; /* tracks progress of write */
    /* NOTE(review): not read or written anywhere in this file; candidate for removal. */
    size_t num_bytes_written;
    /* Optional completion callback (may be NULL) and its user data. */
    aws_pipe_on_write_completed_fn *user_callback;
    void *user_data;
    /* Links this request into write_end_impl::write_list. */
    struct aws_linked_list_node list_node;

    /* True if the write-end is cleaned up while the user callback is being invoked */
    bool did_user_callback_clean_up_write_end;
};

/* Private state behind aws_pipe_write_end::impl_data.
 * Allocated by aws_pipe_init() and freed by aws_pipe_clean_up_write_end(). */
struct write_end_impl {
    /* Allocator for this struct and for every write_request queued on it. */
    struct aws_allocator *alloc;
    /* Wraps the pipe's write file descriptor (pipe_fds[1] from aws_pipe_init()). */
    struct aws_io_handle handle;
    /* Event loop this end is bound to; public functions must be called on its thread. */
    struct aws_event_loop *event_loop;
    /* FIFO of pending write_requests; the front request is the one currently being written. */
    struct aws_linked_list write_list;

    /* Valid while invoking user callback on a completed write request. */
    struct write_request *currently_invoking_write_callback;

    /* Cleared when write() reports EAGAIN/EWOULDBLOCK; set again when a writable event arrives. */
    bool is_writable;

    /* Future optimization idea: avoid an allocation on each write by keeping 1 pre-allocated write_request around
     * and re-using it whenever possible */
};

/* Forward declaration: aws_pipe_init() registers this as the write-end's I/O event handler
 * before the function is defined further down in this file. */
static void s_write_end_on_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data);

/* Map a non-zero POSIX errno value onto an aws error code.
 * EPIPE (reader gone) becomes AWS_IO_BROKEN_PIPE; everything else is a generic syscall failure. */
static int s_translate_posix_error(int err) {
    AWS_ASSERT(err);

    if (err == EPIPE) {
        return AWS_IO_BROKEN_PIPE;
    }

    return AWS_ERROR_SYS_CALL_FAILURE;
}

/* Raise the aws error corresponding to errno value `err` and return aws_raise_error()'s result. */
static int s_raise_posix_error(int err) {
    const int aws_error_code = s_translate_posix_error(err);
    return aws_raise_error(aws_error_code);
}

/*
 * Open an anonymous pipe whose two ends are both non-blocking and close-on-exec.
 *
 * On success, pipe_fds[0] is the read end and pipe_fds[1] is the write end.
 * On failure, no descriptors are left open and the aws error derived from errno is raised.
 */
AWS_IO_API int aws_open_nonblocking_posix_pipe(int pipe_fds[2]) {
#if HAVE_PIPE2
    /* pipe2() applies both flags atomically; on failure it returns -1 and reports the cause via errno. */
    if (pipe2(pipe_fds, O_NONBLOCK | O_CLOEXEC) == -1) {
        return s_raise_posix_error(errno);
    }

    return AWS_OP_SUCCESS;
#else
    int errno_value = 0;

    /* pipe() returns -1 on failure; the actual cause lives in errno. */
    if (pipe(pipe_fds) == -1) {
        return s_raise_posix_error(errno);
    }

    for (int i = 0; i < 2; ++i) {
        /* O_NONBLOCK is a file *status* flag, managed with F_GETFL/F_SETFL. */
        int status_flags = fcntl(pipe_fds[i], F_GETFL);
        if (status_flags == -1 || fcntl(pipe_fds[i], F_SETFL, status_flags | O_NONBLOCK) == -1) {
            goto error;
        }

        /* Close-on-exec is a file *descriptor* flag, managed with F_GETFD/F_SETFD.
         * F_SETFL silently ignores O_CLOEXEC, so it must be set here via FD_CLOEXEC. */
        int fd_flags = fcntl(pipe_fds[i], F_GETFD);
        if (fd_flags == -1 || fcntl(pipe_fds[i], F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
            goto error;
        }
    }

    return AWS_OP_SUCCESS;

error:
    /* Capture errno from the failed fcntl() before close() has a chance to overwrite it. */
    errno_value = errno;
    close(pipe_fds[0]);
    close(pipe_fds[1]);
    return s_raise_posix_error(errno_value);
#endif
}

/*
 * Create a non-blocking pipe and wrap its two file descriptors in read_end/write_end.
 *
 * The read end is bound to read_end_event_loop and the write end to write_end_event_loop. The write end is
 * immediately subscribed to writable events so queued writes can resume after the pipe fills up.
 * On failure both ends are left with impl_data == NULL and every resource acquired here is released.
 */
int aws_pipe_init(
    struct aws_pipe_read_end *read_end,
    struct aws_event_loop *read_end_event_loop,
    struct aws_pipe_write_end *write_end,
    struct aws_event_loop *write_end_event_loop,
    struct aws_allocator *allocator) {

    AWS_ASSERT(read_end);
    AWS_ASSERT(read_end_event_loop);
    AWS_ASSERT(write_end);
    AWS_ASSERT(write_end_event_loop);
    AWS_ASSERT(allocator);

    AWS_ZERO_STRUCT(*read_end);
    AWS_ZERO_STRUCT(*write_end);

    int fds[2];
    if (aws_open_nonblocking_posix_pipe(fds)) {
        return AWS_OP_ERR;
    }

    /* Declared before the first goto so the cleanup label always sees initialized pointers. */
    struct read_end_impl *read_impl = NULL;
    struct write_end_impl *write_impl = NULL;

    read_impl = aws_mem_calloc(allocator, 1, sizeof(*read_impl));
    if (read_impl == NULL) {
        goto clean_up;
    }
    read_impl->alloc = allocator;
    read_impl->event_loop = read_end_event_loop;
    read_impl->handle.data.fd = fds[0];

    write_impl = aws_mem_calloc(allocator, 1, sizeof(*write_impl));
    if (write_impl == NULL) {
        goto clean_up;
    }
    write_impl->alloc = allocator;
    write_impl->event_loop = write_end_event_loop;
    write_impl->handle.data.fd = fds[1];
    /* Optimistically assume the pipe starts writable; an EAGAIN on the first write simply clears this. */
    write_impl->is_writable = true;
    aws_linked_list_init(&write_impl->write_list);

    /* Publish the impls before subscribing: s_write_end_on_event() reaches write_impl through write_end. */
    read_end->impl_data = read_impl;
    write_end->impl_data = write_impl;

    if (aws_event_loop_subscribe_to_io_events(
            write_end_event_loop, &write_impl->handle, AWS_IO_EVENT_TYPE_WRITABLE, s_write_end_on_event, write_end)) {
        goto clean_up;
    }

    return AWS_OP_SUCCESS;

clean_up:
    close(fds[0]);
    close(fds[1]);

    if (read_impl != NULL) {
        aws_mem_release(allocator, read_impl);
    }
    if (write_impl != NULL) {
        aws_mem_release(allocator, write_impl);
    }

    read_end->impl_data = NULL;
    write_end->impl_data = NULL;

    return AWS_OP_ERR;
}

/*
 * Tear down the read end: unsubscribe if needed, close its fd, free its impl, and zero *read_end.
 * Must be called on the read end's event-loop thread. Safe to call from inside the on_readable callback;
 * the in-progress event handler is told via the handshake flag so it stops touching the freed impl.
 */
int aws_pipe_clean_up_read_end(struct aws_pipe_read_end *read_end) {
    struct read_end_impl *impl = read_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    if (!aws_event_loop_thread_is_callers_thread(impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    /* Stop event delivery before the fd goes away. */
    if (impl->is_subscribed && aws_pipe_unsubscribe_from_readable_events(read_end)) {
        return AWS_OP_ERR;
    }

    /* Non-NULL only while s_read_end_on_event() is invoking the user's callback. */
    bool *handshake_flag = impl->did_user_callback_clean_up_read_end;
    if (handshake_flag != NULL) {
        *handshake_flag = true;
    }

    close(impl->handle.data.fd);

    aws_mem_release(impl->alloc, impl);
    AWS_ZERO_STRUCT(*read_end);
    return AWS_OP_SUCCESS;
}

/* Return the event loop the read end is bound to, or NULL (raising AWS_IO_BROKEN_PIPE) if it was cleaned up. */
struct aws_event_loop *aws_pipe_get_read_end_event_loop(const struct aws_pipe_read_end *read_end) {
    const struct read_end_impl *impl = read_end->impl_data;
    if (impl != NULL) {
        return impl->event_loop;
    }

    aws_raise_error(AWS_IO_BROKEN_PIPE);
    return NULL;
}

/* Return the event loop the write end is bound to, or NULL (raising AWS_IO_BROKEN_PIPE) if it was cleaned up. */
struct aws_event_loop *aws_pipe_get_write_end_event_loop(const struct aws_pipe_write_end *write_end) {
    const struct write_end_impl *impl = write_end->impl_data;
    if (impl != NULL) {
        return impl->event_loop;
    }

    aws_raise_error(AWS_IO_BROKEN_PIPE);
    return NULL;
}

/*
 * Read as many bytes as fit into dst_buffer's unused capacity, appending them after dst_buffer->len.
 *
 * On success dst_buffer->len grows by the number of bytes read (which may be 0) and, if num_bytes_read is
 * non-NULL, that count is stored there. When no data is currently available AWS_IO_READ_WOULD_BLOCK is raised;
 * other read() failures are translated via s_raise_posix_error().
 */
int aws_pipe_read(struct aws_pipe_read_end *read_end, struct aws_byte_buf *dst_buffer, size_t *num_bytes_read) {
    AWS_ASSERT(dst_buffer && dst_buffer->buffer);

    struct read_end_impl *impl = read_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }

    /* Report zero bytes on every failure path below. */
    if (num_bytes_read != NULL) {
        *num_bytes_read = 0;
    }

    const size_t space_left = dst_buffer->capacity - dst_buffer->len;
    const ssize_t result = read(impl->handle.data.fd, dst_buffer->buffer + dst_buffer->len, space_left);

    if (result >= 0) {
        dst_buffer->len += (size_t)result;
        if (num_bytes_read != NULL) {
            *num_bytes_read = (size_t)result;
        }
        return AWS_OP_SUCCESS;
    }

    const int errno_value = errno;
    if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
        return aws_raise_error(AWS_IO_READ_WOULD_BLOCK);
    }
    return s_raise_posix_error(errno_value);
}

/*
 * Event-loop handler for the read end's fd.
 *
 * A readable event invokes the user's callback with AWS_ERROR_SUCCESS. Any remaining (non-readable) event bits
 * invoke it again with AWS_IO_BROKEN_PIPE, but only if the user is still subscribed. A stack flag is shared with
 * aws_pipe_clean_up_read_end() so this function can tell whether a callback freed the read end, and stop
 * touching it if so.
 */
static void s_read_end_on_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)event_loop;
    (void)handle;

    /* Note that it should be impossible for this to run after read-end has been unsubscribed or cleaned up */
    struct aws_pipe_read_end *read_end = user_data;
    struct read_end_impl *impl = read_end->impl_data;
    AWS_ASSERT(impl);
    AWS_ASSERT(impl->event_loop == event_loop);
    AWS_ASSERT(&impl->handle == handle);
    AWS_ASSERT(impl->is_subscribed);
    AWS_ASSERT(events != 0);
    AWS_ASSERT(impl->did_user_callback_clean_up_read_end == NULL);

    /* Handshake: clean_up() sets this to true if a callback frees the read end. */
    bool cleaned_up = false;
    impl->did_user_callback_clean_up_read_end = &cleaned_up;

    int remaining_events = events;

    /* Readable takes priority: let the user try to read even if error bits are also set. */
    if (remaining_events & AWS_IO_EVENT_TYPE_READABLE) {
        impl->on_readable_user_callback(read_end, AWS_ERROR_SUCCESS, impl->on_readable_user_data);
        if (cleaned_up) {
            return;
        }
        remaining_events &= ~AWS_IO_EVENT_TYPE_READABLE;
    }

    /* The previous callback may have unsubscribed, in which case the error is not reported. */
    if (remaining_events != 0 && impl->is_subscribed) {
        impl->on_readable_user_callback(read_end, AWS_IO_BROKEN_PIPE, impl->on_readable_user_data);
        if (cleaned_up) {
            return;
        }
    }

    impl->did_user_callback_clean_up_read_end = NULL;
}

/*
 * Start delivering readable (and error) notifications for the read end to on_readable.
 * Must be called on the read end's event-loop thread. Fails with AWS_ERROR_IO_ALREADY_SUBSCRIBED if already
 * subscribed. If registration with the event loop fails, the subscription state is rolled back.
 */
int aws_pipe_subscribe_to_readable_events(
    struct aws_pipe_read_end *read_end,
    aws_pipe_on_readable_fn *on_readable,
    void *user_data) {

    AWS_ASSERT(on_readable);

    struct read_end_impl *impl = read_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }
    if (!aws_event_loop_thread_is_callers_thread(impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }
    if (impl->is_subscribed) {
        return aws_raise_error(AWS_ERROR_IO_ALREADY_SUBSCRIBED);
    }

    /* Record the subscription before registering, so the handler finds it in place. */
    impl->on_readable_user_callback = on_readable;
    impl->on_readable_user_data = user_data;
    impl->is_subscribed = true;

    if (!aws_event_loop_subscribe_to_io_events(
            impl->event_loop, &impl->handle, AWS_IO_EVENT_TYPE_READABLE, s_read_end_on_event, read_end)) {
        return AWS_OP_SUCCESS;
    }

    /* Registration failed: undo the bookkeeping above. */
    impl->on_readable_user_callback = NULL;
    impl->on_readable_user_data = NULL;
    impl->is_subscribed = false;

    return AWS_OP_ERR;
}

/*
 * Stop delivering readable notifications for the read end.
 * Must be called on the read end's event-loop thread. Fails with AWS_ERROR_IO_NOT_SUBSCRIBED if not subscribed.
 * The stored callback/user data are cleared only after the event loop accepts the unsubscribe.
 */
int aws_pipe_unsubscribe_from_readable_events(struct aws_pipe_read_end *read_end) {
    struct read_end_impl *impl = read_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }
    if (!aws_event_loop_thread_is_callers_thread(impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }
    if (!impl->is_subscribed) {
        return aws_raise_error(AWS_ERROR_IO_NOT_SUBSCRIBED);
    }

    if (aws_event_loop_unsubscribe_from_io_events(impl->event_loop, &impl->handle)) {
        return AWS_OP_ERR;
    }

    impl->on_readable_user_data = NULL;
    impl->on_readable_user_callback = NULL;
    impl->is_subscribed = false;

    return AWS_OP_SUCCESS;
}

/*
 * Pop the front write request, invoke its completion callback (if any) with error_code, then free it.
 * Returns true if the callback cleaned up the write end. In that case write_end's impl has already been freed
 * and the caller must not touch it.
 */
static bool s_write_end_complete_front_write_request(struct aws_pipe_write_end *write_end, int error_code) {
    struct write_end_impl *impl = write_end->impl_data;

    AWS_ASSERT(!aws_linked_list_empty(&impl->write_list));
    struct aws_linked_list_node *front_node = aws_linked_list_pop_front(&impl->write_list);
    struct write_request *finished = AWS_CONTAINER_OF(front_node, struct write_request, list_node);

    /* The callback may free impl, so grab the allocator while impl is still valid. */
    struct aws_allocator *allocator = impl->alloc;

    /* Advertise the in-flight request so clean_up() can flag it if the callback tears the write end down. */
    struct write_request *saved_request = impl->currently_invoking_write_callback;
    impl->currently_invoking_write_callback = finished;

    bool cleaned_up = false;
    if (finished->user_callback != NULL) {
        finished->user_callback(write_end, error_code, finished->original_cursor, finished->user_data);
        cleaned_up = finished->did_user_callback_clean_up_write_end;
    }

    /* Only restore state on impl if it still exists. */
    if (!cleaned_up) {
        impl->currently_invoking_write_callback = saved_request;
    }

    aws_mem_release(allocator, finished);

    return cleaned_up;
}

/*
 * Drain queued write requests in FIFO order while the pipe accepts data.
 *
 * A request completes once its cursor is exhausted, or as soon as write() fails with anything other than
 * EAGAIN/EWOULDBLOCK. EAGAIN/EWOULDBLOCK marks the pipe not writable and stops; s_write_end_on_event() resumes
 * later. Processing also stops if a completion callback cleans up the write end.
 */
static void s_write_end_process_requests(struct aws_pipe_write_end *write_end) {
    struct write_end_impl *impl = write_end->impl_data;
    AWS_ASSERT(impl);

    for (;;) {
        if (aws_linked_list_empty(&impl->write_list)) {
            return;
        }

        struct aws_linked_list_node *front_node = aws_linked_list_front(&impl->write_list);
        struct write_request *front = AWS_CONTAINER_OF(front_node, struct write_request, list_node);

        int error_code = AWS_ERROR_SUCCESS;

        /* Zero-length requests skip the syscall and complete immediately. */
        if (front->cursor.len > 0) {
            ssize_t written = write(impl->handle.data.fd, front->cursor.ptr, front->cursor.len);

            if (written >= 0) {
                aws_byte_cursor_advance(&front->cursor, (size_t)written);
                if (front->cursor.len > 0) {
                    /* Partial write: go around again for the rest of this same request. */
                    continue;
                }
            } else {
                const int errno_value = errno;
                if (errno_value == EAGAIN || errno_value == EWOULDBLOCK) {
                    /* Pipe is full; wait for the next writable event. */
                    impl->is_writable = false;
                    return;
                }
                /* Unrecoverable failure for this request. */
                error_code = s_translate_posix_error(errno_value);
            }
        }

        /* Request is done. If its callback cleaned up the write end, the remaining requests were already
         * cancelled by clean_up() and impl is gone. */
        if (s_write_end_complete_front_write_request(write_end, error_code)) {
            return;
        }
    }
}

/*
 * Event-loop handler for the write end's fd. Only the writable bit matters: it marks the pipe writable
 * again and resumes draining queued write requests. Other event bits are ignored.
 */
static void s_write_end_on_event(
    struct aws_event_loop *event_loop,
    struct aws_io_handle *handle,
    int events,
    void *user_data) {

    (void)event_loop;
    (void)handle;

    /* Note that it should be impossible for this to run after write-end has been unsubscribed or cleaned up */
    struct aws_pipe_write_end *write_end = user_data;
    struct write_end_impl *impl = write_end->impl_data;
    AWS_ASSERT(impl);
    AWS_ASSERT(impl->event_loop == event_loop);
    AWS_ASSERT(&impl->handle == handle);

    const bool became_writable = (events & AWS_IO_EVENT_TYPE_WRITABLE) != 0;
    if (became_writable) {
        impl->is_writable = true;
        s_write_end_process_requests(write_end);
    }
}

/*
 * Queue src_buffer to be written to the pipe; on_completed (optional) is invoked once the whole buffer has been
 * written or the write has failed. The caller must keep src_buffer's memory valid until then.
 * Must be called on the write end's event-loop thread.
 */
int aws_pipe_write(
    struct aws_pipe_write_end *write_end,
    struct aws_byte_cursor src_buffer,
    aws_pipe_on_write_completed_fn *on_completed,
    void *user_data) {

    AWS_ASSERT(src_buffer.ptr);

    struct write_end_impl *impl = write_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }
    if (!aws_event_loop_thread_is_callers_thread(impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    struct write_request *req = aws_mem_calloc(impl->alloc, 1, sizeof(*req));
    if (req == NULL) {
        return AWS_OP_ERR;
    }

    req->user_callback = on_completed;
    req->user_data = user_data;
    req->original_cursor = src_buffer;
    req->cursor = src_buffer;
    aws_linked_list_push_back(&impl->write_list, &req->list_node);

    /* Start writing now if the pipe is writable. Skip this when a completion callback is running: that means this
     * call came from inside s_write_end_process_requests(), whose loop will pick up the new request itself. */
    const bool processing_in_progress = impl->currently_invoking_write_callback != NULL;
    if (impl->is_writable && !processing_in_progress) {
        s_write_end_process_requests(write_end);
    }

    return AWS_OP_SUCCESS;
}

/*
 * Tear down the write end: unsubscribe from the event loop, close its fd, zero *write_end, and complete every
 * still-queued request with AWS_IO_BROKEN_PIPE. Those callbacks receive a NULL write end. Must be called on the
 * write end's event-loop thread. If called from inside a completion callback, that in-flight request is flagged
 * so the caller stops using the freed impl.
 */
int aws_pipe_clean_up_write_end(struct aws_pipe_write_end *write_end) {
    struct write_end_impl *impl = write_end->impl_data;
    if (impl == NULL) {
        return aws_raise_error(AWS_IO_BROKEN_PIPE);
    }
    if (!aws_event_loop_thread_is_callers_thread(impl->event_loop)) {
        return aws_raise_error(AWS_ERROR_IO_EVENT_LOOP_THREAD_ONLY);
    }

    if (aws_event_loop_unsubscribe_from_io_events(impl->event_loop, &impl->handle)) {
        return AWS_OP_ERR;
    }

    close(impl->handle.data.fd);

    /* Detach the public handle first so public functions called from the callbacks below fail cleanly. */
    AWS_ZERO_STRUCT(*write_end);

    struct write_request *in_flight = impl->currently_invoking_write_callback;
    if (in_flight != NULL) {
        in_flight->did_user_callback_clean_up_write_end = true;
    }

    /* Cancel everything still queued. */
    while (!aws_linked_list_empty(&impl->write_list)) {
        struct aws_linked_list_node *pending_node = aws_linked_list_pop_front(&impl->write_list);
        struct write_request *pending = AWS_CONTAINER_OF(pending_node, struct write_request, list_node);

        if (pending->user_callback != NULL) {
            pending->user_callback(NULL, AWS_IO_BROKEN_PIPE, pending->original_cursor, pending->user_data);
        }

        aws_mem_release(impl->alloc, pending);
    }

    aws_mem_release(impl->alloc, impl);
    return AWS_OP_SUCCESS;
}
