ll := import("@platforma-sdk/workflow-tengo:ll")
exp := import("@platforma-sdk/workflow-tengo:pt.expression")
util := import("@platforma-sdk/workflow-tengo:pt.util")
assets := import("@platforma-sdk/workflow-tengo:assets")
smart := import("@platforma-sdk/workflow-tengo:smart")
exec := import("@platforma-sdk/workflow-tengo:exec")
json := import("json")
text := import("text")
slices := import("@platforma-sdk/workflow-tengo:slices")
execConstants := import("@platforma-sdk/workflow-tengo:exec.constants")
pframesBuilder := import("@platforma-sdk/workflow-tengo:pframes.builder")
pframesConstants := import("@platforma-sdk/workflow-tengo:pframes.constants")
pframesUtil := import("@platforma-sdk/workflow-tengo:pframes.util")
pframesSpec := import("@platforma-sdk/workflow-tengo:pframes.spec")
render := import("@platforma-sdk/workflow-tengo:render")
maps := import("@platforma-sdk/workflow-tengo:maps")
canonical := import("@platforma-sdk/workflow-tengo:canonical")
validation := import("@platforma-sdk/workflow-tengo:validation")






































// Forward declarations. Both names are assigned their real implementations
// further down in this file (`_newDataFrame = func...` and
// `_newDataFrameGroupBy = func...`), so code defined before those assignments
// can already refer to them.
_newDataFrame := undefined
_newDataFrameGroupBy := undefined

// Module-level counter; every call to workflow() copies the current value into
// its `_wfId` field and then increments it.
workflowCounter := 0

// Checks the arguments used to register a column: `name` has to be a string
// and `columnMap` a map with both `spec` and `data` present. Any violation is
// reported through ll.assert.
_validateColumn := func(name, columnMap) {
	ll.assert(is_string(name), "column name must be a string")
	ll.assert(is_map(columnMap), "column must be a map")

	spec := columnMap.spec
	ll.assert(spec != undefined, "column must have 'spec' field defined")

	data := columnMap.data
	ll.assert(data != undefined, "column must have 'data' field defined")
}

// Builds a PEntry: an object (passed through ll.toStrict) whose two accessors
// return the column map and the spec query captured at construction time.
//
// columnsByName - map of column name -> column info used by the query
// specQuery     - query structure describing how those columns are combined
_newPEntry := func(columnsByName, specQuery) {
	accessors := {}
	accessors._getColumnsMap = func() {
		return columnsByName
	}
	accessors._getSpecQuery = func() {
		return specQuery
	}
	return ll.toStrict(accessors)
}

// Reports whether `entry` looks like an object produced by _newPEntry, i.e. a
// map that exposes both the `_getColumnsMap` and `_getSpecQuery` methods.
_isPEntry := func(entry) {
	if !ll.isMap(entry) {
		return false
	}
	return ll.methodExists(entry, "_getColumnsMap") && ll.methodExists(entry, "_getSpecQuery")
}

// Forward declaration: p.inner / p.full call this helper, while its
// implementation (assigned after `p` is defined) itself calls p.column.
_frameToPEntries := undefined





// Folds a list of PEntry objects into a single column map and the list of
// their spec queries (in the order the entries were given).
//
// Every element must be a PEntry. A column name may appear in only one entry;
// a repeated name aborts with ll.panic.
//
// Returns { columns: <name -> column info>, specQueries: [<query>, ...] }.
_mergePEntries := func(entries) {
	mergedColumns := {}
	queries := []
	for pEntry in entries {
		ll.assert(_isPEntry(pEntry), "expected a PEntry, got: %v", pEntry)

		maps.forEach(pEntry._getColumnsMap(), func(columnName, columnInfo) {
			if mergedColumns[columnName] != undefined {
				ll.panic("column name %s is already used, please pick a different name", columnName)
			}
			mergedColumns[columnName] = columnInfo
		})

		queries = append(queries, pEntry._getSpecQuery())
	}
	return { columns: mergedColumns, specQueries: queries }
}




// Wraps a spec query into the `{ entry: <query> }` shape used for join entries.
_asJoinEntry := func(specQuery) {
	joinEntry := {}
	joinEntry.entry = specQuery
	return joinEntry
}

// Applies _asJoinEntry to every query of `specQueries` via slices.map.
_asJoinEntries := func(specQueries) {
	wrap := func(specQuery) {
		return _asJoinEntry(specQuery)
	}
	return slices.map(specQueries, wrap)
}







// Builds an axis selector for `axesSpec[idx]`.
//
// The selector always carries the axis `name`. `type` is copied when defined;
// `domain` and `contextDomain` are cloned when defined and non-empty. When the
// axis lists `parentAxes`, each parent index must be >= 0 and smaller than
// `idx`, and is expanded recursively into a selector of its own.
_buildAxisSelector := func(axesSpec, idx) {
	// true only for values that are defined and have at least one element
	nonEmpty := func(value) {
		return !is_undefined(value) && len(value) > 0
	}

	axisSpec := axesSpec[idx]
	result := { name: axisSpec.name }

	if !is_undefined(axisSpec.type) {
		result.type = axisSpec.type
	}
	if nonEmpty(axisSpec.domain) {
		result.domain = maps.clone(axisSpec.domain)
	}
	if nonEmpty(axisSpec.contextDomain) {
		result.contextDomain = maps.clone(axisSpec.contextDomain)
	}
	if nonEmpty(axisSpec.parentAxes) {
		parentSelectors := []
		for parentIndex in axisSpec.parentAxes {
			ll.assert(parentIndex >= 0 && parentIndex < idx,
				"slicedColumn: parentAxes index %d must be less than current axis index %d",
				parentIndex, idx)
			parentSelectors = append(parentSelectors, _buildAxisSelector(axesSpec, parentIndex))
		}
		result.parentAxes = parentSelectors
	}

	return result
}









p := ll.toStrict({






	// Creates a PEntry holding a single column registered under `name`.
	// `columnMap` must pass _validateColumn (a map with `spec` and `data`).
	// The entry's spec query is `{ type: "column", column: name }`.
	column: func(name, columnMap) {
		_validateColumn(name, columnMap)

		query := { type: "column", column: name }
		registry := {}
		registry[name] = columnMap
		return _newPEntry(registry, query)
	},










	// Creates a PEntry for column `name` with some of its axes fixed to constants.
	//
	// `axisFilters` is checked by util.validateAxisFilters. Each filter's
	// `axisIndex` must address an element of `columnMap.spec.axesSpec`; it is
	// turned into an axis selector and paired with the filter's `constant`.
	// The resulting spec query has type "sliceAxes" and wraps a plain "column"
	// query for the same name.
	slicedColumn: func(name, columnMap, axisFilters) {
		_validateColumn(name, columnMap)
		util.validateAxisFilters(axisFilters)

		columnAxes := columnMap.spec.axesSpec
		ll.assert(is_array(columnAxes),
			"slicedColumn: column %q spec must carry axesSpec, got: %v", name, columnMap.spec)

		axisCount := len(columnAxes)
		selectors := []
		for axisFilter in axisFilters {
			axisIdx := axisFilter.axisIndex
			ll.assert(axisIdx >= 0 && axisIdx < axisCount,
				"slicedColumn: axisIndex %d out of range for column %q (axes: %d)",
				axisIdx, name, axisCount)
			selectors = append(selectors, {
				axisSelector: _buildAxisSelector(columnAxes, axisIdx),
				constant: axisFilter.constant
			})
		}

		registry := {}
		registry[name] = columnMap
		sliceQuery := {
			type: "sliceAxes",
			input: { type: "column", column: name },
			axisFilters: selectors
		}
		return _newPEntry(registry, sliceQuery)
	},





	// Combines entries into an "innerJoin" PEntry.
	// Accepts either one or more PEntry objects, or a single non-PEntry
	// argument that is expanded into column entries by _frameToPEntries.
	inner: func(...entries) {
		ll.assert(len(entries) > 0, "inner join requires at least one entry")

		pEntries := entries
		if len(entries) == 1 && !_isPEntry(entries[0]) {
			pEntries = _frameToPEntries(entries[0])
		}

		merged := _mergePEntries(pEntries)
		joinQuery := {
			type: "innerJoin",
			entries: _asJoinEntries(merged.specQueries)
		}
		return _newPEntry(merged.columns, joinQuery)
	},





	// Combines entries into a "fullJoin" PEntry.
	// Accepts either one or more PEntry objects, or a single non-PEntry
	// argument that is expanded into column entries by _frameToPEntries.
	full: func(...entries) {
		ll.assert(len(entries) > 0, "full join requires at least one entry")

		pEntries := entries
		if len(entries) == 1 && !_isPEntry(entries[0]) {
			pEntries = _frameToPEntries(entries[0])
		}

		merged := _mergePEntries(pEntries)
		joinQuery := {
			type: "fullJoin",
			entries: _asJoinEntries(merged.specQueries)
		}
		return _newPEntry(merged.columns, joinQuery)
	},






	// Builds an "outerJoin" PEntry from one primary entry and at least one
	// secondary entry. The primary's query becomes `primary`; the remaining
	// queries become the `secondary` list, in argument order.
	outer: func(primary, ...secondary) {
		ll.assert(_isPEntry(primary), "primary entry must be a PEntry object")
		ll.assert(len(secondary) > 0, "outer join requires at least one secondary entry")

		allEntries := append([primary], secondary...)
		merged := _mergePEntries(allEntries)
		queries := merged.specQueries

		return _newPEntry(merged.columns, {
			type: "outerJoin",
			primary: _asJoinEntry(queries[0]),
			secondary: _asJoinEntries(queries[1:])
		})
	},












	// Builds a "linkerJoin" PEntry from a linker column and one secondary entry.
	//
	// `linkerColumnEntry` must be a PEntry whose spec query is a plain
	// "column" query. When the linker column's spec is a map, it must carry the
	// pframesSpec.A_IS_LINKER_COLUMN annotation with the string value "true";
	// specs that are not maps are not inspected.
	linkerJoin: func(linkerColumnEntry, secondaryEntry) {
		ll.assert(_isPEntry(linkerColumnEntry), "linker must be a PEntry object")

		linkerQuery := linkerColumnEntry._getSpecQuery()
		isPlainColumn := ll.isMap(linkerQuery) && linkerQuery.type == "column"
		ll.assert(isPlainColumn,
			"linkerJoin: linker entry must be a plain p.column(...) result, got: %v", linkerQuery)

		linkerName := linkerQuery.column
		linkerSpec := linkerColumnEntry._getColumnsMap()[linkerName].spec
		if ll.isMap(linkerSpec) {
			linkerAnnotations := linkerSpec.annotations
			flagged := ll.isMap(linkerAnnotations) && linkerAnnotations[pframesSpec.A_IS_LINKER_COLUMN] == "true"
			ll.assert(flagged,
				"linkerJoin: linker column %q must carry annotation %q == \"true\", got spec: %v",
				linkerName, pframesSpec.A_IS_LINKER_COLUMN, linkerSpec)
		}

		merged := _mergePEntries([linkerColumnEntry, secondaryEntry])
		secondaryQuery := merged.specQueries[1]
		return _newPEntry(merged.columns, {
			type: "linkerJoin",
			linker: { column: linkerName },
			secondary: [_asJoinEntry(secondaryQuery)]
		})
	},











	// Wraps an already-built join entry into a PEntry.
	// `columnsByName` must be a map whose values all pass _validateColumn;
	// `joinEntry` must be a map with a defined `entry` field, and that `entry`
	// becomes the PEntry's spec query.
	_rawQueryEntry: func(columnsByName, joinEntry) {
		ll.assert(ll.isMap(columnsByName), "columnsByName must be a map")

		hasEntry := ll.isMap(joinEntry) && !is_undefined(joinEntry.entry)
		ll.assert(hasEntry,
			"joinEntry must be a SpecQueryJoinEntry { entry, qualifications? }, got: %v", joinEntry)

		for columnName, columnInfo in columnsByName {
			_validateColumn(columnName, columnInfo)
		}

		return _newPEntry(columnsByName, joinEntry.entry)
	}
})

// Expands a frame-like input into one p.column entry per column.
//
// Accepted inputs, checked in this order:
//   - a flat PFrame map (pframesUtil.isFlatPfMap), used as is;
//   - a structured PFrame map (pframesUtil.isStructuredPfMap), rebuilt through
//     a pFrameBuilder from each value's `spec` and `data`;
//   - a smart reference: a field must be set and is replaced by its value; the
//     resulting resource must be of type RTYPE_P_FRAME.
// Anything else aborts with ll.panic.
_frameToPEntries = func(frameInput) {
	resolveFrame := func() {
		if pframesUtil.isFlatPfMap(frameInput) {
			return frameInput
		}
		if pframesUtil.isStructuredPfMap(frameInput) {
			builder := pframesBuilder.pFrameBuilder()
			for key, value in frameInput {
				builder.add(key, value.spec, value.data)
			}
			return builder.build()
		}
		if !smart.isReference(frameInput) {
			ll.panic("unsupported frameInput type: %v", frameInput)
		}

		resource := frameInput
		if smart.isField(frameInput) {
			ll.assert(frameInput.isSet(), "frameInput must be set")
			resource = frameInput.getValue()
		}
		ll.assert(resource.checkResourceType(pframesConstants.RTYPE_P_FRAME), "frameResource must be a PFrame resource")
		return resource
	}

	result := []
	maps.forEach(pframesUtil.pFrameToColumnsMap(resolveFrame()), func(columnId, columnInfo) {
		result = append(result, p.column(columnId, columnInfo))
	})
	return result
}


// Maps a file name to a format identifier based on its suffix:
// ".csv" -> "csv", ".tsv" -> "tsv", ".ndjson" / ".jsonl" -> "ndjson",
// ".parquet" -> "parquet". Returns undefined when no suffix matches.
_detectFormatFromFileName := func(fileName) {
	knownSuffixes := [
		[".csv", "csv"],
		[".tsv", "tsv"],
		[".ndjson", "ndjson"],
		[".jsonl", "ndjson"],
		[".parquet", "parquet"]
	]
	for pair in knownSuffixes {
		if text.has_suffix(fileName, pair[0]) {
			return pair[1]
		}
	}
	return undefined // No format detected
}







workflow := func() {
	// Step builders in insertion order; each is a function that run() calls
	// with a context map to obtain the actual step structure.
	steps := []

	// Input files registered by frame(): entries are either
	// `{ file: <reference>, name: <file name> }` or
	// `{ content: <string>, name: <file name> }`.
	inFiles := []

	// Input frames registered by frame() for PEntry inputs:
	// dataframe id -> `{ columns: <workflow-unique column id -> column info> }`.
	inFrames := {}

	// Names of files requested through _saveFile.
	outFiles := []

	// Names of files requested through _saveFileContent.
	outContentFiles := []

	// Frames registered through _saveFrameInfo:
	// frame name -> `{ colInfo: ..., rename: ... }`.
	outFramesInfo := {}

	// Counters backing the `_new*` id generators below.
	anonymousFilesCounter := 0
	anonymousDFCounter := 0
	pcolumnCounter := 0
	pframeCounter := 0

	// Execution settings; run() forwards them as meta inputs of the rendered
	// template. The queue starts as the medium queue.
	cpu := undefined
	mem := undefined
	inputCache := undefined
	queue := execConstants.MEDIUM_QUEUE

	// Declared before assignment so the methods defined below can refer to it.
	self := undefined

	// Take the next workflow id from the module-level counter.
	id := workflowCounter
	workflowCounter += 1

	self = {
		_wfId: id,







		// Appends a ready-made step structure to the workflow.
		// `step` must be a map with a string `type`; it is emitted unchanged
		// when the workflow is run. Returns the workflow for chaining.
		addRawStep: func(step) {
			ll.assert(is_map(step), "step must be a map")
			ll.assert(is_string(step.type), "step.type must be a string discriminator")

			constantStep := func(ctx) {
				return step
			}
			self._addStep(constantStep)
			return self
		},

		// Appends a step builder to the workflow. `step` must be callable;
		// run() later invokes it with the context map and uses the returned
		// value as the step structure.
		_addStep: func(step) {
			ll.assert(is_callable(step), "step must be a function")
			steps = append(steps, step)
		},


















































		// Registers an input and returns a DataFrame object bound to it.
		//
		// frameInput may be:
		//   - a map with a `file` field (plus optional `xsvType` and `schema`);
		//   - a smart reference to a file;
		//   - a string with the file content;
		//   - a PEntry (see `p`), which is read as a frame.
		//
		// Options (single optional map argument):
		//   format        - one of csv, tsv, parquet, ndjson
		//   xsvType       - alternative way to give the format; used only when
		//                   `format` is absent
		//   id            - dataframe id; generated when absent
		//   fileName      - file name inside the workflow; generated when absent.
		//                   When no format is known yet it is detected from the
		//                   extension of this name
		//   schema        - array; takes precedence over `frameInput.schema`
		//   inferSchema   - boolean; only `false` is written to the step
		//   ignoreErrors  - boolean; only `true` is written to the step
		//   nRows         - non-negative integer
		//   commentPrefix - string; only used by csv/tsv steps
		//
		// Panics when the input type is unknown, an option has the wrong type,
		// conflicting formats are given, or no format can be determined.
		frame: func(frameInput, ...optionsRaw) {
			ll.assert(!is_undefined(frameInput), "frameInput must be provided")

			opts := {}
			if len(optionsRaw) > 0 {
				if len(optionsRaw) == 1 && is_map(optionsRaw[0]) {
					opts = optionsRaw[0]
				} else {
					ll.panic("frame options must be a single map argument")
				}
			}

			fileRef := undefined
			fileContent := undefined
			inputSchema := undefined
			inputXsvType := undefined
			finalDataFrameId := undefined
			finalFileName := undefined
			finalFormat := undefined
			inferSchemaOpt := undefined
			optionsSchema := undefined
			nRowsOpt := undefined
			ignoreErrorsOpt := undefined
			commentPrefixOpt := undefined

			// Classify the input.
			if is_map(frameInput) && !is_undefined(frameInput.file) {
				fileRef = frameInput.file
				inputXsvType = frameInput.xsvType

				if !is_undefined(frameInput.schema) {
					if !is_array(frameInput.schema) {
						ll.panic("frameInput.schema (for structural input) must be an array if provided. Got: %T", frameInput.schema)
					}
					inputSchema = frameInput.schema
				}

				finalFormat = inputXsvType
			} else if smart.isReference(frameInput) {
				fileRef = frameInput
			} else if is_string(frameInput) || is_bytes(frameInput) {
				// NOTE: bytes are let through here but rejected by the
				// is_string assertion further down.
				fileContent = frameInput
			} else if _isPEntry(frameInput) {
				finalFormat = "pentry"
			} else {
				ll.panic("unknown frame input type: %v", frameInput)
			}

			// Resolve the format: `format` wins over `xsvType`; either must
			// agree with a format already derived from the input.
			if !is_undefined(opts.format) {
				validation.assertType(opts.format, "string,regex=csv|tsv|parquet|ndjson",
					"format must be one of [csv, tsv, parquet, ndjson], found: %v", opts.format)
				if !is_undefined(finalFormat) && finalFormat != opts.format {
					ll.panic("format and xsvType from structural input cannot be used together. Got: %v and %v", opts.format, finalFormat)
				}
				finalFormat = opts.format
			} else if !is_undefined(opts.xsvType) {
				pframesUtil.validateXsvType(opts.xsvType)
				if !is_undefined(finalFormat) && finalFormat != opts.xsvType {
					ll.panic("opts.xsvType and xsvType from structural input cannot be used together. Got: %v and %v", opts.xsvType, finalFormat)
				}
				finalFormat = opts.xsvType
			}

			if !is_undefined(opts.id) {
				finalDataFrameId = opts.id
			} else {
				finalDataFrameId = self._newAnonymousDataFrameId()
			}

			if !is_undefined(opts.fileName) {
				finalFileName = opts.fileName
				if is_undefined(finalFormat) {
					detectedFormat := _detectFormatFromFileName(finalFileName)
					if !is_undefined(detectedFormat) {
						finalFormat = detectedFormat
					}
				}
			} else {
				finalFileName = self._newAnonymousFileId(finalFormat)
			}

			// The extension list mirrors what _detectFormatFromFileName recognises.
			if is_undefined(finalFormat) {
				ll.panic("Unable to determine file format. Please specify 'format' or 'xsvType' option, or use a fileName with a recognizable extension (.csv, .tsv, .ndjson, .jsonl, .parquet)")
			}

			// Validate the remaining options.
			if !is_undefined(opts.inferSchema) {
				if !is_bool(opts.inferSchema) {
					ll.panic("'inferSchema' option must be a boolean. Got: %T", opts.inferSchema)
				}
				inferSchemaOpt = opts.inferSchema
			}

			if !is_undefined(opts.schema) {
				if !is_array(opts.schema) {
					ll.panic("'schema' option (from options map) must be an array. Got: %T", opts.schema)
				}
				optionsSchema = opts.schema
			}

			if !is_undefined(opts.nRows) {
				if !is_int(opts.nRows) || opts.nRows < 0 {
					ll.panic("'nRows' option must be a non-negative integer. Got: %v", opts.nRows)
				}
				nRowsOpt = opts.nRows
			}

			if !is_undefined(opts.ignoreErrors) {
				if !is_bool(opts.ignoreErrors) {
					ll.panic("'ignoreErrors' option must be a boolean. Got: %T", opts.ignoreErrors)
				}
				ignoreErrorsOpt = opts.ignoreErrors
			}

			if !is_undefined(opts.commentPrefix) {
				if !is_string(opts.commentPrefix) {
					ll.panic("'commentPrefix' option must be a string. Got: %T", opts.commentPrefix)
				}
				commentPrefixOpt = opts.commentPrefix
			}

			// Register the input file (reference or inline content).
			if !is_undefined(fileRef) {
				ll.assert(
					smart.isReference(fileRef),
					"frameInput.file (for structural input) or frameInput (for direct reference) must be a valid resource reference, got: %v",
					fileRef)
				inFiles = append(inFiles, { file: fileRef, name: finalFileName })
			} else if !is_undefined(fileContent) {
				ll.assert(is_string(fileContent), "frameInput (for direct content) must be a string, got: %v", fileContent)
				inFiles = append(inFiles, { content: fileContent, name: finalFileName })
			}

			// Build the read step for the resolved format.
			step := undefined
			if finalFormat == "pentry" {
				columnsMap := frameInput._getColumnsMap()

				// Columns get workflow-unique ids. `translation` maps the new
				// id back to the entry's own id, `rename` maps the other way.
				translation := {}
				rename := {}
				renamedColumnsMap := {}

				maps.forEach(columnsMap, func(frameUniqueColumnId, columnInfo) {
					workflowUniqueColumnId := self._newUniqueColumnId()

					translation[workflowUniqueColumnId] = frameUniqueColumnId
					rename[frameUniqueColumnId] = workflowUniqueColumnId
					renamedColumnsMap[workflowUniqueColumnId] = columnInfo
				})

				renamedQuery := util.renameSpecQueryColumns(frameInput._getSpecQuery(), rename)

				ll.assert(is_undefined(inFrames[finalDataFrameId]),
					"frame with name '%s' already added", finalDataFrameId)
				inFrames[finalDataFrameId] = {
					columns: renamedColumnsMap
				}

				step = {
					type: "read_frame",
					name: finalDataFrameId,
					request: { query: renamedQuery },
					translation: translation
				}
			} else if finalFormat == "parquet" {
				step = {
					type: "read_parquet",
					file: finalFileName,
					name: finalDataFrameId
				}
			} else if finalFormat == "ndjson" {
				step = {
					type: "read_ndjson",
					file: finalFileName,
					name: finalDataFrameId
				}
			} else { // csv or tsv
				step = {
					type: "read_csv",
					file: finalFileName,
					name: finalDataFrameId
				}

				if finalFormat == "csv" {
					step.delimiter = ","
				} else if finalFormat == "tsv" {
					step.delimiter = "\t"
				}

				if !is_undefined(commentPrefixOpt) {
					step.commentPrefix = commentPrefixOpt
				}
			}

			// A schema from the options map overrides the one from the input.
			finalSchemaToUse := undefined
			if !is_undefined(optionsSchema) {
				finalSchemaToUse = optionsSchema
			} else {
				finalSchemaToUse = inputSchema
			}

			// The schema is never attached to parquet steps.
			if !is_undefined(finalSchemaToUse) && finalFormat != "parquet" {
				step.schema = finalSchemaToUse
			}

			if !is_undefined(inferSchemaOpt) && inferSchemaOpt == false {
				step.inferSchema = false
			}

			if !is_undefined(ignoreErrorsOpt) && ignoreErrorsOpt == true {
				step.ignoreErrors = true
			}

			if !is_undefined(nRowsOpt) {
				step.nRows = nRowsOpt
			}

			self.addRawStep(step)
			return _newDataFrame(self, finalDataFrameId)
		},











		// Builds a DataFrame from the columns of a column bundle.
		//
		// `options` is an optional map checked by
		// util.validateParamsForFrameFromColumnBundle; when `options.columns`
		// is an array it selects the column keys, otherwise all keys returned
		// by bundle.getColumnKeys() are used. Each key is JSON-decoded: keys
		// carrying `axisFilters` become sliced columns, the rest plain columns.
		// All entries are full-joined, then the axes (aliased by their axis id)
		// and the columns are selected.
		frameFromColumnBundle: func(bundle, ...options) {
			opts := {}
			if len(options) > 0 && is_map(options[0]) {
				opts = options[0]
			}
			util.validateParamsForFrameFromColumnBundle(opts)

			columnKeys := undefined
			if is_array(opts.columns) {
				columnKeys = opts.columns
			} else {
				columnKeys = bundle.getColumnKeys()
			}

			selection := []
			for axisSpec in bundle.getAxesSpec(columnKeys...) {
				axisAlias := pframesSpec.getAxisId(axisSpec)
				selection = append(selection, exp.sc.axis(axisSpec).alias(axisAlias))
			}

			toAxisFilter := func(axisFilter) {
				return {
					axisIndex: axisFilter[0],
					constant: axisFilter[1]
				}
			}

			joinEntries := []
			for columnKey in columnKeys {
				decodedKey := json.decode(columnKey)
				selection = append(selection, exp.col(columnKey))

				columnEntry := undefined
				if is_undefined(decodedKey.axisFilters) {
					columnEntry = p.column(columnKey, bundle.getColumn(decodedKey))
				} else {
					columnEntry = p.slicedColumn(
						columnKey,
						bundle.getColumn(decodedKey, {
							keepColumns: true,
							ignoreNonPartitioned: true
						}),
						slices.map(decodedKey.axisFilters, toAxisFilter)
					)
				}
				joinEntries = append(joinEntries, columnEntry)
			}

			joined := self.frame(p.full(joinEntries...))
			return joined.select(selection...)
		},

		// Id generators. Each one advances its own counter and returns a name
		// built from the new counter value.

		// Returns "anonymous_<n>".
		_newAnonymousDataFrameId: func() {
			anonymousDFCounter = anonymousDFCounter + 1
			suffix := string(anonymousDFCounter)
			return "anonymous_" + suffix
		},
		// Returns "anonymous_<n>.<extension>".
		_newAnonymousFileId: func(extension) {
			anonymousFilesCounter = anonymousFilesCounter + 1
			baseName := "anonymous_" + string(anonymousFilesCounter)
			return baseName + "." + extension
		},
		// Returns "pcolumn_<n>".
		_newUniqueColumnId: func() {
			pcolumnCounter = pcolumnCounter + 1
			suffix := string(pcolumnCounter)
			return "pcolumn_" + suffix
		},
		// Returns "pframe_<n>".
		_newUniquePFrameName: func() {
			pframeCounter = pframeCounter + 1
			suffix := string(pframeCounter)
			return "pframe_" + suffix
		},
		// Records `name` in the list that run() passes on as `outFiles`.
		_saveFile: func(name) {
			outFiles = append(outFiles, name)
		},
		// Records `name` in the list that run() passes on as `outFilesContent`.
		_saveFileContent: func(name) {
			outContentFiles = append(outContentFiles, name)
		},
		// Registers an output frame under `name`, storing its column info and
		// the name it is renamed to. A name can be registered only once.
		_saveFrameInfo: func(name, rename, colInfo) {
			alreadySaved := !is_undefined(outFramesInfo[name])
			ll.assert(
				!alreadySaved,
				"frame with name '%s' already saved, please pick a different name",
				name
			)

			record := { colInfo: colInfo, rename: rename }
			outFramesInfo[name] = record
		},




		// Queue selectors: each sets the queue that run() forwards as the
		// `queue` meta input and returns the workflow for chaining.

		// Selects execConstants.HEAVY_QUEUE.
		inHeavyQueue: func() {
			queue = execConstants.HEAVY_QUEUE
			return self
		},

		// Selects execConstants.MEDIUM_QUEUE (also the initial value).
		inMediumQueue: func() {
			queue = execConstants.MEDIUM_QUEUE
			return self
		},

		// Selects execConstants.LIGHT_QUEUE.
		inLightQueue: func() {
			queue = execConstants.LIGHT_QUEUE
			return self
		},

		// Selects execConstants.UI_TASKS_QUEUE.
		inUiQueue: func() {
			queue = execConstants.UI_TASKS_QUEUE
			return self
		},






		// Stores `amount` as the value run() forwards as the `cpu` meta input.
		// The value is not validated here. Returns the workflow for chaining.
		cpu: func(amount) {
			cpu = amount
			return self
		},












		// Stores `amount` as the value run() forwards as the `mem` meta input.
		// The value is not validated here. Returns the workflow for chaining.
		mem: func(amount) {
			mem = amount
			return self
		},










		// Stores the input cache time that run() forwards as the `inputCache`
		// meta input. `time` must be an integer. Returns the workflow for
		// chaining.
		cacheInputs: func(time) {
			message := "input cache time must be an integer. " +
				"Did you forget to import a standard tengo library 'times'?"
			ll.assert(is_int(time), message)

			inputCache = time
			return self
		},







		// Renders the workflow-run template with everything collected so far and
		// returns an object giving access to the requested outputs.
		//
		// The returned object exposes:
		//   getFile(fileName)        - output resolved under "outFiles"
		//   getFileContent(fileName) - output resolved under "outFilesContent"
		//   getFrameDirect(name)     - PFrame rebuilt from the columns of a frame
		//                              registered through _saveFrameInfo
		run: func() {
			workflowRunTpl := assets.importTemplate("@platforma-sdk/workflow-tengo:pt.workflow-run")

			// Collect the specs of all input frame columns for the distiller.
			rawColumnSpecs := []
			for _, frame in inFrames {
				for _, columnInfo in frame.columns {
					rawColumnSpecs = append(rawColumnSpecs, columnInfo.spec)
				}
			}
			specDistiller := pframesSpec.createSpecDistiller(rawColumnSpecs)

			// Group distinct distilled axis specs by axis name. `canonicalMap`
			// tracks which canonical encodings were already seen per name, so
			// each distinct axis spec is listed once.
			canonicalMap := {}
			distilledAxesSpecs := {}
			maps.forEach(inFrames, func(_, frame) {
				maps.forEach(frame.columns, func(_, columnInfo) {
					for axis in columnInfo.spec.axesSpec {
						distilledAxis := specDistiller.distill(axis)
						if is_undefined(canonicalMap[distilledAxis.name]) {
							canonicalMap[distilledAxis.name] = {}
						}
						canonicalizedAxisSpec := canonical.encode(distilledAxis)
						if is_undefined(canonicalMap[distilledAxis.name][canonicalizedAxisSpec]) {
							canonicalMap[distilledAxis.name][canonicalizedAxisSpec] = true
							if is_undefined(distilledAxesSpecs[distilledAxis.name]) {
								distilledAxesSpecs[distilledAxis.name] = []
							}
							distilledAxesSpecs[distilledAxis.name] = append(distilledAxesSpecs[distilledAxis.name], distilledAxis)
						}
					}
				})
			})

			// Context handed to every step builder.
			ctx := {
				specDistiller: specDistiller,
				distilledAxesSpecs: distilledAxesSpecs
			}
			// Materialise the steps by calling each builder with the context.
			workflowJsonStruct := {
				workflow: slices.map(steps, func(step) {
					return step(ctx)
				})
			}

			// Split input files into referenced files and inline contents,
			// both keyed by file name.
			inFilesMap := {}
			inFilesContentMap := {}
			for inFile in inFiles {
				if !is_undefined(inFile.file) {
					inFilesMap[inFile.name] = inFile.file
				} else if !is_undefined(inFile.content) {
					inFilesContentMap[inFile.name] = inFile.content
				} else {
					ll.panic("inFile must have either file or content, got: %v", inFile)
				}
			}

			// Output frames: renamed frame name -> list of its column ids.
			outFrames := {}
			for _, frameInfo in outFramesInfo {
				columnIds := maps.getKeys(frameInfo.colInfo)
				outFrames[frameInfo.rename] = columnIds
			}

			// Input frames with every column spec passed through the distiller.
			inFramesDistilled := maps.mapValues(inFrames, func(frame) {
				return {
					columns: maps.mapValues(frame.columns, func(columnInfo) {
						return {
							spec: specDistiller.distill(columnInfo.spec),
							data: columnInfo.data
						}
					})
				}
			})

			runInputs := {
				workflowJsonStruct: workflowJsonStruct,
				inFiles: inFilesMap,
				inFilesContent: inFilesContentMap,
				inFrames: inFramesDistilled,
				outFiles: outFiles,
				outFilesContent: outContentFiles,
				outFrames: outFrames
			}
			runMetaInputs := {
				queue: queue,
				cpu: cpu,
				mem: mem,
				inputCache: inputCache
			}
			ptResult := render.create(workflowRunTpl, runInputs, {
				metaInputs: runMetaInputs
			});

			// Note: this `self` is local to run() and shadows the workflow's
			// `self`; it is the result object returned to the caller.
			self := undefined
			self = ll.toStrict({
				// Resolves the output stored under ["outFiles", fileName].
				getFile: func(fileName) {
					return ptResult.resolveOutput(["outFiles", fileName])
				},

				// Resolves the output stored under ["outFilesContent", fileName].
				getFileContent: func(fileName) {
					return ptResult.resolveOutput(["outFilesContent", fileName])
				},

				// Builds a PFrame for a frame registered under `frameName`:
				// every column's data is resolved from the "outFrames" output
				// (under the frame's renamed name) and added with the id and
				// spec recorded in its column info.
				getFrameDirect: func(frameName) {
					frameInfo := outFramesInfo[frameName]
					ll.assert(!is_undefined(frameInfo), "saveFrameDirect was not called for frame '%s'", frameName)

					pf := pframesBuilder.pFrameBuilder()
					for columnId, columnInfo in frameInfo.colInfo {
						columnData := ptResult.resolveOutput(["outFrames", frameInfo.rename, columnId])
						pf.add(columnInfo.id, columnInfo.spec, columnData)
					}
					return pf.build()
				}
			})
			return self
		}
	}

    return ll.toStrict(self)
}

// Creates the object returned by DataFrame.groupBy.
//
// parentWorkflow     - workflow that receives the aggregation step
// dfName             - name of the dataframe being grouped
// groupByExpressions - grouping keys, converted at run time through
//                      exp._mapToExpressionStructList
//
// The object has a single method, `agg(...aggExpressions)`, which needs at
// least one aggregation expression, registers an "aggregate" step and returns
// a DataFrame for the step's output table. Arguments that are not aggregation
// expressions are rejected when the step is built.
_newDataFrameGroupBy = func(parentWorkflow, dfName, groupByExpressions) {
	return ll.toStrict({
		agg: func(...aggExpressions) {
			ll.assert(len(aggExpressions) > 0, "agg method requires at least one aggregation expression.")

			resultName := parentWorkflow._newAnonymousDataFrameId()

			buildStep := func(ctx) {
				compiled := slices.map(aggExpressions, func(candidate) {
					if exp._isAggregation(candidate) {
						return candidate._getAggregation(ctx)
					}
					ll.panic("Invalid argument to agg: Expected an aggregation expression object, got %v", candidate)
				})

				return {
					type: "aggregate",
					inputTable: dfName,
					outputTable: resultName,
					groupBy: exp._mapToExpressionStructList(groupByExpressions, "col", ctx),
					aggregations: compiled
				}
			}
			parentWorkflow._addStep(buildStep)

			return _newDataFrame(parentWorkflow, resultName)
		}
	})
}

_newDataFrame = func(parentWorkflow, dfName) {
	// Converts expression objects into their step representation.
	// `items` must be non-empty and contain only expression objects;
	// `methodName` is used in error messages and `ctx` is passed to each
	// expression's _getExpression.
	_mapExprsToStepCols := func(items, methodName, ctx) {
		ll.assert(len(items) > 0, methodName + " requires at least one expression argument.")

		result := []
		for candidate in items {
			if !exp._isExpression(candidate) {
				ll.panic("Invalid argument to " + methodName + ": Expected an expression/column object, got %v", candidate)
			}
			result = append(result, candidate._getExpression(ctx))
		}
		return result
	}

	// Same as _mapExprsToStepCols, but a plain string is also accepted and is
	// treated as a column name (wrapped with exp.col).
	_mapExprsToStepColsAllowingStrings := func(items, methodName, ctx) {
		ll.assert(len(items) > 0, methodName + " requires at least one expression argument.")

		result := []
		for candidate in items {
			expression := candidate
			if is_string(candidate) {
				expression = exp.col(candidate)
			} else if !exp._isExpression(candidate) {
				ll.panic("Invalid argument to " + methodName + ": Expected an expression/column object or string column name, got %v", candidate)
			}
			result = append(result, expression._getExpression(ctx))
		}
		return result
	}

	// Adds a write step for this dataframe to the parent workflow.
	//
	// outputFile - name of the file to write
	// options    - optional; the first element is used as an options map:
	//     format  - one of csv, tsv, parquet, ndjson
	//     xsvType - used only when `format` is absent
	//     columns - copied to the step as is when defined
	//
	// When neither `format` nor `xsvType` is given, the format is detected from
	// the extension of `outputFile`. Panics when no format can be determined.
	_addSaveStep := func(outputFile, ...options) {
		opts := {}
		if len(options) > 0 {
			opts = options[0]
		}

		finalFormat := undefined

		if !is_undefined(opts.format) {
			validation.assertType(opts.format, "string,regex=csv|tsv|parquet|ndjson",
				"format must be one of [csv, tsv, parquet, ndjson], found: %v", opts.format)
			finalFormat = opts.format
		} else if !is_undefined(opts.xsvType) {
			pframesUtil.validateXsvType(opts.xsvType)
			finalFormat = opts.xsvType
		} else {
			detectedFormat := _detectFormatFromFileName(outputFile)
			if !is_undefined(detectedFormat) {
				finalFormat = detectedFormat
			}
		}

		// The extension list mirrors what _detectFormatFromFileName recognises.
		if is_undefined(finalFormat) {
			ll.panic("Unable to determine output format from file extension. Please specify 'format' or 'xsvType' option, or use a file name with a recognizable extension (.csv, .tsv, .ndjson, .jsonl, .parquet)")
		}

		step := undefined
		if finalFormat == "parquet" {
			step = {
				type: "write_parquet",
				table: dfName,
				file: outputFile
			}
		} else if finalFormat == "ndjson" {
			step = {
				type: "write_ndjson",
				table: dfName,
				file: outputFile
			}
		} else { // csv or tsv
			delimiter := undefined
			if finalFormat == "csv" {
				delimiter = ","
			} else if finalFormat == "tsv" {
				delimiter = "\t"
			}

			step = {
				type: "write_csv",
				table: dfName,
				file: outputFile,
				delimiter: delimiter
			}
		}

		if !is_undefined(opts.columns) {
			step.columns = opts.columns
		}

		parentWorkflow.addRawStep(step)
	}

	self := undefined

	self = ll.toStrict({
		// Returns the workflow this dataframe belongs to.
		_getWorkflow: func() {
			return parentWorkflow
		},
		// Returns the name under which this dataframe is referenced in steps.
		_getDfName: func() {
			return dfName
		},














		// Registers a "with_columns" step that reads this dataframe and writes
		// a new one; returns a DataFrame for the new table. The expressions are
		// converted (and validated) when the step is built.
		withColumns: func(...expressions) {
			resultName := parentWorkflow._newAnonymousDataFrameId()

			buildStep := func(ctx) {
				return {
					type: "with_columns",
					inputTable: dfName,
					outputTable: resultName,
					columns: _mapExprsToStepCols(expressions, "withColumns", ctx)
				}
			}
			parentWorkflow._addStep(buildStep)

			return _newDataFrame(parentWorkflow, resultName)
		},















		// Registers a "select" step that reads this dataframe and writes a new
		// one; returns a DataFrame for the new table. Arguments may be
		// expression objects or column-name strings and are converted (and
		// validated) when the step is built.
		select: func(...expressions) {
			resultName := parentWorkflow._newAnonymousDataFrameId()

			buildStep := func(ctx) {
				return {
					type: "select",
					inputTable: dfName,
					outputTable: resultName,
					columns: _mapExprsToStepColsAllowingStrings(expressions, "select", ctx)
				}
			}
			parentWorkflow._addStep(buildStep)

			return _newDataFrame(parentWorkflow, resultName)
		},






















		// Registers a "unique" step and returns a DataFrame for its output.
		//
		// The first optional argument is an options map:
		//   subset        - a selector (converted through _getExpression) or any
		//                   other value, which is copied as is
		//   keep          - 'any', 'none', 'first' or 'last'
		//   maintainOrder - copied as is
		// Only options that are defined are written to the step.
		unique: func(...options) {
			opts := len(options) > 0 ? options[0] : {}

			keepMode := opts.keep
			if !is_undefined(keepMode) {
				recognized := false
				for allowed in ["any", "none", "first", "last"] {
					if keepMode == allowed {
						recognized = true
					}
				}
				if !recognized {
					ll.panic("keep must be 'any', 'none', 'first', or 'last'. Got: %v", opts.keep)
				}
			}

			resultName := parentWorkflow._newAnonymousDataFrameId()
			parentWorkflow._addStep(func(ctx) {
				uniqueStep := {
					type: "unique",
					inputTable: dfName,
					outputTable: resultName
				}

				// Options are read when the step is built.
				subset := opts.subset
				if !is_undefined(subset) {
					uniqueStep.subset = exp._isSelector(subset) ? subset._getExpression(ctx) : subset
				}
				if !is_undefined(opts.keep) {
					uniqueStep.keep = opts.keep
				}
				if !is_undefined(opts.maintainOrder) {
					uniqueStep.maintainOrder = opts.maintainOrder
				}
				return uniqueStep
			})

			return _newDataFrame(parentWorkflow, resultName)
		},














		// Registers an "add_columns" step targeting this dataframe's own table
		// and returns this same DataFrame. The expressions are converted (and
		// validated) when the step is built.
		addColumns: func(...expressions) {
			buildStep := func(ctx) {
				return {
					type: "add_columns",
					table: dfName,
					columns: _mapExprsToStepCols(expressions, "addColumns", ctx)
				}
			}
			parentWorkflow._addStep(buildStep)

			return self
		},



























		// Adds a write step for `outputFile` (see _addSaveStep for the options)
		// and registers the file through the workflow's _saveFile.
		// Returns this DataFrame for chaining.
		save: func(outputFile, ...options) {
			_addSaveStep(outputFile, options...)

			parentWorkflow._saveFile(outputFile)

			return self
		},
























		// Adds a write step for `outputFile` (see _addSaveStep for the options)
		// and registers the file through the workflow's _saveFileContent.
		// Returns this DataFrame for chaining.
		saveContent: func(outputFile, ...options) {
			_addSaveStep(outputFile, options...)

			parentWorkflow._saveFileContent(outputFile)

			return self
		},















		// Saves this dataframe as a frame named `frameName`.
		//
		// `params` is checked by util.validateParamsForSaveFrameDirect; the
		// `column` of every element of `params.axes` and then `params.columns`
		// is projected under a sequential alias ("col1", "col2", ...). The
		// frame is registered with the workflow under a generated internal name
		// and a write step (util.makeWriteFrameStep) is added for the projected
		// table. Returns this DataFrame for chaining.
		saveFrameDirect: func(frameName, params) {
			ll.assert(len(frameName) > 0, "frameName must be a non-empty string")
			util.validateParamsForSaveFrameDirect(params)

			aliasBySource := {}
			selected := []
			// One alias per call; the number is the position in `selected`.
			registerColumn := func(sourceColumn) {
				alias := "col" + string(len(selected) + 1)
				aliasBySource[sourceColumn] = alias
				selected = append(selected, exp.col(sourceColumn).alias(alias))
			}
			for axisParam in params.axes {
				registerColumn(axisParam.column)
			}
			for columnParam in params.columns {
				registerColumn(columnParam.column)
			}
			projected := self.select(selected...)

			columnInfo := util.makeFrameColumnInfo(params, aliasBySource)
			internalName := parentWorkflow._newUniquePFrameName()
			parentWorkflow._saveFrameInfo(frameName, internalName, columnInfo)

			writeStep := util.makeWriteFrameStep(internalName, projected._getDfName(), params, aliasBySource)
			parentWorkflow.addRawStep(writeStep)

			return self
		},











		// Starts a grouped aggregation. Needs at least one key; every key must
		// be a column-name string or an expression object. Returns the object
		// created by _newDataFrameGroupBy.
		groupBy: func(...expressions) {
			ll.assert(len(expressions) > 0, "groupBy requires at least one expression argument.")

			for key in expressions {
				acceptable := is_string(key) || exp._isExpression(key)
				if !acceptable {
					ll.panic("Invalid argument to groupBy: Expected a column name (string) or an expression object, got %v", key)
				}
			}

			return _newDataFrameGroupBy(parentWorkflow, dfName, expressions)
		},













		// Registers a "filter" step and returns a DataFrame for its output.
		// Needs at least one predicate; every predicate must be an expression
		// object. Several predicates are combined with exp.and.
		filter: func(...predicates) {
			ll.assert(len(predicates) > 0, "filter method requires at least one predicate expression.")

			for predicate in predicates {
				if !exp._isExpression(predicate) {
					ll.panic("Invalid argument in filter: Expected an expression object, got %v", predicate)
				}
			}

			combined := len(predicates) == 1 ? predicates[0] : exp.and(predicates...)

			resultName := parentWorkflow._newAnonymousDataFrameId()
			buildStep := func(ctx) {
				return {
					type: "filter",
					inputTable: dfName,
					outputTable: resultName,
					condition: combined._getExpression(ctx)
				}
			}
			parentWorkflow._addStep(buildStep)

			return _newDataFrame(parentWorkflow, resultName)
		},









		// Registers a "limit" step keeping `nRows` rows and returns a DataFrame
		// for its output. `nRows` must be an integer greater than zero.
		limit: func(nRows) {
			// The check requires an int, so the message says "integer".
			ll.assert(is_int(nRows) && nRows > 0, "limit argument must be a positive integer.")

			outputDfName := parentWorkflow._newAnonymousDataFrameId()
			parentWorkflow.addRawStep({
				type: "limit",
				inputTable: dfName,
				outputTable: outputDfName,
				n: nRows
			})
			return _newDataFrame(parentWorkflow, outputDfName)
		},















		// Registers a "slice" step with the given `offset` and `length` and
		// returns a DataFrame for its output. Both arguments must be
		// non-negative integers.
		slice: func(offset, length) {
			ll.assert(is_int(offset) && offset >= 0, "slice offset must be a non-negative integer.")
			ll.assert(is_int(length) && length >= 0, "slice length must be a non-negative integer.")

			resultName := parentWorkflow._newAnonymousDataFrameId()
			sliceStep := {
				type: "slice",
				inputTable: dfName,
				outputTable: resultName,
				offset: offset,
				length: length
			}
			parentWorkflow.addRawStep(sliceStep)

			return _newDataFrame(parentWorkflow, resultName)
		},

























		// sort registers a "sort" step on the parent workflow and returns a
		// DataFrame handle bound to the step's output table.
		//
		// @param by - non-empty array of sort keys; each key is either a column
		//   name (string) or an expression object (as recognised by exp._isExpression).
		// @param optionsRaw - at most one options map with optional fields:
		//   - descending: boolean, or array of booleans with one entry per key
		//   - nulls_last: boolean, or array of booleans with one entry per key
		// @returns a new DataFrame created via _newDataFrame for the output table.
		//
		// The shape of the arguments is checked immediately; individual keys and
		// the elements of per-key flag arrays are checked when the step builder
		// passed to _addStep is invoked.
		sort: func(by, ...optionsRaw) {
			ll.assert(is_array(by), "First argument to sort must be an array of sort keys (column names or expressions).")
			ll.assert(len(by) > 0, "Sort keys array cannot be empty.")

			sortOpts := {}
			if len(optionsRaw) > 0 {
				ll.assert(len(optionsRaw) == 1, "sort expects at most one options map argument after the sort keys array.")
				if !is_map(optionsRaw[0]) {
					ll.panic("Second argument to sort (if provided) must be an options map. Got: %T", optionsRaw[0])
				}
				sortOpts = optionsRaw[0]
			}

			// 'descending': either absent, a single boolean, or one boolean per key.
			descFlags := sortOpts.descending
			if !is_undefined(descFlags) {
				if is_array(descFlags) {
					if len(descFlags) != len(by) {
						ll.panic("sort 'descending' array length (%d) must match the number of sort keys (%d).", len(descFlags), len(by))
					}
				} else if !is_bool(descFlags) {
					ll.panic("sort 'descending' option must be a boolean or an array of booleans. Got: %T", descFlags)
				}
			}

			// 'nulls_last': same accepted shapes as 'descending'.
			nullsFlags := sortOpts.nulls_last
			if !is_undefined(nullsFlags) {
				if is_array(nullsFlags) {
					if len(nullsFlags) != len(by) {
						ll.panic("sort 'nulls_last' array length (%d) must match the number of sort keys (%d).", len(nullsFlags), len(by))
					}
				} else if !is_bool(nullsFlags) {
					ll.panic("sort 'nulls_last' option must be a boolean or an array of booleans. Got: %T", nullsFlags)
				}
			}

			sortedDfName := parentWorkflow._newAnonymousDataFrameId()

			parentWorkflow._addStep(func(ctx) {
				directives := []
				for idx, key in by {
					// Resolve the key into an expression using the step context.
					keyExpr := undefined
					if is_string(key) {
						keyExpr = exp.col(key)._getExpression(ctx)
					} else if exp._isExpression(key) {
						keyExpr = key._getExpression(ctx)
					} else {
						ll.panic("Invalid sort key at index %d in array: Expected a column name (string) or an expression object, got %T", idx, key)
					}
					entry := { value: keyExpr }

					// Pick the per-key flag from the array form, or reuse the scalar form.
					descFlag := is_array(descFlags) ? descFlags[idx] : descFlags
					if is_array(descFlags) {
						ll.assert(is_bool(descFlag), "Elements of 'descending' array (at index %d) must be booleans. Got: %T", idx, descFlag)
					}
					// 'descending' is emitted only when true; an absent field is left to the consumer's default.
					if is_bool(descFlag) && descFlag {
						entry.descending = true
					}

					nullsFlag := is_array(nullsFlags) ? nullsFlags[idx] : nullsFlags
					if is_array(nullsFlags) {
						ll.assert(is_bool(nullsFlag), "Elements of 'nulls_last' array (at index %d) must be booleans. Got: %T", idx, nullsFlag)
					}
					// 'nullsLast' is emitted only when explicitly provided (true or false).
					if is_bool(nullsFlag) {
						entry.nullsLast = nullsFlag
					}

					directives = append(directives, entry)
				}

				return {
					type: "sort",
					inputTable: dfName,
					outputTable: sortedDfName,
					by: directives
				}
			})
			return _newDataFrame(parentWorkflow, sortedDfName)
		},

































		// join registers a raw "join" step that combines this DataFrame (left
		// table) with rightDf (right table) and returns a DataFrame handle bound
		// to the step's output table.
		//
		// @param rightDf - DataFrame object from the same workflow as this one.
		// @param opts - options map:
		//   - how: one of "inner" (default), "left", "right", "full", "cross"
		//   - on: string or array of strings; used for both sides
		//   - leftOn / rightOn: string or array of strings; must be given together
		//     and have equal length
		//   - coalesce: boolean; not allowed for "cross"
		//   - leftColumns / rightColumns: array whose items are either a column
		//     name or a map { column: string, rename?: string }
		// @returns a new DataFrame created via _newDataFrame for the output table.
		//
		// For "cross" joins no key option may be given; for every other kind
		// either `on` or both `leftOn` and `rightOn` are required.
		join: func(rightDf, opts) {
			// is_map(undefined) is false, so this also rejects a missing argument.
			if !is_map(opts) {
				ll.panic("join options must be a map argument.")
			}
			looksLikeDataFrame := !is_undefined(rightDf) && !is_undefined(rightDf._getDfName) && !is_undefined(rightDf._getWorkflow)
			if !looksLikeDataFrame {
				ll.panic("rightDf argument for join must be a DataFrame object created by pt.workflow().frame().")
			}
			rightWorkflow := rightDf._getWorkflow()
			if rightWorkflow._wfId != parentWorkflow._wfId {
				ll.panic("Both DataFrames in a join operation must belong to the same workflow.")
			}

			// Normalises a key option to an array of strings; undefined stays undefined.
			normalizeKeyList := func(raw, optName) {
				if is_undefined(raw) {
					return undefined
				}
				if is_string(raw) {
					return [raw]
				}
				if !is_array(raw) {
					ll.panic("'%s' option must be a string or an array of strings. Got: %T", optName, raw)
				}
				for idx, key in raw {
					if !is_string(key) {
						ll.panic("'%s' option: if an array, all elements must be strings. Found type %T at index %d.", optName, key, idx)
					}
				}
				return raw
			}

			// Normalises a column-selection option to an array of
			// { column, rename? } maps; undefined or empty input yields undefined
			// so that the field is omitted from the step.
			normalizeColumnMappings := func(rawMappings, optName) {
				if is_undefined(rawMappings) {
					return undefined
				}
				if !is_array(rawMappings) {
					ll.panic("'%s' option must be an array of column mappings or strings. Got: %T", optName, rawMappings)
				}
				normalized := []
				for idx, raw in rawMappings {
					if is_string(raw) {
						normalized = append(normalized, { column: raw })
						continue
					}
					if !is_map(raw) {
						ll.panic("Each element in '%s' at index %d must be a string or a map. Got: %T", optName, idx, raw)
					}
					// is_string(undefined) is false, so a missing 'column' is rejected here too.
					if !is_string(raw.column) {
						ll.panic("Each map mapping in '%s' at index %d must have a 'column' string field. Got: %v", optName, idx, raw.column)
					}
					mapping := { column: raw.column }
					if !is_undefined(raw.rename) {
						if !is_string(raw.rename) {
							ll.panic("The 'rename' field in '%s' mapping at index %d must be a string. Got: %T", optName, idx, raw.rename)
						}
						mapping.rename = raw.rename
					}
					normalized = append(normalized, mapping)
				}
				return len(normalized) == 0 ? undefined : normalized
			}

			joinKind := is_undefined(opts.how) ? "inner" : opts.how
			supportedKinds := ["inner", "left", "right", "full", "cross"]
			if !slices.hasElement(supportedKinds, joinKind) {
				ll.panic("Invalid 'how' value for join: '%s'. Must be one of %v", joinKind, supportedKinds)
			}

			sharedKeys := normalizeKeyList(opts.on, "on")
			leftKeys := normalizeKeyList(opts.leftOn, "leftOn")
			rightKeys := normalizeKeyList(opts.rightOn, "rightOn")
			hasShared := !is_undefined(sharedKeys)
			hasLeft := !is_undefined(leftKeys)
			hasRight := !is_undefined(rightKeys)
			isCross := joinKind == "cross"

			joinedDfName := parentWorkflow._newAnonymousDataFrameId()
			step := {
				type: "join",
				leftTable: dfName,
				rightTable: rightDf._getDfName(),
				outputTable: joinedDfName,
				how: joinKind
			}

			// Key columns: forbidden for cross joins, mandatory for all other kinds.
			if isCross {
				if hasShared || hasLeft || hasRight {
					ll.panic("'on', 'leftOn', 'rightOn' options cannot be used with 'cross' join.")
				}
			} else if hasShared {
				if hasLeft || hasRight {
					ll.panic("For '%s' join, you must specify either 'on' option or both 'leftOn' and 'rightOn' options. 'on' was provided, but so was 'leftOn' or 'rightOn'.", joinKind)
				}
				ll.assert(len(sharedKeys) > 0, "'on' option, if provided, cannot be an empty list.")
				step.leftOn = sharedKeys
				step.rightOn = sharedKeys
			} else if hasLeft && hasRight {
				ll.assert(len(leftKeys) > 0, "'leftOn' option, if provided, cannot be an empty list.")
				ll.assert(len(rightKeys) > 0, "'rightOn' option, if provided, cannot be an empty list.")
				if len(leftKeys) != len(rightKeys) {
					ll.panic("'leftOn' and 'rightOn' lists must have the same number of columns. Got %d and %d.", len(leftKeys), len(rightKeys))
				}
				step.leftOn = leftKeys
				step.rightOn = rightKeys
			} else {
				ll.panic("For '%s' join, you must specify either 'on' option or both 'leftOn' and 'rightOn' options.", joinKind)
			}

			// 'coalesce' is checked after the key options, for every join kind.
			if !is_undefined(opts.coalesce) {
				if isCross {
					ll.panic("'coalesce' option cannot be used with 'cross' join.")
				}
				if !is_bool(opts.coalesce) {
					ll.panic("'coalesce' option must be a boolean. Got: %T", opts.coalesce)
				}
				step.coalesce = opts.coalesce
			}

			leftMappings := normalizeColumnMappings(opts.leftColumns, "leftColumns")
			if !is_undefined(leftMappings) {
				step.leftColumns = leftMappings
			}

			rightMappings := normalizeColumnMappings(opts.rightColumns, "rightColumns")
			if !is_undefined(rightMappings) {
				step.rightColumns = rightMappings
			}

			parentWorkflow.addRawStep(step)
			return _newDataFrame(parentWorkflow, joinedDfName)
		},









		// withoutColumns registers a raw "without_columns" step listing the given
		// column names and returns a DataFrame handle bound to the step's output
		// table.
		//
		// @param columns - one or more column names, passed as separate
		//   (variadic) string arguments.
		// @returns a new DataFrame created via _newDataFrame for the output table.
		// Fails (ll.assert) when no names are given or any argument is not a string.
		withoutColumns: func(...columns) {
			ll.assert(is_array(columns), "Argument to withoutColumns must be an array of column names.")
			ll.assert(len(columns) > 0, "Columns array for withoutColumns cannot be empty.")
			for idx := 0; idx < len(columns); idx++ {
				name := columns[idx]
				ll.assert(is_string(name), "Each element in the columns array for withoutColumns (at index %d) must be a string. Got: %T", idx, name)
			}

			prunedDfName := parentWorkflow._newAnonymousDataFrameId()
			dropStep := {
				type: "without_columns",
				inputTable: dfName,
				outputTable: prunedDfName,
				columns: columns
			}
			parentWorkflow.addRawStep(dropStep)
			return _newDataFrame(parentWorkflow, prunedDfName)
		}
	})
	return self
}























// concat registers a raw "concatenate" step over the tables of the given
// DataFrames and returns a DataFrame handle bound to the step's output table.
//
// @param dataframes - non-empty array of DataFrame objects that all belong to
//   the same workflow (compared by `_wfId`).
// @param optionsRaw - at most one options map with an optional field:
//   - columns: array of strings, forwarded to the step as `columns`
// @returns a new DataFrame created via _newDataFrame on the shared workflow.
concat := func(dataframes, ...optionsRaw) {
	ll.assert(is_array(dataframes), "First argument to concat must be an array of DataFrame objects.")
	ll.assert(len(dataframes) > 0, "DataFrame array for concat cannot be empty.")

	settings := {}
	if len(optionsRaw) > 0 {
		if len(optionsRaw) != 1 || !is_map(optionsRaw[0]) {
			ll.panic("concat options must be a single map argument")
		}
		settings = optionsRaw[0]
	}

	// The workflow of the first frame becomes the reference every other frame must match.
	sharedWorkflow := undefined
	tableNames := []
	for idx, frame in dataframes {
		isFrame := !is_undefined(frame) && !is_undefined(frame._getDfName) && !is_undefined(frame._getWorkflow)
		if !isFrame {
			ll.panic("Argument at index %d in dataframes array is not a valid DataFrame object.", idx)
		}
		frameWorkflow := frame._getWorkflow()
		if is_undefined(sharedWorkflow) {
			sharedWorkflow = frameWorkflow
		} else if frameWorkflow._wfId != sharedWorkflow._wfId {
			ll.panic("All DataFrames in a concat operation must belong to the same workflow.")
		}
		tableNames = append(tableNames, frame._getDfName())
	}

	resultDfName := sharedWorkflow._newAnonymousDataFrameId()

	step := {
		type: "concatenate",
		inputTables: tableNames,
		outputTable: resultDfName
	}

	// The optional column list is validated and attached only when provided.
	selectedColumns := settings.columns
	if !is_undefined(selectedColumns) {
		if !is_array(selectedColumns) {
			ll.panic("'columns' option for concat must be an array of strings.")
		}
		for idx, name in selectedColumns {
			if !is_string(name) {
				ll.panic("Each element in 'columns' option array (at index %d) must be a string. Got: %T", idx, name)
			}
		}
		step.columns = selectedColumns
	}

	sharedWorkflow.addRawStep(step)
	return _newDataFrame(sharedWorkflow, resultDfName)
}






























// treeJoin reduces an array of DataFrames to a single DataFrame by joining
// them pairwise in rounds. In each round neighbouring frames are combined with
// `join` using { on: opts.on, how: "full", coalesce: true }; when a round has
// an odd number of frames the last one is carried over unchanged. Rounds
// repeat until one frame remains.
//
// @param dataframes - non-empty array of DataFrame objects that all belong to
//   the same workflow (compared by `_wfId`).
// @param opts - options map with a required field:
//   - on: string, the key column name passed to every pairwise join
// @returns the single remaining DataFrame; for a one-element input this is
//   that element itself (no join step is added).
treeJoin := func(dataframes, opts) {
	ll.assert(is_array(dataframes), "First argument to treeJoin must be an array of DataFrame objects.")
	ll.assert(len(dataframes) > 0, "DataFrame array for treeJoin cannot be empty.")

	if !is_map(opts) || is_undefined(opts.on) {
		ll.panic("treeJoin requires options with 'on' field")
	}
	if !is_string(opts.on) {
		ll.panic("'on' option must be a string")
	}

	// Validate every element BEFORE the single-frame shortcut below, so that a
	// one-element array holding something that is not a DataFrame is rejected
	// here instead of being handed back to the caller unchecked.
	parentWorkflow := undefined
	for i, df in dataframes {
		if is_undefined(df) || is_undefined(df._getDfName) || is_undefined(df._getWorkflow) {
			ll.panic("Argument at index %d in dataframes array is not a valid DataFrame object.", i)
		}
		currentWorkflow := df._getWorkflow()
		if is_undefined(parentWorkflow) {
			parentWorkflow = currentWorkflow
		} else if currentWorkflow._wfId != parentWorkflow._wfId {
			ll.panic("All DataFrames in a treeJoin operation must belong to the same workflow.")
		}
	}

	// Nothing to join: a single (now validated) frame is returned as is.
	if len(dataframes) == 1 {
		return dataframes[0]
	}

	// `pfs` is rebound to a fresh array every round; the caller's array is never modified.
	pfs := dataframes

	for len(pfs) > 1 {
		nextPfs := []

		// Join neighbours (0,1), (2,3), ...
		for i := 1; i < len(pfs); i += 2 {
			pf1 := pfs[i - 1]
			pf2 := pfs[i]
			joined := pf1.join(pf2, { on: opts.on, how: "full", coalesce: true })
			nextPfs = append(nextPfs, joined)
		}

		// An unpaired trailing frame moves on to the next round untouched.
		if len(pfs) % 2 == 1 {
			nextPfs = append(nextPfs, pfs[len(pfs) - 1])
		}
		pfs = nextPfs
	}

	ll.assert(len(pfs) == 1, "Expected exactly one data frame, got " + string(len(pfs)))
	return pfs[0]
}

// Public API of this module. The export map is wrapped with ll.toStrict
// (NOTE(review): presumably so that access to a missing field fails loudly -
// confirm against the ll module).
export ll.toStrict({
	// `p` and `workflow` are defined outside this part of the file.
	p: p,
	workflow: workflow,
	// Multi-frame helpers defined in this file.
	concat: concat,
	treeJoin: treeJoin,

	// Expression helpers re-exported as-is from the pt.expression module (imported as `exp`).
	axis: exp.axis,
	col: exp.col,
	lit: exp.lit,
	concatStr: exp.concatStr,
	minHorizontal: exp.minHorizontal,
	maxHorizontal: exp.maxHorizontal,
	allHorizontal: exp.allHorizontal,
	anyHorizontal: exp.anyHorizontal,
	and: exp.and,
	or: exp.or,
	rank: exp.rank,
	when: exp.when,
	rawExp: exp.rawExp,
	sc: exp.sc
})
