



ll := import("@platforma-sdk/workflow-tengo:ll")
maps := import("@platforma-sdk/workflow-tengo:maps")
smart := import("@platforma-sdk/workflow-tengo:smart")
slices := import("@platforma-sdk/workflow-tengo:slices")
sets := import("@platforma-sdk/workflow-tengo:sets")
assets := import("@platforma-sdk/workflow-tengo:assets")
render := import("@platforma-sdk/workflow-tengo:render")
builder := import("@platforma-sdk/workflow-tengo:pframes.builder")
pUtil := import("@platforma-sdk/workflow-tengo:pframes.util")
pSpec := import("@platforma-sdk/workflow-tengo:pframes.spec")
validation := import("@platforma-sdk/workflow-tengo:validation")
constants := import("@platforma-sdk/workflow-tengo:pframes.constants")
xsvBuilder := import("@platforma-sdk/workflow-tengo:pframes.xsv-builder")
tableBuilderMod := import("@platforma-sdk/workflow-tengo:pframes.table-builder")
pColumnData := import("@platforma-sdk/workflow-tengo:pframes.data")
pCommon := import("@platforma-sdk/workflow-tengo:pframes.process-column-common")
pColumnBatch := import("@platforma-sdk/workflow-tengo:pframes.process-column-batch")

llProcessTpl := assets.importTemplate("@platforma-sdk/workflow-tengo:pframes.process-pcolumn-data")


// Validation schema for the `spec` of a regular value column in TsvContent settings
// (used by the "column" alternatives of TSV_CONTENT_SETTINGS_SCHEMA).
// `valueType` must match one of the listed primitive type names; the remaining keys carry the
// `,?` suffix (presumably "optional" in the validation library's rule syntax - confirm against
// the validation module).
BASIC_TSV_COLUMN_SPEC_SCHEMA := {
	"valueType": "string,regex=Int|Long|Float|Double|String",
	"name,?": "string",
	"domain,?": { "any": "string" },
	"contextDomain,?": { "any": "string" },
	"annotations,?": { "any": "string" }
}


// Same shape as BASIC_TSV_COLUMN_SPEC_SCHEMA, but `valueType` is restricted to "String".
// Used for the "line" and "json-line" alternatives of TSV_CONTENT_SETTINGS_SCHEMA.
BASIC_TSV_COLUMN_SPEC_SCHEMA_STRING := {
	"valueType": "string,regex=String",
	"name,?": "string",
	"domain,?": { "any": "string" },
	"contextDomain,?": { "any": "string" },
	"annotations,?": { "any": "string" }
}


// Validation schema for one entry of the `columns` list of a "json-line" column:
// a source `column` name, an `id` (suffixed `,?`, presumably optional) and a primitive `type`.
// The `__options__,closed` key presumably forbids any keys not listed here - confirm against
// the validation module.
BASIC_COLUMN_INFO_SCHEMA := {
	"__options__,closed": "",
	"column": "string",
	"id,?": "string",
	"type": "string,regex=Int|Long|Float|Double|String"
}


// Validation schema for TsvContent settings, checked by parseToJson and by the "TsvContent"
// output branch of processColumn.
//
// `axes` lists the TSV columns that become axes; `columns` lists the value columns, each of
// which must satisfy one of the alternatives below.
//
// NOTE(review): if the validation library applies `regex=` rules as unanchored matches, then
// `regex=line` would also accept "json-line" (and `regex=Int|...` would accept e.g. "Integer").
// Confirm how the validation module evaluates regexes before relying on `kind` to discriminate.
TSV_CONTENT_SETTINGS_SCHEMA := {
	"__options__,closed": "",
	"axes": [{
		"__options__,closed": "",
		// name of the source TSV column
		"column": "string",
		// axis spec; appended as-is to the axesSpec of every produced column spec
		"spec": {
			"__options__,closed": "",
			"type": "string,regex=Int|Long|Float|Double|String",
			"name,?": "string",
			"domain,?": { "any": "string" },
			"contextDomain,?": { "any": "string" },
			"annotations,?": { "any": "string" }
		}
	}],
	"columns": [["or",
		// explicit kind "column": value taken from a TSV column; output id is `id`,
		// falling back to `column` (see getTsvColumnId)
		{
			"__options__,closed": "",
			"kind": "string,regex=column",
			"column": "string",
			"id,?": "string",
			"spec": BASIC_TSV_COLUMN_SPEC_SCHEMA
		},
		// kind "line": requires an explicit `id`; spec value type must be String
		{
			"__options__,closed": "",
			"kind": "string,regex=line",
			"id": "string",
			"spec": BASIC_TSV_COLUMN_SPEC_SCHEMA_STRING
		},
		// kind "json-line": like "line", plus the list of nested columns
		{
			"__options__,closed": "",
			"kind": "string,regex=json-line",
			"id": "string",
			"spec": BASIC_TSV_COLUMN_SPEC_SCHEMA_STRING,
			"columns": [BASIC_COLUMN_INFO_SCHEMA]
		},
		// no `kind` at all: handled as kind "column" by getTsvColumnId and transformTsvSettings
		{
			"__options__,closed": "",
			"column": "string",
			"id,?": "string",
			"spec": BASIC_TSV_COLUMN_SPEC_SCHEMA
		}
	]]
}


/**
 * Returns the output id of a TsvContent column definition.
 *
 * A missing `kind` is treated as "column". For "column" entries the id is `column.id`, falling
 * back to `column.column` when no id is given. For "line" and "json-line" entries the id is
 * `column.id`. Any other kind results in a panic.
 *
 * @param column - a single entry of the TsvContent `columns` list
 * @return the column id
 */
getTsvColumnId := func(column) {
	kind := is_undefined(column.kind) ? "column" : column.kind

	if kind == "column" {
		return is_undefined(column.id) ? column.column : column.id
	}

	if kind == "line" || kind == "json-line" {
		return column.id
	}

	ll.panic("Unknown column kind: %s", kind)
}


/**
 * Converts TsvContent settings into the reduced form passed on to the processing templates:
 * axes become `{name, type}` pairs and columns are reduced to the fields needed per kind
 * (specs are dropped, only value types are kept).
 *
 * @param settings - TsvContent settings (shape described by TSV_CONTENT_SETTINGS_SCHEMA)
 * @return map with `axes` and `columns` lists
 */
transformTsvSettings := func(settings) {
	// axis definition -> { name, type }
	convertAxis := func(axis) {
		return {
			name: axis.column,
			type: axis.spec.type
		}
	}

	// nested column of a "json-line" entry -> { name, type[, id] }
	convertJsonColumn := func(jsonCol) {
		converted := {
			name: jsonCol.column,
			type: jsonCol.type
		}
		if !is_undefined(jsonCol.id) {
			converted.id = jsonCol.id
		}
		return converted
	}

	// value column definition -> kind-specific reduced form
	convertColumn := func(column) {
		// entries without an explicit kind are regular columns
		kind := is_undefined(column.kind) ? "column" : column.kind

		if kind == "column" {
			converted := {
				kind: "column",
				name: column.column,
				type: column.spec.valueType
			}
			if !is_undefined(column.id) {
				converted.id = column.id
			}
			return converted
		}

		if kind == "line" {
			return {
				kind: "line",
				id: column.id
			}
		}

		if kind == "json-line" {
			return {
				kind: "json-line",
				id: column.id,
				columns: slices.map(column.columns, convertJsonColumn)
			}
		}

		ll.panic("Unknown column kind: %s", kind)
	}

	return {
		axes: slices.map(settings.axes, convertAxis),
		columns: slices.map(settings.columns, convertColumn)
	}
}





/**
 * Renders the `pframes.export-pframe` template (ephemerally) for the given p-frame and returns
 * the template's "result" output.
 *
 * @param pf - p-frame reference; must not be undefined
 * @return the "result" output of the export template
 */
exportFrame := func(pf) {
	ll.assert(!is_undefined(pf), "pframe reference is required")

	exportTpl := assets.importTemplate("@platforma-sdk/workflow-tengo:pframes.export-pframe")
	rendered := render.createEphemeral(exportTpl, { pf: pf })
	return rendered.output("result")
}




/**
 * Renders the `pframes.export-single-pcolumn` template (ephemerally) for the given column data
 * and returns the template's "result" output.
 *
 * @param data - p-column data to export; must not be undefined
 * @return the "result" output of the export template
 */
exportColumnData := func(data) {
	// Fail fast with a clear message, consistent with exportFrame, instead of handing an
	// undefined input to the template.
	ll.assert(!is_undefined(data), "column data is required")
	return render.createEphemeral(assets.importTemplate("@platforma-sdk/workflow-tengo:pframes.export-single-pcolumn"), { data: data }).output("result")
}






















































/**
 * Renders the `pframes.parse-to-json` template for the given content using TsvContent settings
 * and returns an accessor object for the produced columns.
 *
 * @param targetContent - either a string or a resource/field reference, or a map
 *                        `{data, spec}` where `spec` is a PColumnSpec; in the latter case the
 *                        axes of `spec` are prepended to the axes declared in `params`
 * @param params - TsvContent settings, validated against TSV_CONTENT_SETTINGS_SCHEMA
 * @return strict map with the following functions:
 *           output(columnId)      - `{data, spec}` of the column
 *           outputData(columnId)  - data of the column
 *           outputSpec(columnId)  - spec of the column
 *           listOutputs()         - ids of all columns
 *           addAllOutputsToBuilder(builder, ...separator) - adds every column to the builder
 *                                   and returns the builder
 *         the per-column accessors panic for ids not declared in `params.columns`
 */
parseToJson := func(targetContent, params) {
	validation.assertType(params, TSV_CONTENT_SETTINGS_SCHEMA, "Invalid TsvContent settings")

	transformedSettings := transformTsvSettings(params)

	// Work out what is fed to the template and which axes (if any) come from the input spec.
	input := undefined
	baseAxesSpec := []

	isRawContent := is_string(targetContent) || smart.isReference(targetContent)
	if isRawContent {
		// plain string or reference: no inherited axes
		input = targetContent
	} else if is_map(targetContent) && !is_undefined(targetContent.data) && !is_undefined(targetContent.spec) {
		// {data, spec}: inherit axes from the provided column spec
		input = targetContent.data
		validation.assertType(targetContent.spec, pSpec.P_COLUMN_SPEC_SCHEMA)
		baseAxesSpec = slices.normalize(targetContent.spec.axesSpec)
	} else {
		ll.panic("targetContent must be a string, resource reference, or {data: resource, spec: PColumnSpec}")
	}

	// Inherited axes first, then the axes declared in the settings.
	axesSpec := copy(baseAxesSpec) + slices.map(params.axes, func(axis) {
		return axis.spec
	})

	// Spec of every declared column, keyed by column id.
	outputSpecs := {}
	for columnDef in params.columns {
		outputSpecs[getTsvColumnId(columnDef)] = maps.merge(columnDef.spec, { axesSpec: axesSpec, kind: "PColumn" })
	}

	parseToJsonTpl := assets.importTemplate("@platforma-sdk/workflow-tengo:pframes.parse-to-json")
	renderResult := render.create(parseToJsonTpl, {
		params: smart.createJsonResource(transformedSettings),
		input: input
	})

	// Returns the spec of a declared column; panics for unknown ids.
	requireSpec := func(columnId) {
		spec := outputSpecs[columnId]
		if is_undefined(spec) {
			ll.panic("unknown column: " + columnId)
		}
		return spec
	}

	// Resolves the data output of a column from the render result.
	resolveData := func(columnId) {
		return renderResult.resolveOutput(["result", columnId])
	}

	return ll.toStrict({
		output: func(columnId) {
			spec := requireSpec(columnId)
			return {
				data: resolveData(columnId),
				spec: spec
			}
		},

		outputData: func(columnId) {
			requireSpec(columnId)
			return resolveData(columnId)
		},

		outputSpec: func(columnId) {
			return requireSpec(columnId)
		},

		listOutputs: func() {
			return maps.getKeys(outputSpecs)
		},

		// `separator` is accepted but not used by this implementation.
		addAllOutputsToBuilder: func(builder, ...separator) {
			maps.forEach(outputSpecs, func(columnId, spec) {
				builder.add(columnId, spec, resolveData(columnId))
			})

			return builder
		}
	})
}





/**
 * Removed API kept only as a stub: always panics and points the caller to
 * `pcolumn.resourceMapBuilder`.
 *
 * @param keyLength - ignored
 */
pColumnResourceMapDataBuilder := func(keyLength) {
	ll.panic("pframes.pColumnResourceMapDataBuilder is not supported any more; use pcolumn.resourceMapBuilder")
}

// Set of primitive value type names. processColumn looks up the input spec's `valueType` here
// to decide whether the input is "potentially partitioned"; for any other value type it passes
// `expectedKeyLength` to the processing template.
__primitiveValueTypes := { "Int": true, "Long": true, "Float": true, "Double": true, "String": true, "Bytes": true }






















































































































































































































/**
 * Processes a p-column by rendering the `pframes.process-pcolumn-data` template with the given
 * body template, the input data and parameters derived from `outputs` and `opts`.
 *
 * If `input` is a map with a `primary` field the call is delegated unchanged to
 * `pColumnBatch.processColumnBatch`.
 *
 * @param input - `{data, spec}`: `data` must be a reference to a field or resource, `spec` must
 *                be a valid PColumnSpec
 * @param bodyTpl - body template, passed to the processing template as `body`
 * @param outputs - list of output descriptors; each has `name` and `type`, where type is one of
 *                  "Resource", "BinaryPartitioned", "JsonPartitioned", "ResourceMap" (these use
 *                  `spec`), "Xsv" or "TsvContent" (these use `settings`)
 * @param opts - optional single map of options read by this function:
 *                 passKey, isEphemeral (both default to false),
 *                 aggregate (axes matchers; entries with a truthy `anonymize` mark anonymized axes),
 *                 passAggregationAxesSpec, passAggregationAxesNames,
 *                 passDistilledAggregationSpec (default false; require `aggregate`),
 *                 stepCache (forwarded only when > 0), cpu, mem, queue,
 *                 extra (map; entries are passed as `__extra_<key>` inputs),
 *                 metaExtra (map; entries are passed as `__meta_<key>` inputs);
 *               the map is also handed to `pCommon.buildOutputTrace`, which may read further keys
 * @return result object created by `pCommon.createProcessColumnResult`
 */
processColumn := func(input, bodyTpl, outputs, ...opts) {
	if len(opts) == 0 {
		opts = {}
	} else if len(opts) == 1 {
		opts = opts[0]
	} else {
		ll.panic("too many arguments")
	}

	// Batch mode is detected by the presence of `primary` in the input map.
	isBatchMode := is_map(input) && !is_undefined(input.primary)
	if isBatchMode {
		return pColumnBatch.processColumnBatch(input, bodyTpl, outputs, opts)
	}

	// --- input validation ---

	if is_undefined(input.spec) {
		ll.panic("no input spec provided")
	}

	if !smart.isReference(input.data) {
		ll.panic("input data should be a reference to a field or resource")
	}

	validation.assertType(input.spec, pSpec.P_COLUMN_SPEC_SCHEMA)

	// --- option defaults ---

	if is_undefined(opts.passKey) {
		opts.passKey = false
	}

	if is_undefined(opts.passAggregationAxesSpec) {
		opts.passAggregationAxesSpec = false
	}

	if is_undefined(opts.passAggregationAxesNames) {
		opts.passAggregationAxesNames = false
	}

	if is_undefined(opts.passDistilledAggregationSpec) {
		opts.passDistilledAggregationSpec = false
	}

	if is_undefined(opts.isEphemeral) {
		opts.isEphemeral = false
	}

	if (opts.passAggregationAxesSpec || opts.passAggregationAxesNames || opts.passDistilledAggregationSpec) && is_undefined(opts.aggregate) {
		ll.panic("can't use passAggregationAxesSpec, passAggregationAxesNames or passDistilledAggregationSpec without aggregate")
	}

	// --- parameters of the processing template ---

	processTemplateParams := {
		eph: opts.isEphemeral,
		passKey: opts.passKey
	}

	if !is_undefined(opts.stepCache) && opts.stepCache > 0 {
		processTemplateParams.stepCache = opts.stepCache
	}

	if !is_undefined(opts.cpu) {
		processTemplateParams.cpu = opts.cpu
	}
	if !is_undefined(opts.mem) {
		processTemplateParams.mem = opts.mem
	}
	if !is_undefined(opts.queue) {
		processTemplateParams.queue = opts.queue
	}

	// Undefined (falsy) for any value type that is not listed as primitive.
	isPotentiallyPartitioned := __primitiveValueTypes[input.spec.valueType]

	// Shallow-copy the caller-provided extra inputs: aggregation-related entries may be added
	// below, and they must not leak into the caller's map (which may be reused for other calls).
	extra := {}
	if !is_undefined(opts.extra) {
		for k, v in opts.extra {
			extra[k] = v
		}
	}

	metaExtra := opts.metaExtra
	if is_undefined(metaExtra) {
		metaExtra = {}
	}

	// Axes prepended to the axes of every output; all input axes unless aggregation is requested.
	iterationAxesSpec := copy(input.spec.axesSpec)

	if !isPotentiallyPartitioned {
		processTemplateParams.expectedKeyLength = len(input.spec.axesSpec)
	}

	if !is_undefined(opts.aggregate) {
		aggregationIndices := pSpec.matchAxes(input.spec.axesSpec, opts.aggregate)
		processTemplateParams.aggregationIndices = aggregationIndices

		// Positions (within aggregationIndices) of the aggregation axes marked with `anonymize`.
		anonymizationIndices := []
		anonymizedMatchers := slices.filter(opts.aggregate, func(m) { return is_map(m) && m.anonymize })

		// Input axis indices of the aggregation axes that are NOT anonymized.
		nonAnonymizedAggregationIndices := undefined

		if len(anonymizedMatchers) > 0 {
			anonymizedAxesIndices := pSpec.matchAxes(input.spec.axesSpec, anonymizedMatchers)
			anonymizedIndicesSet := sets.fromSlice(anonymizedAxesIndices)

			nonAnonymizedAggregationIndices = []
			for i, aggIdx in aggregationIndices {
				if anonymizedIndicesSet[aggIdx] {
					anonymizationIndices = append(anonymizationIndices, i)
				} else {
					nonAnonymizedAggregationIndices = append(nonAnonymizedAggregationIndices, aggIdx)
				}
			}

			if len(anonymizationIndices) > 0 {
				processTemplateParams.anonymizationIndices = anonymizationIndices
			}
		} else {
			nonAnonymizedAggregationIndices = aggregationIndices
		}

		// With aggregation, only the group axes returned by calculateGroupAxesIndices remain
		// as iteration axes.
		groupAxesIndices := pUtil.calculateGroupAxesIndices(len(input.spec.axesSpec), aggregationIndices)
		iterationAxesSpec = []
		for i in groupAxesIndices {
			iterationAxesSpec = append(iterationAxesSpec, input.spec.axesSpec[i])
		}

		if opts.passAggregationAxesSpec {
			aggregationAxesSpec := []
			for i in nonAnonymizedAggregationIndices {
				aggregationAxesSpec = append(aggregationAxesSpec, input.spec.axesSpec[i])
			}

			extra[constants.AGGREGATION_AXES_SPEC_FIELD_NAME] = smart.createJsonResource(aggregationAxesSpec)
		}

		if opts.passAggregationAxesNames {
			aggregationAxesNames := []
			for i in nonAnonymizedAggregationIndices {
				aggregationAxesNames = append(aggregationAxesNames, input.spec.axesSpec[i].name)
			}

			extra[constants.AGGREGATION_AXES_NAMES_FIELD_NAME] = smart.createJsonResource(aggregationAxesNames)
		}

		if opts.passDistilledAggregationSpec {
			distiller := pSpec.createSpecDistiller([input.spec])
			distilledSpec := distiller.distill(input.spec)

			// Keep only the non-anonymized aggregation axes in the distilled spec.
			passSpec := maps.deepTransform(distilledSpec, {
				axesSpec: func(axesSpec) {
					aggregationAxesSpec := []
					for i in nonAnonymizedAggregationIndices {
						aggregationAxesSpec = append(aggregationAxesSpec, axesSpec[i])
					}
					return aggregationAxesSpec
				}
			})

			extra[constants.DISTILLED_AGGREGATION_SPEC_FIELD_NAME] = smart.createJsonResource(passSpec)
		}
	}

	// --- outputs ---
	// processedOutputs: descriptors passed to the template (spec / trace fields removed)
	// outputSpecs: resulting spec (or map of column specs) per output name
	// outputsMap: original descriptors by name
	processedOutputs := []
	outputSpecs := {}
	outputsMap := {}
	for output in outputs {

		outputsMap[output.name] = output
		traceInfo := pCommon.buildOutputTrace(opts, output, input.spec)
		if output.type == "Resource" || output.type == "BinaryPartitioned" || output.type == "JsonPartitioned" {
			spec := maps.merge(output.spec, { axesSpec: copy(iterationAxesSpec) + slices.normalize(output.spec.axesSpec) })
			if !is_undefined(traceInfo.trace) {
				spec = traceInfo.trace.inject(spec, {override: traceInfo.overrideTrace})
			}
			outputSpecs[output.name] = spec
			processedOutputs = append(processedOutputs, maps.merge(output, {spec: undefined, traceSteps: undefined, overrideTrace: undefined }))
		} else if output.type == "ResourceMap" {
			// Normalize once so that keyLength and the resulting spec agree, and an output
			// without its own axesSpec does not fail in len().
			ownAxesSpec := slices.normalize(output.spec.axesSpec)
			processedOutputs = append(processedOutputs, maps.merge(output,
				{ keyLength: len(ownAxesSpec), spec: undefined, traceSteps: undefined, overrideTrace: undefined }))
			spec := maps.merge(output.spec, { axesSpec: copy(iterationAxesSpec) + ownAxesSpec })
			if !is_undefined(traceInfo.trace) {
				spec = traceInfo.trace.inject(spec, {override: traceInfo.overrideTrace})
			}
			outputSpecs[output.name] = spec
		} else if output.type == "Xsv" {
			if is_undefined(output.settings) {
				ll.panic("settings are required for Xsv output")
			}
			decomposition := pUtil.decomposePfconvImportCfg(output.settings, {
				additionalAxesSpec: iterationAxesSpec
			})
			purifiedPfconvCfg := decomposition.purifiedCfg
			columnsSpec := decomposition.columnsSpec
			if !is_undefined(traceInfo.trace) {
				for columnName, spec in columnsSpec {
					columnsSpec[columnName] = traceInfo.trace.inject(spec, {override: traceInfo.overrideTrace})
				}
			}
			outputSpecs[output.name] = columnsSpec
			processedOutputs = append(processedOutputs, maps.merge(output,
				{
					settings: purifiedPfconvCfg,
					flattenWithDelimiter: "/",
					traceSteps: undefined,
					overrideTrace: undefined
				}))
		} else if output.type == "TsvContent" {
			validation.assertType(output.settings, TSV_CONTENT_SETTINGS_SCHEMA, "Invalid TsvContent settings")

			transformedSettings := transformTsvSettings(output.settings)

			// Iteration axes first, then the axes declared in the settings.
			columnsSpec := {}
			axesSpec := copy(iterationAxesSpec)
			for axis in output.settings.axes {
				axesSpec = append(axesSpec, axis.spec)
			}

			for column in output.settings.columns {
				columnId := getTsvColumnId(column)

				spec := maps.merge(column.spec, { axesSpec: axesSpec, kind: "PColumn" })
				if !is_undefined(traceInfo.trace) {
					spec = traceInfo.trace.inject(spec, {override: traceInfo.overrideTrace})
				}
				columnsSpec[columnId] = spec
			}

			outputSpecs[output.name] = columnsSpec
			processedOutputs = append(processedOutputs, maps.merge(output, {
				settings: transformedSettings,
				flattenWithDelimiter: "/",
				traceSteps: undefined,
				overrideTrace: undefined
			}))
		} else {
			ll.panic("unknown output type: " + output.type)
		}
	}
	processTemplateParams.outputs = processedOutputs

	// --- render ---

	renderInputs := {
		params: smart.createJsonResource(processTemplateParams),
		body: bodyTpl,
		data: input.data
	}

	for k, v in extra {
		renderInputs["__extra_" + k] = v
	}

	for k, v in metaExtra {
		renderInputs["__meta_" + k] = v
	}

	renderResult := render.createEphemeral(llProcessTpl, renderInputs)
	return pCommon.createProcessColumnResult(renderResult, outputsMap, outputSpecs)
}


// Public API of the module. Most entries re-export functions defined above or in the imported
// pframes sub-modules; the *FileBuilder entries are thin wrappers that preselect the file format
// for xsvBuilder.xsvFileBuilder.
export ll.toStrict({
	parseData: pColumnData.parseData,
	processColumn: processColumn,
	exportFrame: exportFrame,
	pFrameBuilder: builder.pFrameBuilder,
	exportColumnData: exportColumnData,
	// stub kept for backward compatibility; always panics
	pColumnResourceMapDataBuilder: pColumnResourceMapDataBuilder,
	tsvFileBuilder: func() { return xsvBuilder.xsvFileBuilder("tsv") },
	csvFileBuilder: func() { return xsvBuilder.xsvFileBuilder("csv") },
	parquetFileBuilder: func() { return xsvBuilder.xsvFileBuilder("parquet") },
	tableBuilder: tableBuilderMod.tableBuilder,
	parseToJson: parseToJson
})
