/*******************************************************************************
 * Copyright 2023 Aerospike, Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 ******************************************************************************/

#include "client.h"
#include "async.h"
#include "command.h"
#include "conversions.h"
#include "policy.h"
#include "log.h"
#include "scan.h"

extern "C" {
#include <aerospike/aerospike_scan.h>
#include <aerospike/as_error.h>
#include <aerospike/as_policy.h>
#include <aerospike/as_scan.h>
#include <aerospike/as_partition_filter.h>
#include <aerospike/as_status.h>
}

using namespace v8;

/**
 * Starts an asynchronous, page-wise scan and reports the outcome through the
 * JavaScript callback.
 *
 * JavaScript arguments:
 *   info[0]  namespace   (string, required)
 *   info[1]  set         (string, optional)
 *   info[2]  options     (object, optional) - handed to setup_scan_pages() and
 *                        also inspected for a partition filter
 *   info[3]  policy      (object, optional) - scan policy
 *   info[4]  scan_id     (number, optional)
 *   info[5]  saved_scan  (object, optional) - when present, its bytes are
 *                        handed to setup_scan_pages() instead of the options
 *                        (presumably the serialized state of an earlier page)
 *   info[6]  callback    (function, required)
 *
 * The record limit of the effective policy is moved into the scan user data
 * and cleared in the policy that is sent to the C client.
 *
 * Ownership: once the C client accepts the command (AEROSPIKE_OK), the
 * listener owns both the AsyncCommand and the scan user data. On every other
 * path this function releases them itself.
 */
NAN_METHOD(AerospikeClient::ScanPages)
{
	TYPE_CHECK_REQ(info[0], IsString, "Namespace must be a string");
	TYPE_CHECK_OPT(info[1], IsString, "Set must be a string");
	TYPE_CHECK_OPT(info[2], IsObject, "Options must be an object");
	TYPE_CHECK_OPT(info[3], IsObject, "Policy must be an object");
	TYPE_CHECK_OPT(info[4], IsNumber, "Scan_id must be a number");
	TYPE_CHECK_OPT(info[5], IsObject, "saved_scan must be an object");
	TYPE_CHECK_REQ(info[6], IsFunction, "Callback must be a function");

	AerospikeClient *client =
		Nan::ObjectWrap::Unwrap<AerospikeClient>(info.This());
	AsyncCommand *cmd =
		new AsyncCommand("Scan", client, info[6].As<Function>());

	LogInfo *log = client->log;

	uint64_t scan_id = 0;

	// All locals are declared before the first `goto Cleanup` so that no jump
	// crosses an initialization.
	as_policy_scan *p_policy = NULL;
	as_policy_scan policy;
	bool policy_from_js = false; // true only if `policy` was built from info[3]
	as_partition_filter pf;
	bool pf_defined = false;
	as_status status;

	struct scan_udata *su =
		(scan_udata *)cf_malloc(sizeof(struct scan_udata));
	if (su == NULL) {
		CmdErrorCallback(cmd, AEROSPIKE_ERR_CLIENT,
						 "Failed to allocate scan user data");
		goto Cleanup;
	}
	su->cmd = cmd;
	su->count = 0;
	// Start from a known value so Cleanup can tell whether a scan was created.
	su->scan = NULL;

	if (info[5]->IsObject()) {
		// Resume from a saved scan: copy its bytes into a temporary buffer.
		uint32_t bytes_size = 0;
		load_bytes_size(info[5].As<Object>(), &bytes_size, log);
		uint8_t *bytes = new uint8_t[bytes_size];
		load_bytes(info[5].As<Object>(), bytes, bytes_size, log);
		setup_scan_pages(&su->scan, info[0], info[1], Nan::Null(), bytes,
						 bytes_size, log);
		delete[] bytes;
	}
	else {
		setup_scan_pages(&su->scan, info[0], info[1], info[2], NULL, 0, log);
	}

	if (info[3]->IsObject()) {
		if (scanpolicy_from_jsobject(&policy, info[3].As<Object>(), log) !=
			AS_NODE_PARAM_OK) {
			CmdErrorCallback(cmd, AEROSPIKE_ERR_PARAM, "Policy object invalid");
			goto Cleanup;
		}
		policy_from_js = true;
	}
	else {
		// No policy supplied: work on a private copy of the client's default
		// scan policy so that max_records can be read and cleared below
		// without dereferencing a null pointer or mutating the shared config.
		policy = client->as->config.policies.scan;
	}
	p_policy = &policy;

	if (info[4]->IsNumber()) {
		scan_id = Nan::To<int64_t>(info[4]).FromJust();
		as_v8_debug(log, "Using scan ID %lli for async scan.",
					static_cast<long long>(scan_id));
	}

	as_partition_filter_set_all(&pf);
	// Options are optional; only look for a partition filter when an object
	// was actually passed.
	if (info[2]->IsObject() &&
		partitions_from_jsobject(&pf, &pf_defined, info[2].As<Object>(), log) !=
			AS_NODE_PARAM_OK) {
		CmdErrorCallback(cmd, AEROSPIKE_ERR_PARAM, "Partitions object invalid");
		goto Cleanup;
	}

	// Move the record limit into the user data and clear it in the policy
	// that is sent to the C client.
	su->max_records = p_policy->max_records;
	p_policy->max_records = 0;

	if (pf_defined) {
		as_v8_debug(log, "Sending async scan partitions command");
		status = aerospike_scan_partitions_async(
			client->as, &cmd->err, p_policy, su->scan, &pf,
			async_scan_pages_listener, su, NULL);
	}
	else {
		as_v8_debug(log, "Sending async scan command");
		status = aerospike_scan_async(client->as, &cmd->err, p_policy, su->scan,
									  &scan_id, async_scan_pages_listener, su,
									  NULL);
	}

	if (status == AEROSPIKE_OK) {
		// The async listener is now responsible for the command and user data.
		cmd = NULL;
		su = NULL;
	}
	else {
		cmd->ErrorCallback();
	}

Cleanup:
	delete cmd;
	if (su) {
		// The command was never handed to the listener; release the scan and
		// its user data here to avoid leaking them.
		if (su->scan) {
			as_scan_destroy(su->scan);
		}
		cf_free(su);
	}
	// Only destroy a filter expression built from the JavaScript policy; the
	// one in the client's default policy belongs to the client config.
	if (policy_from_js && policy.base.filter_exp) {
		as_exp_destroy(policy.base.filter_exp);
	}
}