package integration

import (
	"bytes"
	"fmt"
	"mime/multipart"
	"net/http"
	"strings"
	"testing"
	"time"

	"github.com/contiamo/dev/tests/integration/suite"
	uuid "github.com/satori/go.uuid"
)

// Payload is a package-local defined type whose underlying type is that of
// suite.Payload. The tests in this file use it to build request bodies
// (including nested objects and column lists) as literals keyed by string.
// NOTE(review): presumably suite.Payload is a map[string]interface{} — confirm
// against the suite package.
type Payload suite.Payload

// TestManagedDataSourceCreation walks through the life cycle of a managed
// datasource as a freshly created non-admin project member:
//
//  1. create a user and add it to the project, then switch to that user;
//  2. create a managed datasource and read it back by id;
//  3. upload a small CSV file and create a managed table;
//  4. start an ingestion task for the uploaded file;
//  5. poll the query endpoint until the three ingested rows are visible and
//     profile a query over the table;
//  6. delete the table, verify it can no longer be queried, and delete the
//     datasource.
//
// The steps share state (ids, names) through the variables declared at the
// top of the test, so they must run in the order written.
func TestManagedDataSourceCreation(t *testing.T) {
	s := suite.New(t)

	var (
		// Dashes are replaced so the generated names can be used as SQL identifiers.
		dsName    = "managed_" + strings.Replace(uuid.NewV4().String(), "-", "_", -1)
		tableName = "managed_" + strings.Replace(uuid.NewV4().String(), "-", "_", -1)
		email     = fmt.Sprintf("lemonjr+managed+%d@example.com", time.Now().Unix())
		dsID      string
		fileID    string
		tableID   string
		userID    string
	)

	s.Run("create non-admin project member", func(s *suite.Suite) {
		path := fmt.Sprintf("/auth/api/v2/tenants/%s/users", s.TenantID)
		payload := Payload{
			"name":            "lemon managed jr",
			"email":           email,
			"tenantId":        s.TenantID,
			"sendInviteEmail": true,
			"password":        "localdev",
		}
		resp := s.Post(path, payload)
		defer resp.Body.Close()

		data := s.ParsePayload(resp.Body)
		s.Require().Equal(http.StatusOK, resp.StatusCode, "payload: %+v", data)

		// Use the checked assertion form so a missing or non-string id fails
		// the test with a message instead of panicking.
		var ok bool
		userID, ok = data["id"].(string)
		s.Require().True(ok, "user id not found in payload: %+v", data)
		s.Require().NotEmpty(userID, "user id not found in payload: %+v", data)
	})

	s.Run("add non-admin user to project", func(s *suite.Suite) {
		path := fmt.Sprintf("/auth/api/v2/tenants/%s/realms/%s/members", s.TenantID, s.ProjectID)
		payload := Payload{"userId": userID}
		resp := s.Post(path, payload)
		defer resp.Body.Close()

		data := s.ParsePayload(resp.Body)
		s.Require().Equal(http.StatusOK, resp.StatusCode, "payload: %+v, resp: %+v", payload, data)
	})

	// Everything below runs as the newly created non-admin user.
	s.SetCurrentUser(email, "localdev")

	s.Run("create new managed datasource", func(s *suite.Suite) {
		path := fmt.Sprintf("/hub/api/v1/%s/datasources", s.ProjectID)
		payload := Payload{
			"name": dsName,
			"type": "managed",
		}
		resp := s.Post(path, payload)
		defer resp.Body.Close()

		data := s.ParsePayload(resp.Body)
		s.Require().Equal(http.StatusCreated, resp.StatusCode, "payload: %+v", data)
		idVal := s.JqVal(data, ".id")
		s.Require().NotEmpty(idVal, "id is not found in the response")
		var ok bool
		dsID, ok = idVal.(string)
		s.Require().True(ok, "ID must be a string")

		s.Run("check if we can get it by id", func(s *suite.Suite) {
			path := fmt.Sprintf("/hub/api/v1/%s/datasources/%s", s.ProjectID, dsID)
			resp := s.Get(path)
			defer resp.Body.Close()
			s.Require().Equal(http.StatusOK, resp.StatusCode)
			data := s.ParsePayload(resp.Body)
			s.Require().Equal(dsName, s.JqVal(data, `.data.name`))
		})
	})

	s.Run("upload csv file", func(s *suite.Suite) {
		path := fmt.Sprintf("/hub/api/v1/%v/datasources/%v/uploads", s.ProjectID, dsID)

		body := &bytes.Buffer{}
		writer := multipart.NewWriter(body)
		part, err := writer.CreateFormFile("file", "file.csv")
		s.Require().NoError(err)
		_, err = part.Write([]byte("foo,bar,baz\na,2,3\nb,2,3\nc,2,3\n"))
		s.Require().NoError(err)
		// Close writes the trailing multipart boundary; without it the body is
		// incomplete, so its error must not be ignored.
		s.Require().NoError(writer.Close())

		request, err := http.NewRequest(http.MethodPost, s.PlatformURL(path), body)
		s.Require().NoError(err)
		request.Header.Add("Content-Type", writer.FormDataContentType())
		resp, err := s.Do(request)
		s.Require().NoError(err)
		defer resp.Body.Close()

		payload := s.ParsePayload(resp.Body)
		s.Require().Equal(http.StatusOK, resp.StatusCode, "payload: %+v", payload)
		var ok bool
		fileID, ok = payload["fileId"].(string)
		s.Require().True(ok, "payload: %+v", payload)
	})

	s.Run("create table", func(s *suite.Suite) {
		path := fmt.Sprintf("/hub/api/v1/%v/datasources/%v/managed-tables", s.ProjectID, dsID)
		resp := s.Post(path, Payload{
			"name":                tableName,
			"defaultRowErrorMode": "drop",
			"defaultInsertMode":   "upsert",
			"schema": Payload{
				"columns": []Payload{
					{"name": "foo", "type": "text", "required": false},
					{"name": "bar", "type": "integer", "required": false},
					{"name": "baz", "type": "integer", "required": false},
				},
			},
		})
		defer resp.Body.Close()

		payload := s.ParsePayload(resp.Body)
		s.Require().Equal(http.StatusCreated, resp.StatusCode, "payload: %+v", payload)
		idVal := s.JqVal(payload, `.data.id`)

		var ok bool
		tableID, ok = idVal.(string)
		s.Require().True(ok, "tableIDVal: %+v", idVal)
		s.Require().NotEmpty(tableID, "tableID: %+v", tableID)
	})

	s.Run("start ingestion", func(s *suite.Suite) {
		path := fmt.Sprintf("/hub/api/v1/%v/datasources/%v/tasks", s.ProjectID, dsID)
		resp := s.Post(path, Payload{
			"tableId": tableID,
			"fileId":  fileID,
			"schema": Payload{
				"columns": []Payload{
					{"name": "foo", "type": "text", "required": false},
					{"name": "bar", "type": "integer", "required": false},
					{"name": "baz", "type": "integer", "required": false},
				},
			},
			"options": Payload{
				"preferredDateFormat": "monthFirst",
				"delimiter":           ",",
				"decimalSeparator":    "numericComma",
				"encoding":            "ISO-8859-9",
				"commentCharacter":    "#",
				"hasHeader":           true,
				"ignoreHeader":        false,
				"trimSpaces":          false,
			},
			"insertMode": "upsert",
		})
		defer resp.Body.Close()

		payload := s.ParsePayload(resp.Body)
		s.Require().Equal(http.StatusCreated, resp.StatusCode, "payload: %+v", payload)
	})

	s.Run("side effects", func(s *suite.Suite) {
		s.Run("check if we can query the datasource", func(s *suite.Suite) {
			// countRows issues one query and reports how many rows came back.
			// It is a separate function so that each attempt's response body
			// is closed as soon as the attempt finishes, rather than piling up
			// deferred closes until the whole retry loop is done.
			countRows := func() int {
				path := fmt.Sprintf("/hub/api/v1/%s/query", s.ProjectID)
				resp := s.Post(path, Payload{
					"sql": fmt.Sprintf(`SELECT * FROM "%s"."%s"`, dsName, tableName),
				})
				defer resp.Body.Close()
				data := s.ParsePayload(resp.Body)
				return len(s.Jq(data, `.rows[]`))
			}

			// The query is retried once per second until the three uploaded
			// CSV rows are visible.
			for i := 0; i < 10; i++ {
				if countRows() == 3 {
					return
				}
				time.Sleep(1 * time.Second)
			}
			s.Require().True(false, "failed to query data after 10 retries")
		})

		s.Run("check if we can profile the query", func(s *suite.Suite) {
			path := fmt.Sprintf("/hub/api/v1/%s/profile/sql", s.ProjectID)
			resp := s.Post(path, Payload{
				"projectId": s.ProjectID,
				"sql":       fmt.Sprintf(`SELECT foo, bar, baz FROM "%s"."%s"`, dsName, tableName),
			})
			defer resp.Body.Close()
			data := s.ParsePayload(resp.Body)
			columns, ok := data["columns"].(map[string]interface{})
			s.Require().True(ok, "unknown response %+v", data)
			s.Require().Len(columns, 3, "unknown response %+v", data)

			// Equal takes (expected, actual); keeping that order makes the
			// failure output label the values correctly.
			profile, ok := columns["foo"].(map[string]interface{})
			s.Require().True(ok, "unknown response %+v", columns)
			s.Require().Equal("string", profile["dataType"], "unknown response %+v", profile)

			profile, ok = columns["bar"].(map[string]interface{})
			s.Require().True(ok, "unknown response %+v", columns)
			s.Require().Equal("integer", profile["dataType"], "unknown response %+v", profile)
		})
	})

	s.Run("delete the table", func(s *suite.Suite) {
		path := fmt.Sprintf("/hub/api/v1/%s/datasources/%s/tables/%s", s.ProjectID, dsID, tableID)
		resp := s.Delete(path)
		defer resp.Body.Close()
		s.Require().Equal(http.StatusNoContent, resp.StatusCode)

		s.Run("check that we can't query the deleted table", func(s *suite.Suite) {
			path := fmt.Sprintf("/hub/api/v1/%s/query", s.ProjectID)
			resp := s.Post(path, Payload{
				"sql": fmt.Sprintf(`SELECT * FROM %s.%s`, dsName, tableName),
			})
			defer resp.Body.Close()
			data := s.ParsePayload(resp.Body)
			s.Require().Equal(http.StatusUnprocessableEntity, resp.StatusCode, "payload: %+v", data)
		})
	})

	s.Run("delete the datasource", func(s *suite.Suite) {
		path := fmt.Sprintf("/hub/api/v1/%s/datasources/%s", s.ProjectID, dsID)
		resp := s.Delete(path)
		defer resp.Body.Close()
		s.Require().Equal(http.StatusNoContent, resp.StatusCode)
	})
}
