/*
 * Copyright (c) 2011 Justin Tulloss
 * Copyright (c) 2010 Justin Tulloss
 *
 * Permission is hereby granted, free of charge, to any person obtaining a copy
 * of this software and associated documentation files (the "Software"), to deal
 * in the Software without restriction, including without limitation the rights
 * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
 * copies of the Software, and to permit persons to whom the Software is
 * furnished to do so, subject to the following conditions:
 *
 * The above copyright notice and this permission notice shall be included in
 * all copies or substantial portions of the Software.
 *
 * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
 * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
 * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
 * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
 * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 * THE SOFTWARE.
 */

#include <v8.h>
#include <node.h>
#include <node_version.h>
#include <node_buffer.h>
#include <zmq.h>
#if (ZMQ_VERSION < 40200)
#include <zmq_utils.h>
#endif
#include <assert.h>
#include <stddef.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <errno.h>
#include <stdexcept>
#include <set>
#include "nan.h"

#ifdef _WIN32
# define snprintf _snprintf_s
#endif

// Feature switches derived from the libzmq version macros.
// Every expansion is wrapped in an outer pair of parentheses so that it keeps
// its meaning when combined with other operators (e.g. `!ZMQ_CAN_UNBIND` or
// `ZMQ_CAN_UNBIND && X`), not only when used as a bare `#if` condition.
#define ZMQ_CAN_DISCONNECT ((ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3)
#define ZMQ_CAN_UNBIND ((ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3)
#define ZMQ_CAN_MONITOR (ZMQ_VERSION > 30201)
#define ZMQ_CAN_SET_CTX ((ZMQ_VERSION_MAJOR == 3 && ZMQ_VERSION_MINOR >= 2) || ZMQ_VERSION_MAJOR > 3)

#define ZERO_COPY_MESSAGE_SEND 1

using namespace v8;
using namespace node;

// Values stored in Socket::state_ and exposed to JS through the `state`
// accessor.
enum {
    STATE_READY   // socket accepts calls
  , STATE_BUSY    // set while a bind/unbind is in progress; GET_SOCKET rejects calls
  , STATE_CLOSED  // GET_SOCKET rejects calls with "Socket is closed"
};

namespace zmq {

  // Socket option ids grouped by the C type of their value. The getsockopt /
  // setsockopt bindings look an option up in these sets to decide which typed
  // helper to call; an option found in none of them is rejected with EINVAL.
  // The sets are not populated in this part of the file.
  std::set<int> opts_int;
  std::set<int> opts_uint32;
  std::set<int> opts_int64;
  std::set<int> opts_uint64;
  std::set<int> opts_binary;

  class Socket;

  /*
   * JS-visible wrapper around a ØMQ context (the `void*` returned by
   * zmq_init). Socket is a friend because it needs the raw context pointer to
   * create sockets.
   */
  class Context : public Nan::ObjectWrap {
    friend class Socket;
    public:
      // Registers the "Context" constructor on the module exports.
      static NAN_MODULE_INIT(Initialize);
      // Terminates the underlying ØMQ context if it is still open.
      virtual ~Context();

    private:
      // Creates the ØMQ context with the given number of I/O threads.
      Context(int io_threads);
      // JS constructor: `new Context([io_threads])`.
      static NAN_METHOD(New);
      // Unwraps the native Context from `info.This()`.
      static Context *GetContext(const Nan::FunctionCallbackInfo<Value>&);
      // Terminates the context once; later calls are no-ops.
      void Close();
      // JS method `close()`.
      static NAN_METHOD(Close);
#if ZMQ_CAN_SET_CTX
      // JS methods `getOpt(option)` / `setOpt(option, value)`.
      static NAN_METHOD(GetOpt);
      static NAN_METHOD(SetOpt);
#endif

      // Raw ØMQ context handle; NULL after a successful Close().
      void* context_;
  };

  /*
   * JS-visible wrapper around a ØMQ socket. The socket's file descriptor is
   * watched through a libuv poll handle, and readiness is reported to JS via
   * the `onReadReady` / `onSendReady` properties of the wrapping object.
   */
  class Socket : public Nan::ObjectWrap {
    public:
      // Registers the "SocketBinding" constructor on the module exports.
      static NAN_MODULE_INIT(Initialize);
      virtual ~Socket();
      // Invoke the JS `onReadReady` / `onSendReady` callbacks.
      void NotifyReadReady();
      void NotifySendReady();
      // Polls the socket and fires whichever of the two notifications apply.
      void CallbackIfReady();

#if ZMQ_CAN_MONITOR
      // Forward a monitor event / monitor failure to the JS
      // `onMonitorEvent` / `onMonitorError` callbacks.
      void MonitorEvent(uint16_t event_id, int32_t event_value, char *endpoint);
      void MonitorError(const char *error_msg);
#endif

    private:
      // JS constructor: `new SocketBinding(context, type)`.
      static NAN_METHOD(New);
      Socket(Context *context, int type);

      // Unwraps the native Socket from `info.This()`.
      static Socket* GetSocket(const Nan::FunctionCallbackInfo<Value>&);
      // Read-only `state` accessor (one of the STATE_* values).
      static NAN_GETTER(GetState);

      // `pending` accessor; when true, PollForEvents also polls for POLLOUT.
      static NAN_GETTER(GetPending);
      static NAN_SETTER(SetPending);

      // Typed helpers behind the JS getsockopt/setsockopt methods; T is the
      // C type of the option value (char* is specialised for binary options).
      template<typename T>
      Local<Value> GetSockOpt(int option);
      template<typename T>
      Local<Value> SetSockOpt(int option, Local<Value> wrappedValue);
      static NAN_METHOD(GetSockOpt);
      static NAN_METHOD(SetSockOpt);

      // uv_ref / uv_unref on the poll handle, exposed to JS as ref()/unref().
      void _AttachToEventLoop();
      void _DetachFromEventLoop();
      static NAN_METHOD(AttachToEventLoop);
      static NAN_METHOD(DetachFromEventLoop);

      // Per-request state shared by the asynchronous bind and unbind calls.
      struct BindState;
      static NAN_METHOD(Bind);

      // Work / completion halves of the asynchronous bind (uv_queue_work).
      static void UV_BindAsync(uv_work_t* req);
      static void UV_BindAsyncAfter(uv_work_t* req);

      static NAN_METHOD(BindSync);
#if ZMQ_CAN_UNBIND
      static NAN_METHOD(Unbind);

      // Work / completion halves of the asynchronous unbind (uv_queue_work).
      static void UV_UnbindAsync(uv_work_t* req);
      static void UV_UnbindAsyncAfter(uv_work_t* req);

      static NAN_METHOD(UnbindSync);
#endif
      static NAN_METHOD(Connect);
#if ZMQ_CAN_DISCONNECT
      static NAN_METHOD(Disconnect);
#endif

      // RAII helpers wrapping a zmq_msg_t for receiving and sending.
      class IncomingMessage;
      class OutgoingMessage;
      static NAN_METHOD(Recv);
      static NAN_METHOD(Readv);
      static NAN_METHOD(Sendv);
      void Close();
      static NAN_METHOD(Close);

      // Handle on the owning JS Context object.
      Nan::Persistent<Object> context_;
      // Raw ØMQ socket handle.
      void *socket_;
      // See GetPending / SetPending.
      bool pending_;
      // One of STATE_READY / STATE_BUSY / STATE_CLOSED.
      uint8_t state_;
      // Count of bound/connected endpoints; the wrapper is Ref()'d and the
      // poll handle uv_ref()'d while it is non-zero.
      int32_t endpoints;
#if ZMQ_CAN_MONITOR
      // PAIR socket that receives monitor events; NULL when not monitoring.
      void *monitor_socket_;
      // Timer that drives UV_MonitorCallback.
      uv_timer_t *monitor_handle_;
      // Timer interval passed to uv_timer_start (from the Monitor() call).
      int64_t timer_interval_;
      // Max events handled per timer tick; 0 means no limit.
      int64_t num_of_events_;
      static void UV_MonitorCallback(uv_timer_t* handle, int status);
      static NAN_METHOD(Monitor);
      void Unmonitor();
      static NAN_METHOD(Unmonitor);
#endif

      // Non-blocking zmq_poll on socket_; returns the ready events that were
      // asked for, or -1 after throwing a JS error.
      short PollForEvents();
      // libuv poll handle watching the socket's file descriptor.
      uv_poll_t *poll_handle_;
      static void UV_PollCallback(uv_poll_t* handle, int status, int events);
  };

  // Property names of the JS readiness callbacks ("onSendReady" /
  // "onReadReady"); set in Socket::Initialize.
  Nan::Persistent<String> send_callback_symbol;
  Nan::Persistent<String> read_callback_symbol;

#if ZMQ_CAN_MONITOR
  // Property names of the JS monitor callbacks ("onMonitorEvent" /
  // "onMonitorError"); set in Socket::Initialize.
  Nan::Persistent<String> monitor_symbol;
  Nan::Persistent<String> monitor_error;
  // Running counter used to build a unique inproc monitor address per
  // Monitor() call.
  int monitors_count = 0;
#endif

  // Module-level initializer (defined later in the file).
  static NAN_MODULE_INIT(Initialize);

  /*
   * libuv close callback: frees a heap-allocated handle once libuv has
   * finished closing it.
   *
   * The handles in this file are allocated as uv_async_t, uv_poll_t or
   * uv_timer_t, so they are deleted through that concrete type. Deleting
   * them through a plain uv_handle_t* is undefined behaviour in C++.
   */
  static void
  on_uv_close(uv_handle_t *handle)
  {
    switch (handle->type) {
      case UV_ASYNC:
        delete reinterpret_cast<uv_async_t*>(handle);
        break;
      case UV_POLL:
        delete reinterpret_cast<uv_poll_t*>(handle);
        break;
      case UV_TIMER:
        delete reinterpret_cast<uv_timer_t*>(handle);
        break;
      default:
        // Unknown handle kind: keep the historical behaviour.
        delete handle;
        break;
    }
  }

  /*
   * Helpers for dealing with ØMQ errors.
   */

  // Returns libzmq's description of the most recent ØMQ error (zmq_errno()).
  static inline const char*
  ErrorMessage() {
    const int last_error = zmq_errno();
    return zmq_strerror(last_error);
  }

  // Builds (but does not throw) a JS Error describing the most recent ØMQ
  // error.
  static inline Local<Value>
  ExceptionFromError() {
    const char* description = ErrorMessage();
    return Nan::Error(description);
  }

  /*
   * Context methods.
   */

  /*
   * Builds the JS constructor for Context (with `close` and, when the
   * library supports it, `setOpt` / `getOpt`) and publishes it on the
   * module exports as "Context".
   */
  NAN_MODULE_INIT(Context::Initialize) {
    Nan::HandleScope scope;

    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
    // One internal field holds the native Context pointer.
    tpl->InstanceTemplate()->SetInternalFieldCount(1);

    Nan::SetPrototypeMethod(tpl, "close", Close);
#if ZMQ_CAN_SET_CTX
    Nan::SetPrototypeMethod(tpl, "setOpt", SetOpt);
    Nan::SetPrototypeMethod(tpl, "getOpt", GetOpt);
#endif

    Nan::Set(target,
             Nan::New("Context").ToLocalChecked(),
             Nan::GetFunction(tpl).ToLocalChecked());
  }


  // Terminates the ØMQ context if Close() has not already done so.
  Context::~Context() {
    this->Close();
  }

  /*
   * JS constructor `new Context([io_threads])`.
   * The thread count defaults to 1 and is only read when exactly one
   * argument is passed. Throws TypeError for a non-number and RangeError for
   * a value below 1.
   */
  NAN_METHOD(Context::New) {
    assert(info.IsConstructCall());

    int io_threads = 1;
    const bool has_thread_count = (info.Length() == 1);
    if (has_thread_count) {
      Local<Value> requested = info[0];
      if (!requested->IsNumber())
        return Nan::ThrowTypeError("io_threads must be an integer");

      io_threads = Nan::To<int>(requested).FromJust();
      if (io_threads < 1)
        return Nan::ThrowRangeError("io_threads must be a positive number");
    }

    Context *wrapper = new Context(io_threads);
    wrapper->Wrap(info.This());
    info.GetReturnValue().Set(info.This());
  }

  // Creates the ØMQ context; throws a JS error if zmq_init fails (the object
  // is still constructed, with a NULL context_).
  Context::Context(int io_threads)
      : Nan::ObjectWrap(), context_(zmq_init(io_threads)) {
    if (context_ == NULL) {
      Nan::ThrowError(ErrorMessage());
    }
  }

  // Returns the native Context wrapped by the receiver (`this`) of the call.
  Context *
  Context::GetContext(const Nan::FunctionCallbackInfo<Value>& info) {
    Local<Object> receiver = info.This();
    return Nan::ObjectWrap::Unwrap<Context>(receiver);
  }

  /*
   * Terminates the ØMQ context. Does nothing when already closed. If
   * zmq_term fails a JS error is thrown and the handle is kept, so a later
   * call can retry.
   */
  void
  Context::Close() {
    if (context_ == NULL)
      return;

    const int rc = zmq_term(context_);
    if (rc < 0) {
      Nan::ThrowError(ErrorMessage());
      return;
    }

    context_ = NULL;
  }

  // JS method `close()`: terminates the receiver's ØMQ context.
  NAN_METHOD(Context::Close) {
    Context *self = GetContext(info);
    self->Close();
  }

#if ZMQ_CAN_SET_CTX
  /*
   * JS method `setOpt(option, value)`: forwards to zmq_ctx_set.
   * Both arguments must be numbers; a failure reported by libzmq is thrown
   * as a JS error.
   */
  NAN_METHOD(Context::SetOpt) {
    if (info.Length() != 2) {
      return Nan::ThrowError("Must pass an option and a value");
    }

    const bool both_numeric = info[0]->IsNumber() && info[1]->IsNumber();
    if (!both_numeric) {
      return Nan::ThrowTypeError("Arguments must be numbers");
    }

    const int option = Nan::To<int32_t>(info[0]).FromJust();
    const int value = Nan::To<int32_t>(info[1]).FromJust();

    Context *self = GetContext(info);
    const int rc = zmq_ctx_set(self->context_, option, value);
    if (rc < 0) {
      return Nan::ThrowError(ExceptionFromError());
    }
  }

  /*
   * JS method `getOpt(option)`: returns whatever zmq_ctx_get yields for the
   * option, as an integer. The result is passed through unchecked.
   */
  NAN_METHOD(Context::GetOpt) {
    if (info.Length() != 1) {
      return Nan::ThrowError("Must pass an option");
    }
    if (!info[0]->IsNumber()) {
      return Nan::ThrowTypeError("Option must be an integer");
    }

    const int option = Nan::To<int32_t>(info[0]).FromJust();
    Context *self = GetContext(info);

    const int current = zmq_ctx_get(self->context_, option);
    info.GetReturnValue().Set(Nan::New<Integer>(current));
  }
#endif
  /*
   * Socket methods.
   */

  /*
   * Builds the JS constructor for Socket, publishes it on the module exports
   * as "SocketBinding", and creates the persistent property names used to
   * look up the JS callbacks.
   */
  NAN_MODULE_INIT(Socket::Initialize) {
    Nan::HandleScope scope;

    Local<FunctionTemplate> tpl = Nan::New<FunctionTemplate>(New);
    Local<ObjectTemplate> instance = tpl->InstanceTemplate();
    // One internal field holds the native Socket pointer.
    instance->SetInternalFieldCount(1);

    // Instance accessors.
    Nan::SetAccessor(instance,
      Nan::New("state").ToLocalChecked(), Socket::GetState);
    Nan::SetAccessor(instance,
      Nan::New("pending").ToLocalChecked(), GetPending, SetPending);

    // Prototype methods.
    Nan::SetPrototypeMethod(tpl, "bind", Bind);
    Nan::SetPrototypeMethod(tpl, "bindSync", BindSync);
#if ZMQ_CAN_UNBIND
    Nan::SetPrototypeMethod(tpl, "unbind", Unbind);
    Nan::SetPrototypeMethod(tpl, "unbindSync", UnbindSync);
#endif
    Nan::SetPrototypeMethod(tpl, "connect", Connect);
    Nan::SetPrototypeMethod(tpl, "getsockopt", GetSockOpt);
    Nan::SetPrototypeMethod(tpl, "setsockopt", SetSockOpt);
    Nan::SetPrototypeMethod(tpl, "ref", AttachToEventLoop);
    Nan::SetPrototypeMethod(tpl, "unref", DetachFromEventLoop);
    Nan::SetPrototypeMethod(tpl, "recv", Recv);
    Nan::SetPrototypeMethod(tpl, "readv", Readv);
    Nan::SetPrototypeMethod(tpl, "sendv", Sendv);
    Nan::SetPrototypeMethod(tpl, "close", Close);

#if ZMQ_CAN_DISCONNECT
    Nan::SetPrototypeMethod(tpl, "disconnect", Disconnect);
#endif

#if ZMQ_CAN_MONITOR
    Nan::SetPrototypeMethod(tpl, "monitor", Monitor);
    Nan::SetPrototypeMethod(tpl, "unmonitor", Unmonitor);
    monitor_symbol.Reset(Nan::New("onMonitorEvent").ToLocalChecked());
    monitor_error.Reset(Nan::New("onMonitorError").ToLocalChecked());
#endif

    Nan::Set(target,
             Nan::New("SocketBinding").ToLocalChecked(),
             Nan::GetFunction(tpl).ToLocalChecked());

    read_callback_symbol.Reset(Nan::New("onReadReady").ToLocalChecked());
    send_callback_symbol.Reset(Nan::New("onSendReady").ToLocalChecked());
  }

  // Stops monitoring (when the library supports it) and closes the socket.
  Socket::~Socket() {
#if ZMQ_CAN_MONITOR
    // Unmonitor() is only declared when monitoring is available, so the call
    // must be guarded the same way to compile against older libzmq versions.
    Unmonitor();
#endif
    Close();
  }

  /*
   * JS constructor `new SocketBinding(context, type)`.
   *
   * `context` must be a wrapped Context object and `type` a number (the ØMQ
   * socket type). Throws Error for a wrong argument count and TypeError for
   * a wrongly typed argument.
   */
  NAN_METHOD(Socket::New) {
    assert(info.IsConstructCall());

    if (info.Length() != 2) {
      return Nan::ThrowError("Must pass a context and a type to constructor");
    }

    // Unwrap() reads an internal field; reject anything that cannot possibly
    // be a wrapped native object instead of crashing inside it.
    if (!info[0]->IsObject() ||
        info[0].As<Object>()->InternalFieldCount() < 1) {
      return Nan::ThrowTypeError("Context must be a Context object");
    }

    if (!info[1]->IsNumber()) {
      return Nan::ThrowTypeError("Type must be an integer");
    }

    Context *context = Nan::ObjectWrap::Unwrap<Context>(info[0].As<Object>());
    int type = Nan::To<int>(info[1]).FromJust();

    Socket *socket = new Socket(context, type);
    socket->Wrap(info.This());
    info.GetReturnValue().Set(info.This());
  }

  short
  Socket::PollForEvents() {
    zmq_pollitem_t item = { socket_, 0, ZMQ_POLLIN, 0 };
    if (pending_)
      item.events |= ZMQ_POLLOUT;

    while (true) {
      int rc = zmq_poll(&item, 1, 0);
      if (rc < 0) {
        if (zmq_errno()==EINTR) {
          continue;
        } else {
          Nan::ThrowError(ErrorMessage());
          return -1;
        }
      } else {
        break;
      }
    }
    return item.revents & item.events;
  }

  /*
   * Calls the JS `onReadReady` callback of the wrapping object with no
   * arguments. Does nothing when the property is not a function, matching
   * the guard used by the monitor callbacks.
   */
  void
  Socket::NotifyReadReady() {
    Nan::HandleScope scope;
    Local<Value> callback_v = Nan::Get(this->handle(), Nan::New(read_callback_symbol)).ToLocalChecked();
    if (!callback_v->IsFunction()) {
      return;
    }

    Nan::MakeCallback(this->handle(), callback_v.As<Function>(), 0, NULL);
  }

  /*
   * Calls the JS `onSendReady` callback of the wrapping object with no
   * arguments. Does nothing when the property is not a function, matching
   * the guard used by the monitor callbacks.
   */
  void
  Socket::NotifySendReady() {
    Nan::HandleScope scope;
    Local<Value> callback_v = Nan::Get(this->handle(), Nan::New(send_callback_symbol)).ToLocalChecked();
    if (!callback_v->IsFunction()) {
      return;
    }

    Nan::MakeCallback(this->handle(), callback_v.As<Function>(), 0, NULL);
  }

  /*
   * Polls the socket and notifies JS about readability and/or writability.
   */
  void
  Socket::CallbackIfReady() {
    short events = PollForEvents();

    // PollForEvents() returns -1 after throwing on a poll failure. -1 has
    // every bit set, so without this check both notifications would fire.
    if (events < 0) {
      return;
    }

    if ((events & ZMQ_POLLIN) != 0) {
      NotifyReadReady();
    }

    if ((events & ZMQ_POLLOUT) != 0) {
      NotifySendReady();
    }
  }

  /*
   * libuv poll callback for the socket's file descriptor. A non-zero status
   * is reported as a JS error; otherwise the owning Socket (stored in
   * handle->data) is polled and its JS callbacks are fired.
   */
  void
  Socket::UV_PollCallback(uv_poll_t* handle, int status, int events) {
    if (status == 0) {
      Socket* owner = static_cast<Socket*>(handle->data);
      owner->CallbackIfReady();
      return;
    }

    Nan::ThrowError("I/O status: socket not ready !=0 ");
  }

#if ZMQ_CAN_MONITOR
  /*
   * Delivers one monitor event to the JS `onMonitorEvent` callback as
   * (event_id, event_value, endpoint, error). `error` is an Error built from
   * the current ØMQ errno for the *_FAILED events and undefined otherwise.
   * Does nothing when no callback function is installed.
   */
  void
  Socket::MonitorEvent(uint16_t event_id, int32_t event_value, char *event_endpoint) {
    Nan::HandleScope scope;

    Local<Value> handler = Nan::Get(this->handle(), Nan::New(monitor_symbol)).ToLocalChecked();
    if (!handler->IsFunction())
      return;

    Local<Value> argv[4];
    argv[0] = Nan::New<Integer>(event_id);
    argv[1] = Nan::New<Integer>(event_value);
    argv[2] = Nan::New<String>(event_endpoint).ToLocalChecked();

    const bool is_failure = event_id == ZMQ_EVENT_BIND_FAILED
                         || event_id == ZMQ_EVENT_ACCEPT_FAILED
                         || event_id == ZMQ_EVENT_CLOSE_FAILED;
    if (is_failure) {
      argv[3] = ExceptionFromError();
    } else {
      argv[3] = Nan::Undefined();
    }

    Nan::MakeCallback(this->handle(), handler.As<Function>(), 4, argv);
  }

  void
  Socket::MonitorError(const char *error_msg) {
    Nan::HandleScope scope;

    Local<Value> callback_v = Nan::Get(this->handle(), Nan::New(monitor_error)).ToLocalChecked();
    if (!callback_v->IsFunction()) {
      return;
    }

    Local<Value> argv[1];
    argv[0] = Nan::New<String>(error_msg).ToLocalChecked();

    Nan::MakeCallback(this->handle(), callback_v.As<Function>(), 1, argv);
  }

  /*
   * Timer callback that drains pending events from the monitor socket.
   *
   * At most `num_of_events_` events are handled per tick (0 means no limit).
   * Each event is forwarded to MonitorEvent(). On success the one-shot timer
   * is re-armed; on any receive error monitoring is stopped and the error is
   * reported through MonitorError().
   */
  void
  Socket::UV_MonitorCallback(uv_timer_t* handle, int status) {
    Nan::HandleScope scope;
    Socket* s = static_cast<Socket*>(handle->data);
    zmq_msg_t msg1; /* 3.x has 1 message per event */

    if (s->state_ == STATE_CLOSED)
      return;

    // Fully initialise the poll item (fd and revents included).
    zmq_pollitem_t item = { s->monitor_socket_, 0, ZMQ_POLLIN, 0 };

    const char* error = NULL;
    int64_t ittr = 0;
    while ((s->num_of_events_ == 0 || s->num_of_events_ > ittr++) && zmq_poll(&item, 1, 0)) {
      zmq_msg_init (&msg1);
      if (zmq_recvmsg (s->monitor_socket_, &msg1, ZMQ_DONTWAIT) > 0) {
        char event_endpoint[1025];
        uint16_t event_id;
        int32_t event_value;

#if ZMQ_VERSION_MAJOR >= 4
        // First frame: 16-bit event id followed by a 32-bit value. The value
        // sits at offset 2, so copy it out rather than dereferencing a
        // misaligned pointer.
        uint8_t *data = static_cast<uint8_t *>(zmq_msg_data(&msg1));
        memcpy(&event_id, data, sizeof(event_id));
        memcpy(&event_value, data + sizeof(event_id), sizeof(event_value));

        zmq_msg_t msg2; /* 4.x has 2 messages per event */

        // get our next frame it may have the target address and safely copy to our buffer
        zmq_msg_init (&msg2);
        if (zmq_msg_more(&msg1) == 0 || zmq_recvmsg (s->monitor_socket_, &msg2, 0) == -1) {
          error = ErrorMessage();
          zmq_msg_close(&msg2);
          // The first frame must be released on this path too.
          zmq_msg_close(&msg1);
          break;
        }

        // protect from overflow
        size_t len = zmq_msg_size(&msg2);
        // MIN message size and buffer size with null padding
        len = len < sizeof(event_endpoint)-1 ? len : sizeof(event_endpoint)-1;
        memcpy(event_endpoint, zmq_msg_data(&msg2), len);
        zmq_msg_close(&msg2);

        // null terminate our string
        event_endpoint[len]=0;
#else
        // monitoring on zmq < 4 used zmq_event_t
        zmq_event_t event;
        memcpy (&event, zmq_msg_data (&msg1), sizeof (zmq_event_t));
        event_id = event.event;

        // Bit of a hack, but all events in the zmq_event_t union have the same layout so this will work for all event types.
        event_value = event.data.connected.fd;
        snprintf(event_endpoint, sizeof(event_endpoint), "%s", event.data.connected.addr);
#endif

        s->MonitorEvent(event_id, event_value, event_endpoint);
        zmq_msg_close(&msg1);
      }
      else {
        error = ErrorMessage();
        zmq_msg_close(&msg1);
        break;
      }
    }

    // If there was no error and we still monitor we reset the monitor timer
    if (error == NULL && s->monitor_handle_ != NULL) {
      uv_timer_start(s->monitor_handle_, reinterpret_cast<uv_timer_cb>(Socket::UV_MonitorCallback), s->timer_interval_, 0);
    }
    // If error raise the monitor error event and stop the monitor
    else if (error != NULL) {
      s->Unmonitor();
      s->MonitorError(error);
    }
  }
#endif

  /*
   * Creates a ØMQ socket of the given type in `context` and starts watching
   * its file descriptor for readability on the default libuv loop.
   *
   * On failure a JS error is thrown and construction stops early. Every
   * member is therefore given a defined value first, because the destructor
   * reads members such as `monitor_socket_` even after a failed construction.
   */
  Socket::Socket(Context *context, int type) : Nan::ObjectWrap() {
    pending_ = false;
    state_ = STATE_READY;
    endpoints = 0;
    poll_handle_ = NULL;
    #if ZMQ_CAN_MONITOR
      this->monitor_socket_ = NULL;
      this->monitor_handle_ = NULL;
      this->timer_interval_ = 0;
      this->num_of_events_ = 0;
    #endif

    context_.Reset(context->handle());
    socket_ = zmq_socket(context->context_, type);

    if (NULL == socket_) {
      Nan::ThrowError(ErrorMessage());
      return;
    }

    poll_handle_ = new uv_poll_t;

    // Lets UV_PollCallback find its way back to this object.
    poll_handle_->data = this;

    uv_os_sock_t socket;
    size_t len = sizeof(uv_os_sock_t);

    if (zmq_getsockopt(socket_, ZMQ_FD, &socket, &len)) {
      Nan::ThrowError(ErrorMessage());
      return;
    }

    uv_poll_init_socket(uv_default_loop(), poll_handle_, socket);
    uv_poll_start(poll_handle_, UV_READABLE, Socket::UV_PollCallback);
  }

  // Returns the native Socket wrapped by the receiver (`this`) of the call.
  Socket *
  Socket::GetSocket(const Nan::FunctionCallbackInfo<Value> &info) {
    Local<Object> receiver = info.This();
    return Nan::ObjectWrap::Unwrap<Socket>(receiver);
  }

  /*
   * Declares a local `Socket* socket` unwrapped from the call's receiver and
   * returns early from the enclosing function, throwing a TypeError, when the
   * socket is closed or busy. These two steps go hand in hand everywhere in
   * this file.
   *
   * Because it declares a variable and contains bare `return` statements, it
   * can be used once per scope and only in functions returning void.
   */
  #define GET_SOCKET(info)                                      \
      Socket* socket = GetSocket(info);                         \
      if (socket->state_ == STATE_CLOSED)                       \
          return Nan::ThrowTypeError("Socket is closed");       \
      if (socket->state_ == STATE_BUSY)                         \
          return Nan::ThrowTypeError("Socket is busy");

  // Getter for the `state` property: the current STATE_* value as an integer.
  NAN_GETTER(Socket::GetState) {
    Socket* self = Nan::ObjectWrap::Unwrap<Socket>(info.Holder());
    Local<Integer> current = Nan::New<Integer>(self->state_);
    info.GetReturnValue().Set(current);
  }

  // Getter for the `pending` property.
  NAN_GETTER(Socket::GetPending) {
    Socket* self = Nan::ObjectWrap::Unwrap<Socket>(info.Holder());
    const bool is_pending = self->pending_;
    info.GetReturnValue().Set(is_pending);
  }

  // Setter for the `pending` property; only booleans are accepted.
  NAN_SETTER(Socket::SetPending) {
    if (!value->IsBoolean()) {
      return Nan::ThrowTypeError("Pending must be a boolean");
    }

    Socket* self = Nan::ObjectWrap::Unwrap<Socket>(info.Holder());
    const bool flag = Nan::To<bool>(value).FromJust();
    self->pending_ = flag;
  }

  /*
   * Reads a numeric socket option of C type T and returns it as a JS Number.
   * Calls interrupted by EINTR are retried. On any other failure a JS error
   * is thrown and undefined is returned.
   */
  template<typename T>
  Local<Value> Socket::GetSockOpt(int option) {
    T value = 0;
    size_t len = sizeof(T);

    int rc;
    do {
      rc = zmq_getsockopt(socket_, option, &value, &len);
    } while (rc < 0 && zmq_errno() == EINTR);

    if (rc < 0) {
      Nan::ThrowError(ExceptionFromError());
      return Nan::Undefined();
    }
    return Nan::New<Number>(value);
  }

  /*
   * Sets a numeric socket option of C type T from a JS number. Throws a JS
   * error for a non-number or when libzmq rejects the value. Always returns
   * undefined.
   */
  template<typename T>
  Local<Value> Socket::SetSockOpt(int option, Local<Value> wrappedValue) {
    if (!wrappedValue->IsNumber()) {
      Nan::ThrowError("Value must be an integer");
      return Nan::Undefined();
    }

    T value = Nan::To<T>(wrappedValue).FromJust();
    const int rc = zmq_setsockopt(socket_, option, &value, sizeof(value));
    if (rc < 0) {
      Nan::ThrowError(ExceptionFromError());
    }
    return Nan::Undefined();
  }

  /*
   * Specialisation for binary/string options: reads up to 1023 bytes,
   * NUL-terminates them and returns the result as a JS string. The string
   * therefore ends at the first NUL byte. Throws and returns undefined on
   * failure.
   */
  template<> Local<Value>
  Socket::GetSockOpt<char*>(int option) {
    char value[1024];
    size_t len = sizeof(value) - 1;  // keep room for the terminator

    const int rc = zmq_getsockopt(socket_, option, value, &len);
    if (rc < 0) {
      Nan::ThrowError(ExceptionFromError());
      return Nan::Undefined();
    }

    value[len] = '\0';
    return Nan::New<String>(value).ToLocalChecked();
  }

  /*
   * Specialisation for binary options: the value must be a Buffer, whose
   * bytes are passed to zmq_setsockopt as-is. Throws on a non-Buffer or when
   * libzmq rejects the value. Always returns undefined.
   */
  template<> Local<Value>
  Socket::SetSockOpt<char*>(int option, Local<Value> wrappedValue) {
    if (!Buffer::HasInstance(wrappedValue)) {
      Nan::ThrowTypeError("Value must be a buffer");
      return Nan::Undefined();
    }

    Local<Object> buf = wrappedValue.As<Object>();
    const size_t length = Buffer::Length(buf);
    const int rc = zmq_setsockopt(socket_, option, Buffer::Data(buf), length);
    if (rc < 0) {
      Nan::ThrowError(ExceptionFromError());
    }
    return Nan::Undefined();
  }

  /*
   * JS method `getsockopt(option)`. Picks the typed reader according to the
   * set the option id belongs to; unknown options throw the EINVAL message.
   */
  NAN_METHOD(Socket::GetSockOpt) {
    if (info.Length() != 1) {
      return Nan::ThrowError("Must pass an option");
    }
    if (!info[0]->IsNumber()) {
      return Nan::ThrowTypeError("Option must be an integer");
    }
    const int option = Nan::To<int>(info[0]).FromJust();

    GET_SOCKET(info);

    Local<Value> result;
    if (opts_int.count(option)) {
      result = socket->GetSockOpt<int>(option);
    } else if (opts_uint32.count(option)) {
      result = socket->GetSockOpt<uint32_t>(option);
    } else if (opts_int64.count(option)) {
      result = socket->GetSockOpt<int64_t>(option);
    } else if (opts_uint64.count(option)) {
      result = socket->GetSockOpt<uint64_t>(option);
    } else if (opts_binary.count(option)) {
      result = socket->GetSockOpt<char*>(option);
    } else {
      return Nan::ThrowError(zmq_strerror(EINVAL));
    }

    info.GetReturnValue().Set(result);
  }

  /*
   * JS method `setsockopt(option, value)`. Picks the typed writer according
   * to the set the option id belongs to; unknown options throw the EINVAL
   * message.
   */
  NAN_METHOD(Socket::SetSockOpt) {
    if (info.Length() != 2) {
      return Nan::ThrowError("Must pass an option and a value");
    }
    if (!info[0]->IsNumber()) {
      return Nan::ThrowTypeError("Option must be an integer");
    }
    const int option = Nan::To<int>(info[0]).FromJust();
    GET_SOCKET(info);

    Local<Value> value = info[1];
    Local<Value> result;
    if (opts_int.count(option)) {
      result = socket->SetSockOpt<int>(option, value);
    } else if (opts_uint32.count(option)) {
      result = socket->SetSockOpt<uint32_t>(option, value);
    } else if (opts_int64.count(option)) {
      result = socket->SetSockOpt<int64_t>(option, value);
    } else if (opts_uint64.count(option)) {
      // NOTE(review): unsigned 64-bit options go through the int64_t writer
      // here; confirm this is intentional.
      result = socket->SetSockOpt<int64_t>(option, value);
    } else if (opts_binary.count(option)) {
      result = socket->SetSockOpt<char*>(option, value);
    } else {
      return Nan::ThrowError(zmq_strerror(EINVAL));
    }

    info.GetReturnValue().Set(result);
  }

  void Socket::_AttachToEventLoop() {
    uv_ref(reinterpret_cast<uv_handle_t *>(this->poll_handle_));
  }

  // JS method `ref()`; rejects closed or busy sockets.
  NAN_METHOD(Socket::AttachToEventLoop) {
    GET_SOCKET(info);

    socket->_AttachToEventLoop();
  }

  void Socket::_DetachFromEventLoop() {
    uv_unref(reinterpret_cast<uv_handle_t *>(this->poll_handle_));
  }

  // JS method `unref()`; rejects closed or busy sockets.
  NAN_METHOD(Socket::DetachFromEventLoop) {
    GET_SOCKET(info);

    socket->_DetachFromEventLoop();
  }

  /*
   * State carried through an asynchronous bind or unbind request: the raw
   * ØMQ socket and address for the worker, plus persistent handles to the JS
   * socket object and completion callback for the main-thread half.
   */
  struct Socket::BindState {
    BindState(Socket* sock_, Local<Function> cb_, Local<String> addr_)
          : sock(sock_->socket_), addr(addr_), error(0) {
      sock_obj.Reset(sock_->handle());
      cb.Reset(cb_);
    }

    ~BindState() {
      // Release both persistent handles.
      sock_obj.Reset();
      cb.Reset();
    }

    Nan::Persistent<Object> sock_obj;  // the JS socket object
    void* sock;                        // raw ØMQ socket used by the worker
    Nan::Persistent<Function> cb;      // JS completion callback
    Nan::Utf8String addr;              // UTF-8 copy of the endpoint address
    int error;                         // zmq_errno() of a failed call, else 0
  };

  /*
   * JS method `bind(address, callback)`: queues zmq_bind on the libuv
   * thread pool and marks the socket busy until the completion half runs.
   *
   * The callback is mandatory: the completion half invokes it
   * unconditionally, so a missing callback is rejected here instead of being
   * cast to a function and called later.
   */
  NAN_METHOD(Socket::Bind) {
    if (!info[0]->IsString())
      return Nan::ThrowTypeError("Address must be a string!");
    Local<String> addr = info[0].As<String>();
    if (info.Length() < 2 || !info[1]->IsFunction())
      return Nan::ThrowTypeError("Provided callback must be a function");
    Local<Function> cb = Local<Function>::Cast(info[1]);

    GET_SOCKET(info);

    BindState* state = new BindState(socket, cb, addr);
    uv_work_t* req = new uv_work_t;
    req->data = state;
    uv_queue_work(uv_default_loop(),
                  req,
                  UV_BindAsync,
                  (uv_after_work_cb)UV_BindAsyncAfter);
    socket->state_ = STATE_BUSY;

    return;
  }

  // Work half of the async bind: calls zmq_bind and records the ØMQ errno on
  // failure.
  void Socket::UV_BindAsync(uv_work_t* req) {
    BindState* bind_state = static_cast<BindState*>(req->data);
    const int rc = zmq_bind(bind_state->sock, *bind_state->addr);
    if (rc < 0) {
      bind_state->error = zmq_errno();
    }
  }

  /*
   * Completion half of the async bind. Marks the socket ready again and
   * invokes the JS callback with an Error or undefined.
   *
   * The endpoint count is only increased when the bind succeeded, as in
   * BindSync, so a failed bind does not leave the socket referenced.
   */
  void Socket::UV_BindAsyncAfter(uv_work_t* req) {
    BindState* state = static_cast<BindState*>(req->data);
    Nan::HandleScope scope;

    Local<Value> argv[1];

    if (state->error) {
      argv[0] = Nan::Error(zmq_strerror(state->error));
    } else {
      argv[0] = Nan::Undefined();
    }

    Local<Function> cb = Nan::New(state->cb);

    Socket *socket = Nan::ObjectWrap::Unwrap<Socket>(Nan::New(state->sock_obj));
    socket->state_ = STATE_READY;

    if (!state->error) {
      // First endpoint: keep the wrapper and the poll handle referenced.
      if (socket->endpoints == 0) {
        socket->Ref();
        socket->_AttachToEventLoop();
      }
      socket->endpoints += 1;
    }

    Nan::MakeCallback(Nan::GetCurrentContext()->Global(), cb, 1, argv);

    delete state;
    delete req;
  }

  /*
   * JS method `bindSync(address)`: binds on the calling thread. Throws the
   * ØMQ error on failure. On success the endpoint count is increased, and
   * the first endpoint references the wrapper and the poll handle.
   */
  NAN_METHOD(Socket::BindSync) {
    if (!info[0]->IsString()) {
      return Nan::ThrowTypeError("Address must be a string!");
    }
    Nan::Utf8String addr(info[0].As<String>());
    GET_SOCKET(info);

    socket->state_ = STATE_BUSY;
    const int rc = zmq_bind(socket->socket_, *addr);
    socket->state_ = STATE_READY;
    if (rc < 0) {
      return Nan::ThrowError(ErrorMessage());
    }

    const bool first_endpoint = (socket->endpoints == 0);
    if (first_endpoint) {
      socket->Ref();
      socket->_AttachToEventLoop();
    }
    socket->endpoints += 1;
  }

#if ZMQ_CAN_UNBIND
  /*
   * JS method `unbind(address, callback)`: queues zmq_unbind on the libuv
   * thread pool and marks the socket busy until the completion half runs.
   *
   * The callback is mandatory: the completion half invokes it
   * unconditionally, so a missing callback is rejected here instead of being
   * cast to a function and called later.
   */
  NAN_METHOD(Socket::Unbind) {
    if (!info[0]->IsString())
      return Nan::ThrowTypeError("Address must be a string!");
    Local<String> addr = info[0].As<String>();
    if (info.Length() < 2 || !info[1]->IsFunction())
      return Nan::ThrowTypeError("Provided callback must be a function");
    Local<Function> cb = Local<Function>::Cast(info[1]);

    GET_SOCKET(info);

    BindState* state = new BindState(socket, cb, addr);
    uv_work_t* req = new uv_work_t;
    req->data = state;
    uv_queue_work(uv_default_loop(),
                  req,
                  UV_UnbindAsync,
                  (uv_after_work_cb)UV_UnbindAsyncAfter);
    socket->state_ = STATE_BUSY;
    return;
  }

  // Work half of the async unbind: calls zmq_unbind and records the ØMQ
  // errno on failure.
  void Socket::UV_UnbindAsync(uv_work_t* req) {
    BindState* bind_state = static_cast<BindState*>(req->data);
    const int rc = zmq_unbind(bind_state->sock, *bind_state->addr);
    if (rc < 0) {
      bind_state->error = zmq_errno();
    }
  }

  /*
   * Completion half of the async unbind. Marks the socket ready again and
   * invokes the JS callback with an Error or undefined.
   *
   * The endpoint count is only decreased when the unbind succeeded, as in
   * UnbindSync, so a failed unbind does not drop a reference that is still
   * needed.
   */
  void Socket::UV_UnbindAsyncAfter(uv_work_t* req) {
    BindState* state = static_cast<BindState*>(req->data);
    Nan::HandleScope scope;

    Local<Value> argv[1];

    if (state->error) {
      argv[0] = Nan::Error(zmq_strerror(state->error));
    } else {
      argv[0] = Nan::Undefined();
    }

    Local<Function> cb = Nan::New(state->cb);

    Socket *socket = Nan::ObjectWrap::Unwrap<Socket>(Nan::New(state->sock_obj));
    socket->state_ = STATE_READY;

    if (!state->error) {
      // Last endpoint gone: release the wrapper and the poll handle.
      if (--socket->endpoints == 0) {
        socket->Unref();
        socket->_DetachFromEventLoop();
      }
    }

    Nan::MakeCallback(Nan::GetCurrentContext()->Global(), cb, 1, argv);

    delete state;
    delete req;
  }

  /*
   * JS method `unbindSync(address)`: unbinds on the calling thread. Throws
   * the ØMQ error on failure. On success the endpoint count is decreased,
   * and when it reaches zero the wrapper and poll handle are unreferenced.
   */
  NAN_METHOD(Socket::UnbindSync) {
    if (!info[0]->IsString()) {
      return Nan::ThrowTypeError("Address must be a string!");
    }
    Nan::Utf8String addr(info[0].As<String>());
    GET_SOCKET(info);

    socket->state_ = STATE_BUSY;
    const int rc = zmq_unbind(socket->socket_, *addr);
    socket->state_ = STATE_READY;
    if (rc < 0) {
      return Nan::ThrowError(ErrorMessage());
    }

    socket->endpoints -= 1;
    if (socket->endpoints == 0) {
      socket->Unref();
      socket->_DetachFromEventLoop();
    }
  }
#endif

  /*
   * JS method `connect(address)`. Throws the ØMQ error on failure. On
   * success the endpoint count is increased, and the first endpoint
   * references the wrapper and the poll handle.
   */
  NAN_METHOD(Socket::Connect) {
    if (!info[0]->IsString())
      return Nan::ThrowTypeError("Address must be a string!");

    GET_SOCKET(info);

    Nan::Utf8String address(info[0].As<String>());
    const int rc = zmq_connect(socket->socket_, *address);
    if (rc != 0)
      return Nan::ThrowError(ErrorMessage());

    const bool first_endpoint = (socket->endpoints == 0);
    socket->endpoints += 1;
    if (first_endpoint) {
      socket->Ref();
      socket->_AttachToEventLoop();
    }
  }

#if ZMQ_CAN_DISCONNECT
  /*
   * JS method `disconnect(address)`. Throws the ØMQ error on failure. On
   * success the endpoint count is decreased, and when it reaches zero the
   * wrapper and poll handle are unreferenced.
   */
  NAN_METHOD(Socket::Disconnect) {
    if (!info[0]->IsString())
      return Nan::ThrowTypeError("Address must be a string!");

    GET_SOCKET(info);

    Nan::Utf8String address(info[0].As<String>());
    const int rc = zmq_disconnect(socket->socket_, *address);
    if (rc != 0)
      return Nan::ThrowError(ErrorMessage());

    socket->endpoints -= 1;
    if (socket->endpoints == 0) {
      socket->Unref();
      socket->_DetachFromEventLoop();
    }
  }
#endif

  /*
   * An object that creates an empty ØMQ message, which can be used for
   * zmq_recv. After the receive call, a Buffer object wrapping the ØMQ
   * message can be requested. The reference for the ØMQ message will
   * remain while the data is in use by the Buffer.
   */

  /*
   * Owns an empty ØMQ message to receive into. After the receive,
   * GetBuffer() wraps the message data in a Buffer without copying; the
   * message is then released by the Buffer's free callback instead of by
   * this object.
   */
  class Socket::IncomingMessage {
    public:
      inline IncomingMessage() : msgref_(new MessageReference()) {
      };

      inline ~IncomingMessage() {
        // No Buffer was created (or there is no message): the message is
        // still ours to free. Otherwise only drop our handle on the Buffer.
        const bool owns_message = buf_.IsEmpty() && msgref_;
        if (!owns_message) {
          buf_.Reset();
          return;
        }
        delete msgref_;
        msgref_ = NULL;
      };

      // Lets the object be passed directly where a zmq_msg_t* is expected.
      inline operator zmq_msg_t*() {
        return *msgref_;
      }

      // Returns the Buffer wrapping the message data, creating it on first
      // use.
      inline Local<Value> GetBuffer() {
        if (!buf_.IsEmpty()) {
          return Nan::New(buf_);
        }

        char* payload = (char*)zmq_msg_data(*msgref_);
        const size_t payload_size = zmq_msg_size(*msgref_);
        Local<Object> wrapped = Nan::NewBuffer(payload, payload_size, FreeCallback, msgref_).ToLocalChecked();
        if (wrapped.IsEmpty()) {
          return Local<Value>();
        }
        buf_.Reset(wrapped);
        return Nan::New(buf_);
      }

    private:
      // Buffer free callback: releases the message that backs the Buffer.
      static void FreeCallback(char* data, void* message) {
        MessageReference* ref = static_cast<MessageReference*>(message);
        delete ref;
      }

      // Heap-allocated holder that initialises the zmq_msg_t on construction
      // and closes it on destruction.
      class MessageReference {
        public:
          inline MessageReference() {
            const int rc = zmq_msg_init(&msg_);
            if (rc < 0)
              Nan::ThrowError(ErrorMessage());
          }

          inline ~MessageReference() {
            const int rc = zmq_msg_close(&msg_);
            if (rc < 0)
              Nan::ThrowError(ErrorMessage());
          }

          inline operator zmq_msg_t*() {
            return &msg_;
          }

        private:
          zmq_msg_t msg_;
      };

      Nan::Persistent<Object> buf_;
      MessageReference* msgref_;
  };

  /*
   * Wraps a Buffer in a ØMQ message for sending without copying the data.
   *
   * A BufferReference keeps the Buffer alive until libzmq reports (possibly
   * from another thread) that it no longer needs the data; the reference is
   * then destroyed on the main thread via a uv_async_t.
   *
   * If setting up either the async handle or the message fails, a JS error
   * is thrown and the object holds an empty but valid message, so it can
   * still be destroyed safely.
   */
  class Socket::OutgoingMessage {
    public:
      inline OutgoingMessage(Local<Object> buf)
        : bufref_(new BufferReference(buf)) {
        if (!bufref_->IsValid()) {
          // The async handle could not be created; the JS error is already
          // thrown. Without the handle the free callback cannot work, so do
          // not hand the reference to libzmq.
          delete bufref_;
          bufref_ = NULL;
          zmq_msg_init(&msg_);
          return;
        }

        if (zmq_msg_init_data(&msg_, Buffer::Data(buf), Buffer::Length(buf),
            BufferReference::FreeCallback, bufref_) < 0) {
          const char* reason = ErrorMessage();
          // Close the async handle as well as freeing the reference, so no
          // open handle is left on the loop.
          bufref_->Dispose();
          bufref_ = NULL;
          // Leave msg_ in a state the destructor can close.
          zmq_msg_init(&msg_);
          Nan::ThrowError(reason);
        }
      };

      inline ~OutgoingMessage() {
        if (zmq_msg_close(&msg_) < 0)
          Nan::ThrowError(ErrorMessage());
      };

      // Lets the object be passed directly where a zmq_msg_t* is expected.
      inline operator zmq_msg_t*() {
        return &msg_;
      }

    private:
      class BufferReference {
        public:
          inline BufferReference(Local<Object> buf)
            : persistent_(buf), async_(new uv_async_t) {
            if (uv_async_init(uv_default_loop(), async_, Destroy) < 0) {
              // Mark the reference unusable and let the owner delete it. It
              // must not delete itself from inside its own constructor.
              delete async_;
              async_ = NULL;
              Nan::ThrowError("Async initialization failed");
            } else {
              async_->data = this;
            }
          }

          inline ~BufferReference() {
            persistent_.Reset();
          }

          // True when the async handle was set up successfully.
          inline bool IsValid() const {
            return async_ != NULL;
          }

          // Main-thread teardown for a valid reference that was never handed
          // to libzmq: closes the async handle and deletes this object.
          inline void Dispose() {
            Destroy(async_);
          }

          // Called by zmq when the message has been sent.
          // NOTE: May be called from a worker thread. Do not modify V8/Node.
          static void FreeCallback(void*, void* bufref) {
            int result = uv_async_send(static_cast<BufferReference*>(bufref)->async_);
            assert(result == 0);
            (void)result;  // unused when assertions are compiled out
          }

          // Closes the async handle (freed in on_uv_close) and deletes the
          // reference.
          static void Destroy(uv_async_t* async) {
            uv_close(reinterpret_cast<uv_handle_t*>(async), on_uv_close);
            BufferReference* bufref = static_cast<BufferReference*>(async->data);
            delete bufref;
          }
        private:
          Nan::Persistent<Object> persistent_;
          uv_async_t* async_;
      };

    zmq_msg_t msg_;
    BufferReference* bufref_;
  };


#if ZMQ_CAN_MONITOR
  // JS: monitor([timerInterval[, numOfEvents]])
  //
  // Starts zmq event monitoring for this socket: asks zmq to publish socket
  // events on a fresh inproc endpoint, connects a PAIR socket to it and arms a
  // timer that fires Socket::UV_MonitorCallback after `timerInterval` ms.
  //
  //   info[0]  timer interval in milliseconds, must be > 0 (default 10)
  //   info[1]  number of events per interval, must be >= 0 (default 1)
  //
  // Throws a TypeError on invalid arguments and an Error when the monitor
  // socket cannot be created or connected. A failing zmq_socket_monitor() call
  // is silently ignored, as before.
  NAN_METHOD(Socket::Monitor) {
    int64_t timer_interval = 10; // default to 10ms interval
    int64_t num_of_events = 1; // default is 1 event per interval

    if (info.Length() > 0 && !info[0]->IsUndefined()) {
      if (!info[0]->IsNumber())
        return Nan::ThrowTypeError("Option must be an integer");
      timer_interval = Nan::To<int64_t>(info[0]).FromJust();
      if (timer_interval <= 0)
        return Nan::ThrowTypeError("Option must be a positive integer");
    }

    if (info.Length() > 1 && !info[1]->IsUndefined()) {
      if (!info[1]->IsNumber())
        return Nan::ThrowTypeError("numOfEvents must be an integer");
      num_of_events = Nan::To<int64_t>(info[1]).FromJust();
      if (num_of_events < 0)
        return Nan::ThrowTypeError("numOfEvents should be no less than zero");
    }

    GET_SOCKET(info);

    // Already monitoring: tear the previous monitor down first, otherwise its
    // PAIR socket and timer handle would be overwritten below and leaked.
    if (socket->monitor_socket_ != NULL) {
      socket->Unmonitor();
      if (socket->monitor_socket_ != NULL)
        return; // Unmonitor() failed and left a JS exception pending
    }

    char addr[255];
    Context *context = Nan::ObjectWrap::Unwrap<Context>(Nan::New(socket->context_));
    snprintf(addr, sizeof(addr), "%s%d", "inproc://monitor.req.", monitors_count++);

    if (zmq_socket_monitor(socket->socket_, addr, ZMQ_EVENT_ALL) == -1)
      return;

    void* monitor_socket = zmq_socket(context->context_, ZMQ_PAIR);
    if (monitor_socket == NULL) {
      Nan::ThrowError(ErrorMessage());
      // Undo the registration above so zmq stops publishing events.
      zmq_socket_monitor(socket->socket_, NULL, ZMQ_EVENT_ALL);
      return;
    }

    if (zmq_connect(monitor_socket, addr) < 0) {
      Nan::ThrowError(ErrorMessage());
      zmq_close(monitor_socket);
      zmq_socket_monitor(socket->socket_, NULL, ZMQ_EVENT_ALL);
      return;
    }

    socket->monitor_socket_ = monitor_socket;
    socket->timer_interval_ = timer_interval;
    socket->num_of_events_ = num_of_events;
    socket->monitor_handle_ = new uv_timer_t;

    uv_timer_init(uv_default_loop(), socket->monitor_handle_);
    socket->monitor_handle_->data = socket;
    uv_timer_start(socket->monitor_handle_, reinterpret_cast<uv_timer_cb>(Socket::UV_MonitorCallback), timer_interval, 0);
  }

  void
  Socket::Unmonitor() {
    // Make sure we are monitoring
    if (this->monitor_socket_ == NULL) {
      return;
    }

    // Passing NULL as addr will tell zmq to stop monitor
    zmq_socket_monitor(this->socket_, NULL, ZMQ_EVENT_ALL);

    // Close the monitor socket and stop timer
    if (zmq_close(this->monitor_socket_) < 0) {
      Nan::ThrowError(ErrorMessage());
      return;
    }
    uv_timer_stop(this->monitor_handle_);
    uv_close(reinterpret_cast<uv_handle_t*>(this->monitor_handle_), on_uv_close);
    this->monitor_handle_ = NULL;
    this->monitor_socket_ = NULL;
  }

  // JS: unmonitor() - stops event monitoring for this socket.
  NAN_METHOD(Socket::Unmonitor) {
    // GET_SOCKET is deliberately not used: it requires the socket to be open,
    // which might not always be the case when unmonitoring.
    GetSocket(info)->Unmonitor();
  }

#endif

  // JS: readv()
  //
  // Receives one complete (possibly multipart) message without blocking on an
  // idle socket and returns its frames as an Array of buffers.
  //
  // Returns undefined when the socket is not in STATE_READY or when no
  // message is pending (ZMQ_POLLIN not set). Throws an Error when a zmq call
  // fails with anything other than EINTR; EINTR is always retried.
  NAN_METHOD(Socket::Readv) {
    Socket* socket = GetSocket(info);
    if (socket->state_ != STATE_READY)
      return;

    int events;
    size_t events_size = sizeof(events);
    bool checkPollIn = true;

    int rc = 0;
    int flags = 0;
    // ZMQ_RCVMORE is an int on zmq 3.x+ and an int64_t on 2.x (see the option
    // tables in Initialize). Reading it into a wider variable only fills part
    // of it, which makes the `more == 1` test depend on host byte order.
    #if ZMQ_VERSION_MAJOR >= 3
    int more = 1;
    #else
    int64_t more = 1;
    #endif
    size_t more_size = sizeof(more);
    uint32_t index = 0;

    Local<Array> result = Nan::New<Array>();

    while (more == 1) {
      if (checkPollIn) {
        while (zmq_getsockopt(socket->socket_, ZMQ_EVENTS, &events, &events_size)) {
          if (zmq_errno() != EINTR)
            return Nan::ThrowError(ErrorMessage());
        }

        // Nothing to read right now: report "no message" to the caller.
        if ((events & ZMQ_POLLIN) == 0)
          return;
      }

      IncomingMessage part;

      while (true) {
        rc = zmq_msg_init(part);
        if (rc != 0) {
          if (zmq_errno()==EINTR) {
            continue;
          }
          return Nan::ThrowError(ErrorMessage());
        }
        break;
      }

      while (true) {
      #if ZMQ_VERSION_MAJOR == 2
        rc = zmq_recv(socket->socket_, part, flags);
      #elif ZMQ_VERSION_MAJOR == 3
        rc = zmq_recvmsg(socket->socket_, part, flags);
      #else
        rc = zmq_msg_recv(part, socket->socket_, flags);
        // On zmq 4+ the readiness check is only done before the first frame.
        checkPollIn = false;
      #endif

        if (rc < 0) {
          if (zmq_errno() == EINTR)
            continue;
          return Nan::ThrowError(ErrorMessage());
        }

        Nan::Set(result, index++, part.GetBuffer());
        break;
      }

      // Find out whether further frames of the same message follow.
      while (zmq_getsockopt(socket->socket_, ZMQ_RCVMORE, &more, &more_size)) {
        if (zmq_errno() != EINTR)
          return Nan::ThrowError(ErrorMessage());
      }
    }

    info.GetReturnValue().Set(result);
  }

  // JS: recv([flags])
  //
  // Receives a single message frame and returns it as a buffer. `flags`, when
  // given, must be a number and is passed straight to the zmq receive call.
  // Throws a TypeError on bad arguments and an Error when receiving fails with
  // anything other than EINTR (EINTR is retried).
  NAN_METHOD(Socket::Recv) {
    const int argc = info.Length();
    if (argc != 0 && argc != 1)
      return Nan::ThrowTypeError("Only one argument at most was expected");

    int flags = 0;
    if (argc == 1) {
      if (!info[0]->IsNumber())
        return Nan::ThrowTypeError("Argument should be an integer");
      flags = Nan::To<int>(info[0]).FromJust();
    }

    GET_SOCKET(info);

    IncomingMessage msg;
    int rc;
    do {
    #if ZMQ_VERSION_MAJOR == 2
      rc = zmq_recv(socket->socket_, msg, flags);
    #else
      rc = zmq_recvmsg(socket->socket_, msg, flags);
    #endif
      // Any failure except an interrupted call is reported to JS.
      if (rc < 0 && zmq_errno() != EINTR)
        return Nan::ThrowError(ErrorMessage());
    } while (rc < 0);

    info.GetReturnValue().Set(msg.GetBuffer());
  }

  // JS: sendv(batch)
  //
  // Sends a batch of frames. `batch` is a flat array of [buffer, flags] pairs:
  // even indexes hold the Buffer to send, odd indexes the zmq send flags.
  //
  // Returns false when the socket is not in STATE_READY or cannot accept
  // output (ZMQ_POLLOUT not set), true once the loop over the batch completes
  // (an empty batch returns true immediately).
  // Calls NotifyReadReady() when ZMQ_POLLIN was observed along the way.
  // Throws a TypeError on malformed input and an Error when a zmq call fails
  // with anything other than EINTR.
  NAN_METHOD(Socket::Sendv) {
    Socket* socket = GetSocket(info);
    if (socket->state_ != STATE_READY)
      return info.GetReturnValue().Set(false);

    // As<Array>() is an unchecked cast, so validate before using it.
    if (!info[0]->IsArray())
      return Nan::ThrowTypeError("First argument should be an array");

    int events;
    size_t events_size = sizeof(events);
    bool checkPollOut = true;
    bool readsReady = false;

    Local<Array> batch = info[0].As<Array>();
    size_t len = batch->Length();

    if (len == 0)
      return info.GetReturnValue().Set(true);

    if (len % 2 != 0)
      return Nan::ThrowTypeError("Batch length must be even!");

    for (uint32_t i = 0; i < len; i += 2) {
      if (checkPollOut) {
        while (zmq_getsockopt(socket->socket_, ZMQ_EVENTS, &events, &events_size)) {
          if (zmq_errno() != EINTR)
            return Nan::ThrowError(ErrorMessage());
        }

        if ((events & ZMQ_POLLIN) != 0) {
          readsReady = true;
        }

        if ((events & ZMQ_POLLOUT) == 0) {
          if (readsReady) {
            socket->NotifyReadReady();
          }
          return info.GetReturnValue().Set(false);
        }
      }

      Local<Value> bufValue = batch->Get(Nan::GetCurrentContext(), i).ToLocalChecked();
      // Buffer::Data / Buffer::Length must only be applied to real Buffers.
      if (!Buffer::HasInstance(bufValue))
        return Nan::ThrowTypeError("Batch entries at even indexes should be buffers");

      Local<Object> buf = bufValue.As<Object>();
      Local<Number> flagsObj = batch->Get(Nan::GetCurrentContext(), i + 1).ToLocalChecked().As<Number>();

      int flags = Nan::To<int>(flagsObj).FromJust();

#if ZERO_COPY_MESSAGE_SEND
      /* Non-copying implementation. */
      OutgoingMessage msg_p(buf);
#else
      /* Copying implementation. */
      // NOTE(review): this path never calls zmq_msg_close on `msg`; confirm
      // whether that matters before enabling it.
      zmq_msg_t msg;
      int rc;
      size_t len = Buffer::Length(buf);
      rc = zmq_msg_init_size(&msg, len);
      if (rc != 0)
        return Nan::ThrowError(ErrorMessage());

      char* cp = static_cast<char *>(zmq_msg_data(&msg));
      const char* dat = Buffer::Data(buf);
      std::copy(dat, dat + len, cp);
      zmq_msg_t* msg_p = &msg;
#endif

      while (true) {
        int rc;
      #if ZMQ_VERSION_MAJOR == 2
        rc = zmq_send(socket->socket_, msg_p, flags);
      #elif ZMQ_VERSION_MAJOR == 3
        rc = zmq_sendmsg(socket->socket_, msg_p, flags);
      #else
        rc = zmq_msg_send(msg_p, socket->socket_, flags);
        // On zmq 4+ the readiness check is only done before the first frame.
        checkPollOut = false;
      #endif
        if (rc < 0){
          if (zmq_errno() == EINTR) {
            continue;
          }
          return Nan::ThrowError(ErrorMessage());
        }
        break;
      }
    }

    // Re-read the event state after sending so that a message which became
    // readable in the meantime is reported to the JS side.
    while (zmq_getsockopt(socket->socket_, ZMQ_EVENTS, &events, &events_size)) {
      if (zmq_errno() != EINTR)
        return Nan::ThrowError(ErrorMessage());
    }

    if ((events & ZMQ_POLLIN) != 0) {
      readsReady = true;
    }

    if (readsReady) {
      socket->NotifyReadReady();
    }

    return info.GetReturnValue().Set(true);
  }

  void
  Socket::Close() {
    if (socket_) {
      if (zmq_close(socket_) < 0) {
        Nan::ThrowError(ErrorMessage());
        return;
      }
      socket_ = NULL;
      state_ = STATE_CLOSED;
      context_.Reset();

      if (this->endpoints > 0)
        this->Unref();
      this->endpoints = 0;

      uv_poll_stop(poll_handle_);
      uv_close(reinterpret_cast<uv_handle_t*>(poll_handle_), on_uv_close);
    }
  }

  // JS: close() - resolves the native socket via GET_SOCKET and closes it.
  NAN_METHOD(Socket::Close) {
    GET_SOCKET(info);
    socket->Close();
  }

  // Compatibility shim: when building against a libzmq older than 2.1.3,
  // alias the newer ZMQ_DEALER / ZMQ_ROUTER names to ZMQ_XREQ / ZMQ_XREP so the
  // code below can use the newer names unconditionally.
  #if (ZMQ_VERSION < 20103)
  #   define ZMQ_DEALER ZMQ_XREQ
  #   define ZMQ_ROUTER ZMQ_XREP
  #endif

  /*
   * Module functions.
   */

   static NAN_METHOD(ZmqVersion) {
    // JS: zmqVersion() - returns the libzmq version reported by zmq_version()
    // formatted as "major.minor.patch".
    int ver_major, ver_minor, ver_patch;
    zmq_version(&ver_major, &ver_minor, &ver_patch);

    char text[16];
    snprintf(text, sizeof(text), "%d.%d.%d", ver_major, ver_minor, ver_patch);

    Local<String> version = Nan::New<String>(text).ToLocalChecked();
    info.GetReturnValue().Set(version);
  }

#if ZMQ_VERSION_MAJOR >= 4
   static NAN_METHOD(ZmqCurveKeypair) {
    // JS: zmqCurveKeypair() - generates a CURVE key pair via
    // zmq_curve_keypair() and returns it as { public: <string>, secret: <string> }.
    // Throws an Error when key generation fails.

    // Fixed-size output buffers handed to zmq_curve_keypair.
    char public_text [41];
    char secret_text [41];

    if (zmq_curve_keypair(public_text, secret_text) < 0) {
      return Nan::ThrowError("zmq_curve_keypair operation failed. Method support in libzmq v4+ -with-libsodium.");
    }

    Local<Object> keypair = Nan::New<Object>();

    Local<String> public_name = Nan::New<String>("public").ToLocalChecked();
    Local<String> public_value = Nan::New<String>(public_text).ToLocalChecked();
    Nan::Set(keypair, public_name, public_value);

    Local<String> secret_name = Nan::New<String>("secret").ToLocalChecked();
    Local<String> secret_value = Nan::New<String>(secret_text).ToLocalChecked();
    Nan::Set(keypair, secret_name, secret_value);

    info.GetReturnValue().Set(keypair);
  }
#endif

  // Module initialisation: fills the socket-option type tables (which numeric
  // option id is read/written as which C type), exports the zmq constants and
  // module-level functions on `target`, then initialises Context and Socket.
  static NAN_MODULE_INIT(Initialize) {
    Nan::HandleScope scope;

    // Inserts every id of a plain int array into one of the option sets.
    #define ZMQ_REGISTER_OPTS(set, ids)                                  \
      for (size_t n_ = 0; n_ < sizeof(ids) / sizeof((ids)[0]); ++n_)     \
        (set).insert((ids)[n_])

    static const int base_int_opts[] = {
      14, // ZMQ_FD
      16, // ZMQ_TYPE
      17, // ZMQ_LINGER
      18, // ZMQ_RECONNECT_IVL
      19, // ZMQ_BACKLOG
      21, // ZMQ_RECONNECT_IVL_MAX
      23, // ZMQ_SNDHWM
      24, // ZMQ_RCVHWM
      25, // ZMQ_MULTICAST_HOPS
      27, // ZMQ_RCVTIMEO
      28, // ZMQ_SNDTIMEO
      29, // ZMQ_RCVLABEL
      30, // ZMQ_RCVCMD
      31, // ZMQ_IPV4ONLY
      33, // ZMQ_ROUTER_MANDATORY
      34, // ZMQ_TCP_KEEPALIVE
      35, // ZMQ_TCP_KEEPALIVE_CNT
      36, // ZMQ_TCP_KEEPALIVE_IDLE
      37, // ZMQ_TCP_KEEPALIVE_INTVL
      39, // ZMQ_DELAY_ATTACH_ON_CONNECT
      40, // ZMQ_XPUB_VERBOSE
      41, // ZMQ_ROUTER_RAW
      42  // ZMQ_IPV6
    };
    ZMQ_REGISTER_OPTS(opts_int, base_int_opts);

    static const int base_int64_opts[] = {
      3,  // ZMQ_SWAP
      8,  // ZMQ_RATE
      10, // ZMQ_MCAST_LOOP
      20, // ZMQ_RECOVERY_IVL_MSEC
      22  // ZMQ_MAXMSGSIZE
    };
    ZMQ_REGISTER_OPTS(opts_int64, base_int64_opts);

    static const int base_uint64_opts[] = {
      1, // ZMQ_HWM
      4  // ZMQ_AFFINITY
    };
    ZMQ_REGISTER_OPTS(opts_uint64, base_uint64_opts);

    static const int base_binary_opts[] = {
      5,  // ZMQ_IDENTITY
      6,  // ZMQ_SUBSCRIBE
      7,  // ZMQ_UNSUBSCRIBE
      32, // ZMQ_LAST_ENDPOINT
      38  // ZMQ_TCP_ACCEPT_FILTER
    };
    ZMQ_REGISTER_OPTS(opts_binary, base_binary_opts);

    // transition types: options whose C type differs between zmq 2.x and 3.x+
    #if ZMQ_VERSION_MAJOR >= 3
    static const int transition_int_opts[] = {
      15, // ZMQ_EVENTS 3.x int
      8,  // ZMQ_RATE 3.x int
      9,  // ZMQ_RECOVERY_IVL 3.x int
      13, // ZMQ_RCVMORE 3.x int
      11, // ZMQ_SNDBUF 3.x int
      12  // ZMQ_RCVBUF 3.x int
    };
    ZMQ_REGISTER_OPTS(opts_int, transition_int_opts);
    #else
    opts_uint32.insert(15); // ZMQ_EVENTS 2.x uint32_t

    static const int transition_int64_opts[] = {
      8,  // ZMQ_RATE 2.x int64_t
      9,  // ZMQ_RECOVERY_IVL 2.x int64_t
      13  // ZMQ_RCVMORE 2.x int64_t
    };
    ZMQ_REGISTER_OPTS(opts_int64, transition_int64_opts);

    static const int transition_uint64_opts[] = {
      11, // ZMQ_SNDBUF 2.x uint64_t
      12  // ZMQ_RCVBUF 2.x uint64_t
    };
    ZMQ_REGISTER_OPTS(opts_uint64, transition_uint64_opts);
    #endif

    #if ZMQ_VERSION_MAJOR >= 4
    static const int v4_int_opts[] = {
      43, // ZMQ_MECHANISM
      44, // ZMQ_PLAIN_SERVER
      47, // ZMQ_CURVE_SERVER
      51, // ZMQ_PROBE_ROUTER
      56, // ZMQ_ROUTER_HANDOVER
      66  // ZMQ_HANDSHAKE_IVL
    };
    ZMQ_REGISTER_OPTS(opts_int, v4_int_opts);

    static const int v4_binary_opts[] = {
      45, // ZMQ_PLAIN_USERNAME
      46, // ZMQ_PLAIN_PASSWORD
      48, // ZMQ_CURVE_PUBLICKEY
      49, // ZMQ_CURVE_SECRETKEY
      50, // ZMQ_CURVE_SERVERKEY
      55  // ZMQ_ZAP_DOMAIN
    };
    ZMQ_REGISTER_OPTS(opts_binary, v4_binary_opts);

    #if ZMQ_VERSION_MINOR >= 2
    static const int v4_2_int_opts[] = {
      75, // ZMQ_HEARTBEAT_IVL
      76, // ZMQ_HEARTBEAT_TTL
      77, // ZMQ_HEARTBEAT_TIMEOUT
      79  // ZMQ_CONNECT_TIMEOUT
    };
    ZMQ_REGISTER_OPTS(opts_int, v4_2_int_opts);
    #endif
    #endif

    #undef ZMQ_REGISTER_OPTS

    // Feature flags computed from the libzmq version at compile time.
    NODE_DEFINE_CONSTANT(target, ZMQ_CAN_DISCONNECT);
    NODE_DEFINE_CONSTANT(target, ZMQ_CAN_UNBIND);
    NODE_DEFINE_CONSTANT(target, ZMQ_CAN_MONITOR);
    NODE_DEFINE_CONSTANT(target, ZMQ_CAN_SET_CTX);

    // Socket types.
    NODE_DEFINE_CONSTANT(target, ZMQ_PUB);
    NODE_DEFINE_CONSTANT(target, ZMQ_SUB);
    #if ZMQ_VERSION_MAJOR >= 3
    NODE_DEFINE_CONSTANT(target, ZMQ_XPUB);
    NODE_DEFINE_CONSTANT(target, ZMQ_XSUB);
    #endif
    NODE_DEFINE_CONSTANT(target, ZMQ_REQ);
    NODE_DEFINE_CONSTANT(target, ZMQ_XREQ);
    NODE_DEFINE_CONSTANT(target, ZMQ_REP);
    NODE_DEFINE_CONSTANT(target, ZMQ_XREP);
    NODE_DEFINE_CONSTANT(target, ZMQ_DEALER);
    NODE_DEFINE_CONSTANT(target, ZMQ_ROUTER);
    NODE_DEFINE_CONSTANT(target, ZMQ_PUSH);
    NODE_DEFINE_CONSTANT(target, ZMQ_PULL);
    NODE_DEFINE_CONSTANT(target, ZMQ_PAIR);
    #if ZMQ_VERSION_MAJOR >= 4
    NODE_DEFINE_CONSTANT(target, ZMQ_STREAM);
    #endif

    // Poll event bits.
    NODE_DEFINE_CONSTANT(target, ZMQ_POLLIN);
    NODE_DEFINE_CONSTANT(target, ZMQ_POLLOUT);
    NODE_DEFINE_CONSTANT(target, ZMQ_POLLERR);

    // Send/receive flags.
    NODE_DEFINE_CONSTANT(target, ZMQ_SNDMORE);
    #if ZMQ_VERSION_MAJOR == 2
    NODE_DEFINE_CONSTANT(target, ZMQ_NOBLOCK);
    #endif

    // Binding-level socket states.
    NODE_DEFINE_CONSTANT(target, STATE_READY);
    NODE_DEFINE_CONSTANT(target, STATE_BUSY);
    NODE_DEFINE_CONSTANT(target, STATE_CLOSED);

    // Module-level functions.
    Nan::SetMethod(target, "zmqVersion", ZmqVersion);
    #if ZMQ_VERSION_MAJOR >= 4
    Nan::SetMethod(target, "zmqCurveKeypair", ZmqCurveKeypair);
    #endif

    Context::Initialize(target);
    Socket::Initialize(target);
  }
} // namespace zmq


// module

// C-linkage module initializer: forwards the `target` object supplied by
// NAN_MODULE_INIT to zmq::Initialize, which populates it.
extern "C" NAN_MODULE_INIT(init) {
  zmq::Initialize(target);
}

// Registers `init` as the initializer of the addon named `zmq`.
// NOTE(review): presumably the worker-thread-aware counterpart of NODE_MODULE;
// confirm against the nan documentation.
NAN_MODULE_WORKER_ENABLED(zmq, init)
