ll := import("@platforma-sdk/workflow-tengo:ll")
validation := import("@platforma-sdk/workflow-tengo:validation")
objects := import("@platforma-sdk/workflow-tengo:objects")
util := import("@platforma-sdk/workflow-tengo:pframes.util")
pt := import("@platforma-sdk/workflow-tengo:pt")
execConstants := import("@platforma-sdk/workflow-tengo:exec.constants")
maps := import("@platforma-sdk/workflow-tengo:maps")
slices := import("@platforma-sdk/workflow-tengo:slices")
smart := import("@platforma-sdk/workflow-tengo:smart")

// getColumnSpec builds the spec map for a single imported column.
//
// col.spec is deep-merged with a map that sets kind to "PColumn" and
// axesSpec to the given axes specs; the merged map is then passed through
// objects.deleteUndefined before being returned.
//
// Parameters:
//   axesSpec - list of axis specs to attach to the column spec
//   col      - column entry of the import config; only col.spec is read
getColumnSpec := func(axesSpec, col) {
	overrides := {
		kind: "PColumn",
		axesSpec: axesSpec
	}
	return objects.deleteUndefined(maps.deepMerge(col.spec, overrides))
}

// applyPreProcessSteps chains the configured pre-processing steps onto a
// column expression, in the order they are listed.
//
// Supported step types:
//   "regexpExtract" - calls extractEcmaRegex(step.pattern)
//   "regexpReplace" - calls replaceEcmaRegex(step.pattern, step.replacement)
// Any other step type triggers ll.panic.
//
// Parameters:
//   colExpr         - expression to start from
//   preProcessSteps - list of steps, or undefined to leave colExpr untouched
//
// Returns the resulting expression.
applyPreProcessSteps := func(colExpr, preProcessSteps) {
	if is_undefined(preProcessSteps) {
		// nothing configured: hand the expression back unchanged
		return colExpr
	}

	expr := colExpr
	for step in preProcessSteps {
		kind := step.type
		if kind == "regexpExtract" {
			expr = expr.extractEcmaRegex(step.pattern)
			continue
		}
		if kind == "regexpReplace" {
			expr = expr.replaceEcmaRegex(step.pattern, step.replacement)
			continue
		}
		ll.panic("unknown preProcess step type: %v", step.type)
	}
	return expr
}

// applyNaRegex casts a column expression to colType, optionally replacing
// values that match naRegex with a typed undefined literal.
//
// Parameters:
//   colExpr - expression producing the raw value
//   naRegex - ECMA regex marking "not available" values, or undefined
//   colType - target type passed to cast
//
// Returns colExpr.cast(colType) when naRegex is undefined; otherwise a
// when/then/otherwise expression that yields pt.lit(undefined).cast(colType)
// for matching values and colExpr.cast(colType) for the rest.
applyNaRegex := func(colExpr, naRegex, colType) {
	if !is_undefined(naRegex) {
		isNa := colExpr.matchesEcmaRegex(naRegex)
		branch := pt.when(isNa)
		branch = branch.then(pt.lit(undefined).cast(colType))
		return branch.otherwise(colExpr.cast(colType))
	}
	return colExpr.cast(colType)
}


















// importFileParquet builds and runs a pt workflow that reads an XSV file,
// filters / pre-processes / casts the configured axes and columns, saves the
// result as a frame and returns references to the per-column outputs.
//
// Parameters:
//   xsvFile - input file handed to wf.frame
//   xsvType - input file type; checked with util.validateXsvType
//   spec    - import configuration; must satisfy util.PFCONV_IMPORT_CFG_SCHEMA
//   ops     - options; the fields read here are queue, cpu, mem, inputCache,
//             dataOnly and splitDataAndSpec
//
// Returns:
//   - when ops.dataOnly is truthy: the result of smart.mapBuilder().lockAndBuild()
//     with one input field per column (named by util.xsvColumnId) set to that
//     column's "<column>.data" field of the saved frame;
//   - otherwise, when ops.splitDataAndSpec is truthy: a map of
//     colId -> { spec, data };
//   - otherwise: a flat map with "<colId>.spec" and "<colId>.data" keys.
//
// Fails (ll.assert / ll.panic) on unsupported spec options, a separator that
// does not match xsvType, or an unknown ops.queue value.
importFileParquet := func(xsvFile, xsvType, spec, ops) {
	util.validateXsvType(xsvType)

	validation.assertType(spec, util.PFCONV_IMPORT_CFG_SCHEMA)
	// text-oriented reader options are rejected when the input itself is parquet
	if xsvType == "parquet" {
		ll.assert(is_undefined(spec.separator), "separator is not supported with xsvType 'parquet'")
		ll.assert(is_undefined(spec.commentLinePrefix), "commentLinePrefix is not supported with xsvType 'parquet'")
		ll.assert(is_undefined(spec.skipEmptyLines), "skipEmptyLines is not supported with xsvType 'parquet'")
	}


	// spec features rejected by this import path regardless of xsvType
	ll.assert(is_undefined(spec.index), "index axis is not supported with Parquet storage format")

	ll.assert(is_undefined(spec.allowArtificialColumns), "allowArtificialColumns are not supported with Parquet storage format")

	// an explicit separator is only accepted if it equals the one expected
	// for xsvType: "\t" for "tsv", "," for anything else
	if !is_undefined(spec.separator) {
		expectedSeparator := ","
		if xsvType == "tsv" {
			expectedSeparator = "\t"
		}
		ll.assert(spec.separator == expectedSeparator,
			"separator does not match xsvType %v, expected '%v', got '%v'",
			xsvType, expectedSeparator, spec.separator)
	}

	// workflow setup: queue, cpu, mem and input caching are applied only
	// when the corresponding option is defined
	wf := pt.workflow()
	if !is_undefined(ops.queue) {
		if ops.queue == execConstants.HEAVY_QUEUE {
			wf.inHeavyQueue()
		} else if ops.queue == execConstants.MEDIUM_QUEUE {
			wf.inMediumQueue()
		} else if ops.queue == execConstants.LIGHT_QUEUE {
			wf.inLightQueue()
		} else if ops.queue == execConstants.UI_TASKS_QUEUE {
			wf.inUiQueue()
		} else {
			ll.panic("Unsupported queue: %v", ops.queue)
		}
	}
	if !is_undefined(ops.cpu) {
		wf.cpu(ops.cpu)
	}
	if !is_undefined(ops.mem) {
		wf.mem(ops.mem)
	}
	if !is_undefined(ops.inputCache) {
		wf.cacheInputs(ops.inputCache)
	}

	// every referenced source column is declared as "String" in the read
	// schema; the target types are applied later via cast in applyNaRegex
	schema := []
	for ax in spec.axes {
		schema = append(schema, {
			column: ax.column,
			type: "String"
		})
	}
	for col in spec.columns {
		schema = append(schema, {
			column: col.column,
			type: "String"
		})
	}
	// note: spec.skipEmptyLines is forwarded as the reader's ignoreErrors flag
	df := wf.frame(xsvFile, {
		xsvType: xsvType,
		ignoreErrors: spec.skipEmptyLines == true,
		commentPrefix: spec.commentLinePrefix,
		schema: schema
	})

	// keep only the columns named in the spec (axes first, then value columns)
	allColumns := []
	for ax in spec.axes {
		allColumns = append(allColumns, ax.column)
	}
	for col in spec.columns {
		allColumns = append(allColumns, col.column)
	}
	df = df.select(allColumns...)

	// row filter on raw values: a row is kept only if none of its values
	// matches the filterOutRegex configured for that axis / column
	filterConditions := []
	for ax in spec.axes {
		if !is_undefined(ax.filterOutRegex) {
			filterConditions = append(filterConditions, pt.col(ax.column).matchesEcmaRegex(ax.filterOutRegex).not())
		}
	}
	for col in spec.columns {
		if !is_undefined(col.filterOutRegex) {
			filterConditions = append(filterConditions, pt.col(col.column).matchesEcmaRegex(col.filterOutRegex).not())
		}
	}
	if len(filterConditions) > 0 {
		df = df.filter(pt.and(filterConditions...))
	}

	// per-column transformation: pre-process steps, then NA handling and
	// cast (ax.spec.type for axes, col.spec.valueType for value columns);
	// each expression is aliased back to its source column name
	projection := []
	for ax in spec.axes {
		colExpr := pt.col(ax.column)
		colExpr = applyPreProcessSteps(colExpr, ax.preProcess)
		colExpr = applyNaRegex(colExpr, ax.naRegex, ax.spec.type)
		projection = append(projection, colExpr.alias(ax.column))
	}
	for col in spec.columns {
		colExpr := pt.col(col.column)
		colExpr = applyPreProcessSteps(colExpr, col.preProcess)
		colExpr = applyNaRegex(colExpr, col.naRegex, col.spec.valueType)
		projection = append(projection, colExpr.alias(col.column))
	}
	df = df.withColumns(projection...)

	notNullConditions := []


	// only value columns with allowNA explicitly set to false get a not-null
	// filter here; axes are not checked in this block
	for col in spec.columns {
		if col.allowNA == false {

			notNullConditions = append(notNullConditions, pt.col(col.column).isNotNull())
		}
	}
	if len(notNullConditions) > 0 {
		df = df.filter(pt.and(notNullConditions...))
	}

	// parameters for saving the frame; partitionKeyLength stays 0 unless the
	// spec provides one
	saveParams := {
		axes: [],
		columns: [],
		partitionKeyLength: 0
	}
	if !is_undefined(spec.partitionKeyLength) {
		saveParams.partitionKeyLength = spec.partitionKeyLength
	}
	// the source column name is merged in as `name` together with the
	// configured spec (presumably the spec's own name wins when present —
	// verify against maps.deepMerge semantics)
	for ax in spec.axes {
		saveParams.axes = append(saveParams.axes, {
			column: ax.column,
			spec: maps.deepMerge({ name: ax.column }, ax.spec)
		})
	}
	for col in spec.columns {
		saveParams.columns = append(saveParams.columns, {
			column: col.column,
			spec: maps.deepMerge({ name: col.column }, col.spec)
		})
	}

	// register the output frame, run the workflow and fetch the saved frame
	frameName := "xsv_import_output"
	df.saveFrameDirect(frameName, saveParams)
	pf := wf.run().getFrameDirect(frameName)

	// data-only mode: return a built map of colId -> column data, no specs
	if ops.dataOnly {
		pfd := smart.mapBuilder()
		for col in spec.columns {
			colId := util.xsvColumnId(col)
			colData := pf.getFutureInputField(col.column + ".data")
			pfd.createInputField(colId).set(colData)
		}
		return pfd.lockAndBuild()
	}

	// default mode: return spec and data per column, either nested under
	// colId (splitDataAndSpec) or flattened as "<colId>.spec" / "<colId>.data"
	result := {}
	axesSpec := slices.map(spec.axes, func(ax) { return ax.spec })
	for col in spec.columns {
		colId := util.xsvColumnId(col)
		colSpec := getColumnSpec(axesSpec, col)
		colData := pf.getFutureInputField(col.column + ".data")
		if ops.splitDataAndSpec {
			result[colId] = {
				spec: colSpec,
				data: colData
			}
		} else {
			result[colId + ".spec"] = colSpec
			result[colId + ".data"] = colData
		}
	}
	return result
}

// Module exports: only importFileParquet is exposed; the export map is
// wrapped with ll.toStrict.
export ll.toStrict({
	importFileParquet: importFileParquet
})
