/**
 * File:   SessionSynchronizeWorker.cpp
 * Author: Alexander Ksenofontov <aksenofo@yahoo.ru>
 *
 * Created on August 15, 2016, 14:13 PM
 */

#include <iostream>
#include <sstream>
#include <algorithm>
#include <libtorrent/add_torrent_params.hpp>
#include <libtorrent/torrent_handle.hpp>
#include <libtorrent/alert_types.hpp>
#include <libtorrent/bencode.hpp>
#include <libtorrent/torrent_status.hpp>
#include <libtorrent/torrent_info.hpp>
#include "libtorrent/announce_entry.hpp"
#include "libtorrent/bdecode.hpp"
#include "libtorrent/magnet_uri.hpp"
#include <libtorrent/session.hpp>
#include "tohaux.h"
#include "synchronize_worker.h"
#include "environment.h"
#include "HTTPClient.h"

using namespace Nan;
using namespace v8;
using namespace libtorrent;
namespace lt = libtorrent;

/**
 * Builds a synchronization worker.
 *
 * @param owner      Persistent object handle; passed back as the first
 *                   argument of the completion callback.
 * @param periodical Callback invoked from HandleProgressCallback with
 *                   (protocol, progress, file).
 * @param finished   Completion callback, forwarded to the base worker.
 * @param ri         Description of the local root and remote locations;
 *                   copied into the worker.
 */
SynchronizeWorker::SynchronizeWorker(
    const v8::Persistent<v8::Object, v8::CopyablePersistentTraits<v8::Object>>& owner,
    Nan::Callback *periodical,
    Nan::Callback *finished,
    const Session::remote_info& ri)
    : AsyncProgressWorker(finished),
      owner_(owner),
      ri_(ri),
      error_(0),          // 0 means "no error" for the completion callback
      periodical_(periodical)
{
}

/**
 * Destructor: performs no explicit cleanup.
 *
 * NOTE(review): periodical_ is not released here. If it is declared as a raw
 * owning pointer in the header it is never freed - confirm its ownership.
 */
SynchronizeWorker::~SynchronizeWorker() = default;

/**
 * Body of the asynchronous job: brings the local tree in sync with the server.
 *
 * Steps:
 *   1. Create the hidden working directory below the local root.
 *   2. Load the file description list; if that fails, re-fetch the list over
 *      HTTP and load it again.
 *   3. Collect the items whose local file does not match its description.
 *   4. If the failed items account for more than half of the total, try a
 *      torrent synchronization and fall back to HTTP when it throws a
 *      std::exception; otherwise fetch the failed items over HTTP directly.
 *
 * Error reporting: when the directory cannot be created, error_ receives
 * errno and the job stops. An exit_exeption ends the job silently. Any other
 * std::exception is only printed to stdout; error_ is left unchanged.
 *
 * @param progress Channel used to deliver data_pulse notifications.
 */
void SynchronizeWorker::Execute(const Nan::AsyncProgressWorker::ExecutionProgress& progress) {
    message_listener ml;

    try {
        // 1. Create directory
        auto full_path(merge_path(ri_.local_root_, HIDDEN_DIR));

        auto rc(create_directory_recursively(full_path));
        if (rc != 0) {
            error_ = errno;
            return;
        }

        auto buff(new data_pulse);
        buff->protocol_ = "PREPARING";
        buff->progress_ = 1;
        send_progress(progress, buff);

        // 2. Load existing items
        int64_t total(0);
        try {
            total = load_dir_list();
        }
        catch (...) {
            // File is corrupted or does not exist: fetch a fresh copy and retry.
            // A second failure propagates to the outer handlers.
            std::string http_dest(merge_path(ri_.local_root_, HTTP_FILE_DESCR));
            sync_file_with_server(&ml, merge_path(ri_.http_url_, HTTP_FILE_DESCR_NAME), http_dest);
            total = load_dir_list();
        }

        // 3. Detect items that need to be downloaded again
        uint64_t failed_tot(0);
        auto failed_list(detect_failed(progress, failed_tot));
        total_ = total;

        // 4. Pick the transport
        if (!failed_list->empty()) {
            const long double dtot = static_cast<long double>(total);
            const long double dfailed = static_cast<long double>(failed_tot);

            // Floating-point division: a zero total yields inf/NaN rather than a trap.
            if (dfailed / dtot > .5) {
                try {
                    TorrentSync(progress);
                }
                catch (const exit_exeption&) {
                    // An exit request must never be turned into an HTTP fallback,
                    // whether or not exit_exeption derives from std::exception.
                    throw;
                }
                catch (const std::exception& e) {
                    std::cout << e.what() << std::endl;
                    HTTPsync(progress, failed_list);
                }
            }
            else {
                HTTPsync(progress, failed_list);
            }
            // TODO: verify the list returned by HTTPsync (items that still failed)
        }
    }
    catch (const exit_exeption&) {
        // Exit has been initiated from outside
    }
    catch (const std::exception& e) {
        // Caught by reference so that derived exceptions are not sliced
        // and what() reports the real message.
        std::cout << e.what() << std::endl;
        // TODO: report exception to the completion callback
    }
}

void SynchronizeWorker::HandleProgressCallback(const char *data, size_t size) {
    Nan::HandleScope scope;
	if (data) {

		const data_pulse** pbuff((const data_pulse**)(data));
		v8::Local<v8::Value> argv[] = {
			Nan::New<String>((*pbuff)->protocol_).ToLocalChecked(),
			Nan::New<Number>((*pbuff)->progress_),
			Nan::New<String>((*pbuff)->file_).ToLocalChecked(),
		};
		periodical_->Call(3, argv);
		delete *pbuff;
	}
}

void SynchronizeWorker::HandleOKCallback() {
	Nan::HandleScope scope;
    v8::Local<v8::Value> argv[] = {
		Nan::New(owner_),
		Nan::New<v8::Integer>(error_)
    };
    callback->Call(2, argv);
}

/**
 * Loads the item list from the description file under the local root.
 *
 * The file is opened in binary mode with stream exceptions enabled for
 * failbit and badbit, then handed to convert(), which fills items_.
 *
 * @return The total reported by convert().
 * @throws Rethrows whatever the stream or convert() throws; items_ is reset
 *         first, so any error is treated as an invalid file.
 */
int64_t SynchronizeWorker::load_dir_list() {
    const std::string descr_path(merge_path(ri_.local_root_, HTTP_FILE_DESCR));
    try {
        std::ifstream in;
        in.exceptions(std::ios_base::failbit | std::ios_base::badbit);
        in.open(descr_path.c_str(), std::ifstream::binary);

        int64_t total = 0;
        items_ = convert(in, total);
        return total;
    }
    catch (...) {
        // Never keep a partially loaded list around.
        items_.reset();
        throw;
    }
}

/**
 * Checks every loaded item against the local tree and collects the mismatches.
 *
 * For each entry of items_ a "FAIL_LIST_LOAD" pulse is sent. Its progress is
 * the running sum of the items' second tuple field divided by the sum over
 * all items.
 *
 * @param progress   Channel used to deliver the progress pulses.
 * @param fail_total Output: sum of the second tuple field over all items for
 *                   which cmp_file_with_description() returned false.
 * @return List of the items that failed the comparison (never null; empty
 *         when items_ holds no list).
 */
std::shared_ptr<std::vector<std::tuple<std::string, uint64_t, std::string>>>
SynchronizeWorker::detect_failed(const Nan::AsyncProgressWorker::ExecutionProgress& progress,
                 uint64_t& fail_total) {
    using item_t = std::tuple<std::string, uint64_t, std::string>;

    fail_total = 0;
    auto rc(std::make_shared<std::vector<item_t>>());

    // Nothing loaded: report "no failures" instead of dereferencing null.
    if (!items_) {
        return rc;
    }

    uint64_t total(0);
    for (const item_t& item : *items_) {
        total += std::get<1>(item);
    }

    uint64_t cur(0);
    for (const item_t& item : *items_) {
        if (!cmp_file_with_description(ri_.local_root_, item)) {
            rc->push_back(item);
            fail_total += std::get<1>(item);
        }
        cur += std::get<1>(item);

        auto buff(new data_pulse);
        buff->protocol_ = "FAIL_LIST_LOAD";
        // When every item has size 0 the ratio would be 0/0 (NaN);
        // report the scan as complete instead.
        buff->progress_ = (total != 0) ? double(cur) / double(total) : 1.0;
        buff->file_ = std::get<0>(item);
        send_progress(progress, buff);
    }
    return rc;
}

/**
 * Downloads the given items one by one over HTTP.
 *
 * Each item's first tuple field is appended to both the local root and the
 * HTTP base URL to form the destination and source. After every successful
 * download an "HTTP" pulse is sent; progress starts at
 * (total_ - sum of the given items) and grows by each downloaded item's
 * second tuple field, relative to total_.
 *
 * @param progress Channel used to deliver the progress pulses.
 * @param items    Items to download.
 * @return List of the items whose download threw (never null).
 * @throws exit_exeption Propagated unchanged so that an exit request stops
 *         the loop instead of being recorded as a failed download.
 */
std::shared_ptr<std::vector<std::tuple<std::string, uint64_t, std::string>>>
SynchronizeWorker::HTTPsync(const Nan::AsyncProgressWorker::ExecutionProgress& progress,
        std::shared_ptr<std::vector<std::tuple<std::string, uint64_t, std::string>>>& items) {
    using item_t = std::tuple<std::string, uint64_t, std::string>;

    message_listener ml;
    auto failed(std::make_shared<std::vector<item_t>>());

    // Calculate total failed size
    uint64_t tot_failed(0);
    for (const item_t& item : *items) {
        tot_failed += std::get<1>(item);
    }

    // Everything that is not in the list counts as already done.
    uint64_t tot_progr(total_ - tot_failed);

    for (const item_t& item : *items) {
        auto local_path(merge_path(ri_.local_root_, std::get<0>(item)));
        auto remote_path(merge_path(ri_.http_url_, std::get<0>(item)));
        try {
            sync_file_with_server(&ml, remote_path, local_path);

            auto buff(new data_pulse);
            buff->protocol_ = "HTTP";
            buff->file_ = std::get<0>(item);
            tot_progr += std::get<1>(item);
            // Avoid 0/0 (NaN) when the whole list has zero size.
            buff->progress_ = (total_ != 0) ? (double)tot_progr / (double)total_ : 1.0;
            send_progress(progress, buff);
        }
        catch (const exit_exeption&) {
            // NOTE(review): presumably raised by sync_file_with_server on an
            // exit request - confirm. It must not be swallowed below.
            throw;
        }
        catch (...) {
            // Best effort: remember the item and continue with the next one.
            failed->push_back(item);
        }
    }

    return failed;
}

/**
 * Hands a heap-allocated pulse over to HandleProgressCallback.
 *
 * Only the pointer value is sent; HandleProgressCallback reads it back from
 * the buffer and deletes the pulse.
 *
 * NOTE(review): if the progress channel drops or merges pending messages,
 * the pulses of the dropped messages are never deleted - confirm against
 * the Nan::AsyncProgressWorker documentation.
 *
 * @param progress Channel used to deliver the pulse.
 * @param dp       Pulse to deliver; ownership passes to the receiver.
 */
void SynchronizeWorker::send_progress(const Nan::AsyncProgressWorker::ExecutionProgress& progress,
                                      SynchronizeWorker::data_pulse* dp) {
    // The payload is the pointer itself, so its size is sizeof(dp)
    // (a data_pulse*), not the size of the pointer's address.
    progress.Send(reinterpret_cast<const char*>(&dp), sizeof(dp));
}

/**
 * Synchronizes the local tree through the torrent session.
 *
 * The torrent file is fetched from ri_.torrent_url_, existing resume data is
 * loaded when the resume file can be opened, and the torrent is added with
 * the parent of the local root as save path. The function then waits on
 * session events, sending a "TORRENT" pulse for each one, until the status
 * reports done. The torrent is removed from the session on both the normal
 * and the exceptional path.
 *
 * @param progress Channel used to deliver the progress pulses.
 * @throws std::runtime_error when the status reports an abort timeout;
 *         anything thrown by the called helpers is propagated as well.
 */
void SynchronizeWorker::TorrentSync(const Nan::AsyncProgressWorker::ExecutionProgress& progress) {
    message_listener ml;
    auto torrent_path = merge_path(ri_.local_root_, TORR_FILE_DESCR);
    auto resume_path = merge_path(ri_.local_root_, TORR_RESUME_FILE);

    // Fetch torrent file
    sync_file_with_server(&ml, ri_.torrent_url_, torrent_path);

    lt::add_torrent_params params;
    std::ifstream resume(resume_path, std::ios_base::binary);
    if (resume.good()) {
        // Keep whitespace bytes: the file is read verbatim.
        resume.unsetf(std::ios_base::skipws);
        std::istream_iterator<char> first(resume);
        std::istream_iterator<char> last;
        params.resume_data.assign(first, last);
    }

    params.ti = boost::shared_ptr<torrent_info>(new torrent_info(torrent_path));

    // Torrent root shall point to the parent directory
    const boost::filesystem::path root(ri_.local_root_);
    params.save_path = root.parent_path().string();

    auto hash_id = torrent_session->add(params);

    // Invoked for every session event; returns true while the wait must go on.
    auto on_event = [&](const info_t& info) {
        const auto status(std::get<1>(info));

        auto pulse(new data_pulse);
        pulse->protocol_ = "TORRENT";
        pulse->progress_ = status->progress_;
        send_progress(progress, pulse);

        if (status->abort_timeout_) {
            throw std::runtime_error("Torrent timeout aborted.");
        }
        return !status->done_;
    };

    try {
        while (torrent_session->wait_for_event(hash_id, on_event)) {
            // keep waiting until the handler reports completion
        }
        torrent_session->remove(hash_id);
    }
    catch (...) {
        torrent_session->remove(hash_id);
        throw;
    }
}
