#!/usr/bin/env python3
#
#
# Run librdkafka regression tests on with different SASL parameters
# and broker verisons.
#
# Requires:
#  trivup python module
#  gradle in your PATH

from cluster_testing import (
    LibrdkafkaTestCluster,
    print_report_summary,
    read_scenario_conf)
from LibrdkafkaTestApp import LibrdkafkaTestApp

import subprocess
import tempfile
import os
import sys
import argparse
import json


def test_it(version, deploy=True, conf={}, rdkconf={}, tests=None,
            interact=False, debug=False, scenario="default"):
    """
    @brief Create, deploy and start a Kafka cluster using Kafka \\p version
    Then run librdkafka's regression tests.
    """

    cluster = LibrdkafkaTestCluster(version, conf,
                                    num_brokers=int(conf.get('broker_cnt', 3)),
                                    debug=debug, scenario=scenario)

    # librdkafka's regression tests, as an App.
    _rdkconf = conf.copy()  # Base rdkconf on cluster conf + rdkconf
    _rdkconf.update(rdkconf)
    rdkafka = LibrdkafkaTestApp(cluster, version, _rdkconf, tests=tests,
                                scenario=scenario)
    rdkafka.do_cleanup = False

    if deploy:
        cluster.deploy()

    cluster.start(timeout=30)

    if conf.get('test_mode', '') == 'bash':
        cmd = 'bash --rcfile <(cat ~/.bashrc; echo \'PS1="[TRIVUP:%s@%s] \\u@\\h:\\w$ "\')' % (  # noqa: E501
            cluster.name, version)
        subprocess.call(
            cmd,
            env=rdkafka.env,
            shell=True,
            executable='/bin/bash')
        report = None

    else:
        rdkafka.start()
        print(
            '# librdkafka regression tests started, logs in %s' %
            rdkafka.root_path())
        rdkafka.wait_stopped(timeout=60 * 30)

        report = rdkafka.report()
        report['root_path'] = rdkafka.root_path()

        if report.get('tests_failed', 0) > 0 and interact:
            print(
                '# Connect to cluster with bootstrap.servers %s' %
                cluster.bootstrap_servers())
            print('# Exiting the shell will bring down the cluster. '
                  'Good luck.')
            subprocess.call(
                'bash --rcfile <(cat ~/.bashrc; echo \'PS1="[TRIVUP:%s@%s] \\u@\\h:\\w$ "\')' %  # noqa: E501
                (cluster.name, version), env=rdkafka.env, shell=True,
                executable='/bin/bash')

    cluster.stop(force=True)

    cluster.cleanup()
    return report


def handle_report(report, version, suite):
    """ Parse test report and return tuple (Passed(bool), Reason(str)) """
    test_cnt = report.get('tests_run', 0)

    if test_cnt == 0:
        return (False, 'No tests run')

    passed = report.get('tests_passed', 0)
    failed = report.get('tests_failed', 0)
    if 'all' in suite.get('expect_fail', []) or version in suite.get(
            'expect_fail', []):
        expect_fail = True
    else:
        expect_fail = False

    if expect_fail:
        if failed == test_cnt:
            return (True, 'All %d/%d tests failed as expected' %
                    (failed, test_cnt))
        else:
            return (False, '%d/%d tests failed: expected all to fail' %
                    (failed, test_cnt))
    else:
        if failed > 0:
            return (False, '%d/%d tests passed: expected all to pass' %
                    (passed, test_cnt))
        else:
            return (True, 'All %d/%d tests passed as expected' %
                    (passed, test_cnt))


if __name__ == '__main__':

    parser = argparse.ArgumentParser(
        description='Run librdkafka tests on a range of broker versions')

    parser.add_argument('--debug', action='store_true', default=False,
                        help='Enable trivup debugging')
    parser.add_argument('--conf', type=str, dest='conf', default=None,
                        help='trivup JSON config object (not file)')
    parser.add_argument('--rdkconf', type=str, dest='rdkconf', default=None,
                        help='trivup JSON config object (not file) '
                        'for LibrdkafkaTestApp')
    parser.add_argument('--scenario', type=str, dest='scenario',
                        default='default',
                        help='Test scenario (see scenarios/ directory)')
    parser.add_argument('--tests', type=str, dest='tests', default=None,
                        help='Test to run (e.g., "0002")')
    parser.add_argument('--report', type=str, dest='report', default=None,
                        help='Write test suites report to this filename')
    parser.add_argument('--interact', action='store_true', dest='interact',
                        default=False,
                        help='On test failure start a shell before bringing '
                        'the cluster down.')
    parser.add_argument('versions', type=str, nargs='*',
                        default=['0.8.1.1', '0.8.2.2', '0.9.0.1', '2.3.0'],
                        help='Broker versions to test')
    parser.add_argument('--interactive', action='store_true',
                        dest='interactive',
                        default=False,
                        help='Start a shell instead of running tests')
    parser.add_argument(
        '--root',
        type=str,
        default=os.environ.get(
            'TRIVUP_ROOT',
            'tmp'),
        help='Root working directory')
    parser.add_argument(
        '--port',
        default=None,
        help='Base TCP port to start allocating from')
    parser.add_argument(
        '--kafka-src',
        dest='kafka_path',
        type=str,
        default=None,
        help='Path to Kafka git repo checkout (used for version=trunk)')
    parser.add_argument(
        '--brokers',
        dest='broker_cnt',
        type=int,
        default=3,
        help='Number of Kafka brokers')
    parser.add_argument('--ssl', dest='ssl', action='store_true',
                        default=False,
                        help='Enable SSL endpoints')
    parser.add_argument(
        '--sasl',
        dest='sasl',
        type=str,
        default=None,
        help='SASL mechanism (PLAIN, GSSAPI)')

    args = parser.parse_args()

    conf = dict()
    rdkconf = dict()

    if args.conf is not None:
        args.conf = json.loads(args.conf)
    else:
        args.conf = {}

    if args.port is not None:
        args.conf['port_base'] = int(args.port)
    if args.kafka_path is not None:
        args.conf['kafka_path'] = args.kafka_path
    if args.ssl:
        args.conf['security.protocol'] = 'SSL'
    if args.sasl:
        if args.sasl == 'PLAIN' and 'sasl_users' not in args.conf:
            args.conf['sasl_users'] = 'testuser=testpass'
        args.conf['sasl_mechanisms'] = args.sasl
        args.conf['sasl_servicename'] = 'kafka'
    if args.interactive:
        args.conf['test_mode'] = 'bash'
    args.conf['broker_cnt'] = args.broker_cnt

    conf.update(args.conf)
    if args.rdkconf is not None:
        rdkconf.update(json.loads(args.rdkconf))

    conf.update(read_scenario_conf(args.scenario))

    if args.tests is not None:
        tests = args.tests.split(',')
    elif 'tests' in conf:
        tests = conf.get('tests', '').split(',')
    else:
        tests = None

    # Test version + suite matrix
    if 'versions' in conf:
        versions = conf.get('versions')
    else:
        versions = args.versions
    suites = [{'name': 'standard'}]

    pass_cnt = 0
    fail_cnt = 0
    for version in versions:
        for suite in suites:
            _conf = conf.copy()
            _conf.update(suite.get('conf', {}))
            _rdkconf = rdkconf.copy()
            _rdkconf.update(suite.get('rdkconf', {}))

            if 'version' not in suite:
                suite['version'] = dict()

            # Run tests
            print('#### Version %s, suite %s, scenario %s: STARTING' %
                  (version, suite['name'], args.scenario))
            report = test_it(version, tests=tests, conf=_conf,
                             rdkconf=_rdkconf,
                             interact=args.interact, debug=args.debug,
                             scenario=args.scenario)

            if not report:
                continue

            # Handle test report
            report['version'] = version
            passed, reason = handle_report(report, version, suite)
            report['PASSED'] = passed
            report['REASON'] = reason

            if passed:
                print('\033[42m#### Version %s, suite %s: PASSED: %s\033[0m' %
                      (version, suite['name'], reason))
                pass_cnt += 1
            else:
                print('\033[41m#### Version %s, suite %s: FAILED: %s\033[0m' %
                      (version, suite['name'], reason))
                fail_cnt += 1

                # Emit hopefully relevant parts of the log on failure
                subprocess.call(
                    "grep --color=always -B100 -A10 FAIL %s" %
                    (os.path.join(
                        report['root_path'],
                        'stderr.log')),
                    shell=True)

            print('#### Test output: %s/stderr.log' % (report['root_path']))

            suite['version'][version] = report

    # Write test suite report JSON file
    if args.report is not None:
        test_suite_report_file = args.report
        f = open(test_suite_report_file, 'w')
    else:
        fd, test_suite_report_file = tempfile.mkstemp(prefix='test_suite_',
                                                      suffix='.json',
                                                      dir='.')
        f = os.fdopen(fd, 'w')

    full_report = {'suites': suites, 'pass_cnt': pass_cnt,
                   'fail_cnt': fail_cnt, 'total_cnt': pass_cnt + fail_cnt}

    f.write(json.dumps(full_report))
    f.close()

    print('\n\n\n')
    print_report_summary(full_report)
    print('#### Full test suites report in: %s' % test_suite_report_file)

    if pass_cnt == 0 or fail_cnt > 0:
        sys.exit(1)
    else:
        sys.exit(0)
