import { HUB_URL } from "../consts";
import { HubApiError, createApiError, InvalidApiResponseFormatError } from "../error";
import type {
	ApiBucketBatchResponse,
	ApiCommitHeader,
	ApiCommitLfsFile,
	ApiCommitOperation,
	ApiLfsBatchRequest,
	ApiLfsBatchResponse,
	ApiLfsCompleteMultipartRequest,
	ApiPreuploadRequest,
	ApiPreuploadResponse,
} from "../types/api/api-commit";
import type { CredentialsParams, RepoDesignation } from "../types/public";
import { checkCredentials } from "../utils/checkCredentials";
import { chunk } from "../utils/chunk";
import { promisesQueue } from "../utils/promisesQueue";
import { promisesQueueStreaming } from "../utils/promisesQueueStreaming";
import { sha256 } from "../utils/sha256";
import { toRepoId } from "../utils/toRepoId";
import { WebBlob } from "../utils/WebBlob";
import { eventToGenerator } from "../utils/eventToGenerator";
import { base64FromBytes } from "../utils/base64FromBytes";
import { isFrontend } from "../utils/isFrontend";
import { createBlobs } from "../utils/createBlobs";
import type { XetTokenParams } from "../utils/uploadShards";
import { uploadShards } from "../utils/uploadShards";
import { splitAsyncGenerator } from "../utils/splitAsyncGenerator";
import { SplicedBlob } from "../utils/SplicedBlob";

/** Limit passed to `promisesQueue` when computing the SHA-256 of LFS files. */
const CONCURRENT_SHAS = 5;
/** Limit passed to `promisesQueueStreaming` when uploading LFS objects (non-xet path). */
const CONCURRENT_LFS_UPLOADS = 5;
/** Limit passed to `promisesQueueStreaming` when uploading the parts of a single multipart LFS file. */
const MULTIPART_PARALLEL_UPLOAD = 5;

/**
 * Operation removing the entry located at `path`.
 *
 * Serialized as a `deletedFile` line for regular repos and as a `deleteFile` line for buckets.
 */
export interface CommitDeletedEntry {
	operation: "delete";
	/** Path of the entry to delete */
	path: string;
}

/**
 * Where the content of an added file comes from.
 *
 * A `Blob` is used as-is; a `URL` is first resolved through `createBlobs`, which may produce several
 * files (each with its own path) for a single operation.
 */
export type ContentSource = Blob | URL;

/**
 * Operation creating the file at `path`, or replacing it, with `content`.
 */
export interface CommitFile {
	operation: "addOrUpdate";
	/** Destination path of the file */
	path: string;
	/** Content to upload, see {@link ContentSource} */
	content: ContentSource;
	// forceLfs?: boolean
}

/**
 * Optimized when only the beginning or the end of the file is replaced
 *
 * The operation is converted into an "addOrUpdate" operation whose content is a `SplicedBlob`
 * built from `originalContent` and `edits`.
 *
 * todo: handle other cases
 */
export interface CommitEditFile {
	operation: "edit";
	/** Path of the file being edited */
	path: string;
	/** Later, will be ContentSource. For now simpler to just handle blobs */
	originalContent: Blob;
	/** Replacements to apply to `originalContent` */
	edits: Array<{
		/**
		 * Later, will be ContentSource. For now simpler to just handle blobs
		 *
		 * originalContent from [start, end) will be replaced by this
		 */
		content: Blob;
		/**
		 * The start position of the edit in the original content
		 */
		start: number;
		/**
		 * The end position of the edit in the original content
		 *
		 * originalContent from [start, end) will be replaced by the edit
		 */
		end: number;
	}>;
}

/** A {@link CommitFile} whose `content` is guaranteed to be a `Blob` (URL sources already resolved). */
type CommitBlob = Omit<CommitFile, "content"> & { content: Blob };

// TODO: find a nice way to handle LFS & non-LFS files in an uniform manner, see https://github.com/huggingface/moon-landing/issues/4370
// export type CommitRenameFile = {
// 	operation: "rename";
// 	path:      string;
// 	oldPath:   string;
// 	content?:  ContentSource;
// };

/**
 * Server-side copy of a file from a source repo/bucket to the destination repo.
 *
 * Only supported when the destination repo is a bucket. The source file must be xet-backed,
 * so the caller is responsible for resolving the source path to its {@link sourceXetHash}
 * (typically via {@link pathsInfo} or {@link listFiles}).
 *
 * For higher-level helpers that perform the resolution and handle non-xet source files,
 * see {@link copyFile}, {@link copyFiles} and {@link copyFolder}.
 */
export interface CommitCopyFile {
	operation: "copy";
	/** Destination path, sent as `path` in the `copyFile` batch entry */
	path: string;
	/** Xet hash of the source file, sent as `xetHash` in the `copyFile` batch entry */
	sourceXetHash: string;
	/** Source repo; converted with `toRepoId` and sent as `sourceRepoType` / `sourceRepoId` */
	sourceRepo: RepoDesignation;
}

/** Any operation accepted in {@link CommitParams.operations}. */
export type CommitOperation =
	| CommitDeletedEntry
	| CommitFile
	| CommitEditFile
	| CommitCopyFile /* | CommitRenameFile */;
/** Same as {@link CommitOperation}, except that file additions carry a `Blob` content ({@link CommitBlob}). */
type CommitBlobOperation = Exclude<CommitOperation, CommitFile> | CommitBlob;

/**
 * Parameters accepted by {@link commit} and {@link commitIter}.
 */
export type CommitParams = {
	/** Sent as the `summary` of the commit header */
	title: string;
	/** Sent as the `description` of the commit header */
	description?: string;
	/** Destination repo. When it resolves to a bucket, the bucket-specific flow is used */
	repo: RepoDesignation;
	/** Operations to perform as part of the commit */
	operations: CommitOperation[];
	/** @default "main" */
	branch?: string;
	/**
	 * Parent commit. Optional
	 *
	 * - When opening a PR: will use parentCommit as the parent commit
	 * - When committing on a branch: Will make sure that there were no intermediate commits
	 */
	parentCommit?: string;
	/** When true, `?create_pr=1` is appended to the preupload and commit URLs */
	isPullRequest?: boolean;
	/** Base URL of the Hub. Falls back to `HUB_URL` when omitted */
	hubUrl?: string;
	/**
	 * Whether to use web workers to compute SHA256 hashes.
	 *
	 * @default false
	 */
	useWebWorkers?: boolean | { minSize?: number; poolSize?: number };
	/**
	 * Maximum depth of folders to upload. Files deeper than this will be ignored
	 *
	 * @default 5
	 */
	maxFolderDepth?: number;
	/**
	 * Custom fetch function to use instead of the default one, for example to use a proxy or edit headers.
	 */
	fetch?: typeof fetch;
	/**
	 * Aborting this signal aborts the internal controller whose signal is passed to the requests
	 * made during the commit.
	 */
	abortSignal?: AbortSignal;
	/**
	 * @default true
	 *
	 * Use xet protocol: https://huggingface.co/blog/xet-on-the-hub to upload, rather than a basic S3 PUT
	 */
	useXet?: boolean;
	// Credentials are optional due to custom fetch functions or cookie auth
} & Partial<CredentialsParams>;

/**
 * Result of a non-bucket commit, built from the JSON response of the commit endpoint.
 */
export interface CommitOutput {
	/** `pullRequestUrl` field of the response */
	pullRequestUrl?: string;
	commit: {
		/** `commitOid` field of the response */
		oid: string;
		/** `commitUrl` field of the response */
		url: string;
	};
	/** `hookOutput` field of the response */
	hookOutput: string;
}

/**
 * Type guard telling whether `op` is an "addOrUpdate" operation carrying a `Blob`.
 *
 * @returns `false` for every operation other than "addOrUpdate", `true` otherwise
 * @throws TypeError when the operation is "addOrUpdate" but its content is not a `Blob`
 *   (URL contents are expected to have been resolved beforehand)
 */
function isFileOperation(op: CommitOperation): op is CommitBlob {
	if (op.operation !== "addOrUpdate") {
		return false;
	}

	if (op.content instanceof Blob) {
		return true;
	}

	throw new TypeError("Precondition failed: op.content should be a Blob");
}

/**
 * Events yielded by {@link commitIter} while a commit is in progress.
 */
export type CommitProgressEvent =
	| {
			/** Emitted when the commit enters a new phase */
			event: "phase";
			phase: "preuploading" | "uploadingLargeFiles" | "committing";
	  }
	| {
			/** Emitted to report progress for a single file */
			event: "fileProgress";
			/** Path of the file the event relates to */
			path: string;
			/** 1 is emitted once a file is done; presumably a ratio between 0 and 1 otherwise - TODO confirm */
			progress: number;
			/** "error" is emitted for paths reported as failed by the bucket batch endpoint */
			state: "hashing" | "uploading" | "error";
	  };

/**
 * Internal function for now, used by commit.
 *
 * Can be exposed later to offer fine-tuned progress info
 *
 * For a bucket destination, the work is delegated to {@link commitIterBucket}. Otherwise the commit goes
 * through three phases, each announced with a "phase" event:
 *
 * 1. "preuploading": URL contents are resolved to blobs, "edit" operations are converted to "addOrUpdate",
 *    and the preupload endpoint is asked which files must go through LFS.
 * 2. "uploadingLargeFiles": LFS files are hashed (SHA-256) and uploaded, through xet when the server
 *    selects it, otherwise through basic or multipart LFS uploads.
 * 3. "committing": the commit itself is sent as NDJSON; non-LFS files are inlined as base64.
 *
 * Any error aborts the internal AbortController so that parallel requests are cancelled. An abort of
 * `params.abortSignal` (including one that happened before the call) is forwarded to that controller.
 *
 * @returns CommitOutput, CommitOutput is not present for bucket commits
 * @throws Error when a "copy" operation is used on a non-bucket repo
 */
export async function* commitIter(params: CommitParams): AsyncGenerator<CommitProgressEvent, CommitOutput | undefined> {
	const accessToken = checkCredentials(params);
	const repoId = toRepoId(params.repo);

	if (repoId.type === "bucket") {
		return yield* commitIterBucket(params);
	}

	if (params.operations.some((op) => op.operation === "copy")) {
		throw new Error("'copy' operations are only supported when the destination repo is a bucket");
	}

	yield { event: "phase", phase: "preuploading" };

	// May be switched off below when the server does not select the xet transfer
	let useXet = params.useXet ?? true;

	// Paths of the files that must be uploaded through LFS. The value is null until the sha256 is computed
	const lfsShas = new Map<string, string | null>();

	const abortController = new AbortController();
	const abortSignal = abortController.signal;

	// Polyfill see https://discuss.huggingface.co/t/why-cant-i-upload-a-parquet-file-to-my-dataset-error-o-throwifaborted-is-not-a-function/62245
	if (!abortSignal.throwIfAborted) {
		abortSignal.throwIfAborted = () => {
			if (abortSignal.aborted) {
				throw new DOMException("Aborted", "AbortError");
			}
		};
	}

	const callerSignal = params.abortSignal;
	const forwardAbort = () => abortController.abort();

	if (callerSignal) {
		if (callerSignal.aborted) {
			// The "abort" event is not dispatched again for an already aborted signal, so forward it by hand
			abortController.abort();
		} else {
			callerSignal.addEventListener("abort", forwardAbort, { once: true });
		}
	}

	try {
		const allOperations = (
			await Promise.all(
				params.operations.map(async (operation) => {
					if (operation.operation === "edit") {
						// Convert EditFile operation to a file operation with SplicedBlob
						const splicedBlob = SplicedBlob.create(
							operation.originalContent,
							operation.edits.map((splice) => ({ insert: splice.content, start: splice.start, end: splice.end })),
						);
						return {
							operation: "addOrUpdate" as const,
							path: operation.path,
							content: splicedBlob,
						};
					}

					if (operation.operation !== "addOrUpdate") {
						return operation;
					}

					if (!(operation.content instanceof URL)) {
						/** TS trick to enforce `content` to be a `Blob` */
						return { ...operation, content: operation.content };
					}

					// A URL can resolve to several files, each one becomes its own operation
					const lazyBlobs = await createBlobs(operation.content, operation.path, {
						fetch: params.fetch,
						maxFolderDepth: params.maxFolderDepth,
					});

					abortSignal?.throwIfAborted();

					return lazyBlobs.map((blob) => ({
						...operation,
						content: blob.blob,
						path: blob.path,
					}));
				}),
			)
		).flat(1);

		// Content of the .gitattributes file part of this commit, if any. It is sent with each preupload request
		const gitAttributes = allOperations.filter(isFileOperation).find((op) => op.path === ".gitattributes")?.content;

		for (const operations of chunk(allOperations.filter(isFileOperation), 100)) {
			const payload: ApiPreuploadRequest = {
				gitAttributes: gitAttributes && (await gitAttributes.text()),
				files: await Promise.all(
					operations.map(async (operation) => ({
						path: operation.path,
						size: operation.content.size,
						// Only the first 512 bytes of the file are sent, base64-encoded
						sample: base64FromBytes(new Uint8Array(await operation.content.slice(0, 512).arrayBuffer())),
					})),
				),
			};

			abortSignal?.throwIfAborted();

			const res = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/preupload/${encodeURIComponent(
					params.branch ?? "main",
				)}` + (params.isPullRequest ? "?create_pr=1" : ""),
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/json",
					},
					body: JSON.stringify(payload),
					signal: abortSignal,
				},
			);

			if (!res.ok) {
				throw await createApiError(res);
			}

			const json: ApiPreuploadResponse = await res.json();

			for (const file of json.files) {
				if (file.uploadMode === "lfs") {
					lfsShas.set(file.path, null);
				}
			}
		}

		yield { event: "phase", phase: "uploadingLargeFiles" };

		for (const operations of chunk(
			allOperations.filter(isFileOperation).filter((op) => lfsShas.has(op.path)),
			100,
		)) {
			// Hash the files of the chunk, `shas[i]` is the sha256 of `operations[i]`
			const shas = yield* eventToGenerator<
				{ event: "fileProgress"; state: "hashing"; path: string; progress: number },
				string[]
			>((yieldCallback, returnCallback, rejectCallack) => {
				return promisesQueue(
					operations.map((op) => async () => {
						const iterator = sha256(op.content, { useWebWorker: params.useWebWorkers, abortSignal: abortSignal });
						let res: IteratorResult<number, string>;
						do {
							res = await iterator.next();
							if (!res.done) {
								yieldCallback({ event: "fileProgress", path: op.path, progress: res.value, state: "hashing" });
							}
						} while (!res.done);
						const sha = res.value;
						lfsShas.set(op.path, res.value);
						return sha;
					}),
					CONCURRENT_SHAS,
				).then(returnCallback, rejectCallack);
			});

			abortSignal?.throwIfAborted();

			const payload: ApiLfsBatchRequest = {
				operation: "upload",
				// multipart is a custom protocol for HF
				transfers: ["basic", "multipart", ...(useXet ? ["xet" as const] : [])],
				hash_algo: "sha_256",
				...(!params.isPullRequest && {
					ref: {
						name: params.branch ?? "main",
					},
				}),
				objects: operations.map((op, i) => ({
					oid: shas[i],
					size: op.content.size,
				})),
			};

			const res = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/${repoId.type === "model" ? "" : repoId.type + "s/"}${
					repoId.name
				}.git/info/lfs/objects/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						Accept: "application/vnd.git-lfs+json",
						"Content-Type": "application/vnd.git-lfs+json",
					},
					body: JSON.stringify(payload),
					signal: abortSignal,
				},
			);

			if (!res.ok) {
				throw await createApiError(res);
			}

			const json: ApiLfsBatchResponse = await res.json();
			const batchRequestId = res.headers.get("X-Request-Id") || undefined;

			const shaToOperation = new Map(operations.map((op, i) => [shas[i], op]));

			// The server did not select xet: fall back to regular LFS uploads, for this chunk and the next ones
			if (useXet && json.transfer !== "xet") {
				useXet = false;
			}

			let xetParams: XetTokenParams | null = null;

			if (useXet) {
				// First get all the files that are already uploaded out of the way
				for (const obj of json.objects) {
					const op = shaToOperation.get(obj.oid);
					if (!op) {
						throw new InvalidApiResponseFormatError("Unrequested object ID in response");
					}

					if (obj.error) {
						const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${
							obj.error.message
						}${batchRequestId ? ` - Request ID: ${batchRequestId}` : ""}`;
						throw new HubApiError(res.url, obj.error.code, batchRequestId, errorMessage);
					}

					if (!obj.actions?.upload) {
						// Already uploaded
						yield {
							event: "fileProgress",
							path: op.path,
							progress: 1,
							state: "uploading",
						};
					} else {
						// The xet parameters are read from the headers of the upload action
						const headers = new Headers(obj.actions.upload.header);

						xetParams = {
							sessionId: headers.get("X-Xet-Session-Id") ?? undefined,
							casUrl: headers.get("X-Xet-Cas-Url") ?? undefined,
							accessToken: headers.get("X-Xet-Access-Token") ?? undefined,
							expiresAt: headers.get("X-Xet-Token-Expiration")
								? new Date(parseInt(headers.get("X-Xet-Token-Expiration") ?? "0") * 1000)
								: undefined,
							refreshWriteTokenUrl: obj.actions.upload.href,
						};
					}
				}
				// Lazily lists the files that still have an upload action
				const source = (async function* () {
					for (const obj of json.objects) {
						const op = shaToOperation.get(obj.oid);
						if (!op || !obj.actions?.upload) {
							continue;
						}
						abortSignal?.throwIfAborted();
						yield { content: op.content, path: op.path, sha256: obj.oid };
					}
				})();
				if (xetParams) {
					const fixedXetParams = xetParams;
					// The source is split in 5, each part being consumed by its own uploadShards call
					const sources = splitAsyncGenerator(source, 5);
					yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
						Promise.all(
							sources.map(async function (source) {
								for await (const event of uploadShards(source, {
									fetch: params.fetch,
									accessToken,
									hubUrl: params.hubUrl ?? HUB_URL,
									repo: repoId,
									xetParams: fixedXetParams,
									// todo: maybe leave empty if PR?
									rev: params.branch ?? "main",
									isPullRequest: params.isPullRequest,
									yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
								})) {
									if (event.event === "file") {
										yieldCallback({
											event: "fileProgress" as const,
											path: event.path,
											progress: 1,
											state: "uploading" as const,
										});
									} else if (event.event === "fileProgress") {
										yieldCallback({
											event: "fileProgress" as const,
											path: event.path,
											progress: event.progress,
											state: "uploading" as const,
										});
									}
								}
							}),
						).then(() => returnCallback(undefined), rejectCallback),
					);
				} else {
					// No LFS file to upload
				}
			} else {
				yield* eventToGenerator<CommitProgressEvent, void>((yieldCallback, returnCallback, rejectCallback) => {
					return promisesQueueStreaming(
						json.objects.map((obj) => async () => {
							const op = shaToOperation.get(obj.oid);

							if (!op) {
								throw new InvalidApiResponseFormatError("Unrequested object ID in response");
							}

							abortSignal?.throwIfAborted();

							if (obj.error) {
								const errorMessage = `Error while doing LFS batch call for ${operations[shas.indexOf(obj.oid)].path}: ${
									obj.error.message
								}${batchRequestId ? ` - Request ID: ${batchRequestId}` : ""}`;
								throw new HubApiError(res.url, obj.error.code, batchRequestId, errorMessage);
							}
							if (!obj.actions?.upload) {
								// Already uploaded
								yieldCallback({
									event: "fileProgress",
									path: op.path,
									progress: 1,
									state: "uploading",
								});
								return;
							}
							yieldCallback({
								event: "fileProgress",
								path: op.path,
								progress: 0,
								state: "uploading",
							});
							const content = op.content;

							const header = obj.actions.upload.header;
							if (header?.chunk_size) {
								const chunkSize = parseInt(header.chunk_size);

								// multipart upload
								// parts are in upload.header['00001'] to upload.header['99999']

								const completionUrl = obj.actions.upload.href;
								const parts = Object.keys(header).filter((key) => /^[0-9]+$/.test(key));

								if (parts.length !== Math.ceil(content.size / chunkSize)) {
									throw new Error("Invalid server response to upload large LFS file, wrong number of parts");
								}

								// The etags are filled in as the parts get uploaded
								const completeReq: ApiLfsCompleteMultipartRequest = {
									oid: obj.oid,
									parts: parts.map((part) => ({
										partNumber: +part,
										etag: "",
									})),
								};

								// Defined here so that it's not redefined at each iteration (and the caller can tell it's for the same file)
								const progressCallback = (progress: number) =>
									yieldCallback({ event: "fileProgress", path: op.path, progress, state: "uploading" });

								await promisesQueueStreaming(
									parts.map((part, partPosition) => async () => {
										abortSignal?.throwIfAborted();

										// Part numbers start at 1, byte offsets at 0
										const index = parseInt(part) - 1;
										const slice = content.slice(index * chunkSize, (index + 1) * chunkSize);

										const res = await (params.fetch ?? fetch)(header[part], {
											method: "PUT",
											/** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */
											body: slice instanceof WebBlob && isFrontend ? await slice.arrayBuffer() : slice,
											signal: abortSignal,
											...({
												progressHint: {
													path: op.path,
													part: index,
													numParts: parts.length,
													progressCallback,
												},
												// eslint-disable-next-line @typescript-eslint/no-explicit-any
											} as any),
										});

										if (!res.ok) {
											throw await createApiError(res, {
												requestId: batchRequestId,
												message: `Error while uploading part ${part} of ${
													operations[shas.indexOf(obj.oid)].path
												} to LFS storage`,
											});
										}

										const eTag = res.headers.get("ETag");

										if (!eTag) {
											throw new Error("Cannot get ETag of part during multipart upload");
										}

										// `completeReq.parts` was built from `parts` in the same order, so the position in `parts`
										// always designates the entry whose partNumber is this part, whatever the order of the keys
										completeReq.parts[partPosition].etag = eTag;
									}),
									MULTIPART_PARALLEL_UPLOAD,
								);

								abortSignal?.throwIfAborted();

								const res = await (params.fetch ?? fetch)(completionUrl, {
									method: "POST",
									body: JSON.stringify(completeReq),
									headers: {
										Accept: "application/vnd.git-lfs+json",
										"Content-Type": "application/vnd.git-lfs+json",
									},
									signal: abortSignal,
								});

								if (!res.ok) {
									throw await createApiError(res, {
										requestId: batchRequestId,
										message: `Error completing multipart upload of ${
											operations[shas.indexOf(obj.oid)].path
										} to LFS storage`,
									});
								}

								yieldCallback({
									event: "fileProgress",
									path: op.path,
									progress: 1,
									state: "uploading",
								});
							} else {
								// No chunk size in the header: the file is uploaded with a single PUT
								const res = await (params.fetch ?? fetch)(obj.actions.upload.href, {
									method: "PUT",
									headers: {
										...(batchRequestId ? { "X-Request-Id": batchRequestId } : undefined),
									},
									/** Unfortunately, browsers don't support our inherited version of Blob in fetch calls */
									body: content instanceof WebBlob && isFrontend ? await content.arrayBuffer() : content,
									signal: abortSignal,
									...({
										progressHint: {
											path: op.path,
											progressCallback: (progress: number) =>
												yieldCallback({
													event: "fileProgress",
													path: op.path,
													progress,
													state: "uploading",
												}),
										},
										// eslint-disable-next-line @typescript-eslint/no-explicit-any
									} as any),
								});

								if (!res.ok) {
									throw await createApiError(res, {
										requestId: batchRequestId,
										message: `Error while uploading ${operations[shas.indexOf(obj.oid)].path} to LFS storage`,
									});
								}

								yieldCallback({
									event: "fileProgress",
									path: op.path,
									progress: 1,
									state: "uploading",
								});
							}
						}),
						CONCURRENT_LFS_UPLOADS,
					).then(returnCallback, rejectCallback);
				});
			}
		}

		abortSignal?.throwIfAborted();

		yield { event: "phase", phase: "committing" };

		return yield* eventToGenerator<CommitProgressEvent, CommitOutput>(
			async (yieldCallback, returnCallback, rejectCallback) =>
				(params.fetch ?? fetch)(
					`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/commit/${encodeURIComponent(
						params.branch ?? "main",
					)}` + (params.isPullRequest ? "?create_pr=1" : ""),
					{
						method: "POST",
						headers: {
							...(accessToken && { Authorization: `Bearer ${accessToken}` }),
							"Content-Type": "application/x-ndjson",
						},
						// One JSON document per line: the header first, then one line per operation
						body: [
							{
								key: "header",
								value: {
									summary: params.title,
									description: params.description,
									parentCommit: params.parentCommit,
								} satisfies ApiCommitHeader,
							},
							...((await Promise.all(
								allOperations.map((operation) => {
									if (isFileOperation(operation)) {
										const sha = lfsShas.get(operation.path);
										if (sha) {
											// LFS file: only a pointer (sha256 + size) is part of the commit
											return {
												key: "lfsFile",
												value: {
													path: operation.path,
													algo: "sha256",
													size: operation.content.size,
													oid: sha,
												} satisfies ApiCommitLfsFile,
											};
										}
									}

									return convertOperationToNdJson(operation);
								}),
							)) satisfies ApiCommitOperation[]),
						]
							.map((x) => JSON.stringify(x))
							.join("\n"),
						signal: abortSignal,
						...({
							progressHint: {
								progressCallback: (progress: number) => {
									// For now, we display equal progress for all files
									// We could compute the progress based on the size of `convertOperationToNdJson` for each of the files instead
									for (const op of allOperations) {
										if (isFileOperation(op) && !lfsShas.has(op.path)) {
											yieldCallback({
												event: "fileProgress",
												path: op.path,
												progress,
												state: "uploading",
											});
										}
									}
								},
							},
							// eslint-disable-next-line @typescript-eslint/no-explicit-any
						} as any),
					},
				)
					.then(async (res) => {
						if (!res.ok) {
							throw await createApiError(res);
						}

						const json = await res.json();

						returnCallback({
							pullRequestUrl: json.pullRequestUrl,
							commit: {
								oid: json.commitOid,
								url: json.commitUrl,
							},
							hookOutput: json.hookOutput,
						});
					})
					.catch(rejectCallback),
		);
	} catch (err) {
		// For parallel requests, cancel them all if one fails
		abortController.abort();
		throw err;
	} finally {
		// Do not keep the generator reachable from a long-lived caller signal once the commit is over
		callerSignal?.removeEventListener("abort", forwardAbort);
	}
}

/**
 * Bucket flavour of {@link commitIter}.
 *
 * URL contents are resolved to blobs and "edit" operations are converted to "addOrUpdate". Then, in order:
 *
 * 1. File contents are uploaded with `uploadShards`, by chunks of 100 files, and each chunk is registered
 *    with `addFile` entries sent to the `batch` endpoint.
 * 2. "copy" operations are sent as `copyFile` entries, by chunks of 100.
 * 3. "delete" operations are sent as `deleteFile` entries, in a single request.
 *
 * Paths reported as failed by the `addFile` / `copyFile` requests are yielded as "fileProgress" events with
 * the "error" state. Failed deletions make the generator throw.
 *
 * Any error aborts the internal AbortController so that parallel requests are cancelled. An abort of
 * `params.abortSignal` (including one that happened before the call) is forwarded to that controller.
 *
 * @throws Error when `params.useXet` is explicitly `false`, or when some deletions failed
 */
export async function* commitIterBucket(params: CommitParams): AsyncGenerator<CommitProgressEvent> {
	const accessToken = checkCredentials(params);
	const repoId = toRepoId(params.repo);

	if (params.useXet === false) {
		throw new Error("useXet must be true or undefined for buckets");
	}

	const abortController = new AbortController();
	const abortSignal = abortController.signal;

	// Polyfill see https://discuss.huggingface.co/t/why-cant-i-upload-a-parquet-file-to-my-dataset-error-o-throwifaborted-is-not-a-function/62245
	if (!abortSignal.throwIfAborted) {
		abortSignal.throwIfAborted = () => {
			if (abortSignal.aborted) {
				throw new DOMException("Aborted", "AbortError");
			}
		};
	}

	const callerSignal = params.abortSignal;
	const forwardAbort = () => abortController.abort();

	if (callerSignal) {
		if (callerSignal.aborted) {
			// The "abort" event is not dispatched again for an already aborted signal, so forward it by hand
			abortController.abort();
		} else {
			callerSignal.addEventListener("abort", forwardAbort, { once: true });
		}
	}

	try {
		const allOperations = (
			await Promise.all(
				params.operations.map(async (operation) => {
					if (operation.operation === "edit") {
						// Convert EditFile operation to a file operation with SplicedBlob
						const splicedBlob = SplicedBlob.create(
							operation.originalContent,
							operation.edits.map((splice) => ({ insert: splice.content, start: splice.start, end: splice.end })),
						);
						return {
							operation: "addOrUpdate" as const,
							path: operation.path,
							content: splicedBlob,
						};
					}

					if (operation.operation !== "addOrUpdate") {
						return operation;
					}

					if (!(operation.content instanceof URL)) {
						/** TS trick to enforce `content` to be a `Blob` */
						return { ...operation, content: operation.content };
					}

					// A URL can resolve to several files, each one becomes its own operation
					const lazyBlobs = await createBlobs(operation.content, operation.path, {
						fetch: params.fetch,
						maxFolderDepth: params.maxFolderDepth,
					});

					abortSignal?.throwIfAborted();

					return lazyBlobs.map((blob) => ({
						...operation,
						content: blob.blob,
						path: blob.path,
					}));
				}),
			)
		).flat(1);

		yield { event: "phase", phase: "uploadingLargeFiles" };

		for (const operations of chunk(allOperations.filter(isFileOperation), 100)) {
			// path => xet hash, filled in as uploadShards reports the files as uploaded
			const xetHashes = new Map<string, string>();
			abortSignal?.throwIfAborted();

			// Lazily lists the files of the chunk
			const source = (async function* () {
				for (const operation of operations) {
					abortSignal?.throwIfAborted();
					yield { content: operation.content, path: operation.path };
				}
			})();

			const xetParams: XetTokenParams = {
				sessionId: crypto.randomUUID(),
				refreshWriteTokenUrl: `${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/xet-write-token`,
			};
			// The source is split in 5, each part being consumed by its own uploadShards call
			const sources = splitAsyncGenerator(source, 5);
			yield* eventToGenerator((yieldCallback, returnCallback, rejectCallback) =>
				Promise.all(
					sources.map(async function (source) {
						for await (const event of uploadShards(source, {
							fetch: params.fetch,
							accessToken,
							hubUrl: params.hubUrl ?? HUB_URL,
							repo: repoId,
							xetParams,
							rev: params.branch ?? "main",
							yieldCallback: (event) => yieldCallback({ ...event, state: "uploading" }),
						})) {
							if (event.event === "file") {
								yieldCallback({
									event: "fileProgress" as const,
									path: event.path,
									progress: 1,
									state: "uploading" as const,
								});
								xetHashes.set(event.path, event.xetHash);
							} else if (event.event === "fileProgress") {
								yieldCallback({
									event: "fileProgress" as const,
									path: event.path,
									progress: event.progress,
									state: "uploading" as const,
								});
							}
						}
					}),
				).then(() => returnCallback(undefined), rejectCallback),
			);

			// Register the uploaded files in the bucket, one NDJSON line per file
			const resp = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/x-ndjson",
					},
					body: [...xetHashes.entries()]
						.map(([path, xetHash]) =>
							JSON.stringify({
								type: "addFile",
								path,
								xetHash,
							}),
						)
						.join("\n"),
					signal: abortSignal,
				},
			);

			// A 422 response is not thrown: its body is read like a successful one to report the failed paths
			if (!resp.ok && resp.status !== 422) {
				throw await createApiError(resp);
			}

			const json = (await resp.json()) as ApiBucketBatchResponse;

			for (const failed of json.failed) {
				yield {
					event: "fileProgress",
					path: failed.path,
					progress: 0,
					state: "error",
				};
			}
		}

		abortSignal?.throwIfAborted();

		const copyOperations = allOperations.filter(
			(operation): operation is CommitCopyFile => operation.operation === "copy",
		);

		for (const copyChunk of chunk(copyOperations, 100)) {
			abortSignal?.throwIfAborted();

			const resp = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/x-ndjson",
					},
					body: copyChunk
						.map((op) => {
							const sourceRepoId = toRepoId(op.sourceRepo);
							return JSON.stringify({
								type: "copyFile",
								path: op.path,
								xetHash: op.sourceXetHash,
								sourceRepoType: sourceRepoId.type,
								sourceRepoId: sourceRepoId.name,
							});
						})
						.join("\n"),
					signal: abortSignal,
				},
			);

			// A 422 response is not thrown: its body is read like a successful one to report the failed paths
			if (!resp.ok && resp.status !== 422) {
				throw await createApiError(resp);
			}

			const json = (await resp.json()) as ApiBucketBatchResponse;

			for (const failed of json.failed) {
				yield {
					event: "fileProgress",
					path: failed.path,
					progress: 0,
					state: "error",
				};
			}
		}

		abortSignal?.throwIfAborted();

		const deletedOperations = allOperations.filter((operation) => operation.operation === "delete");

		if (deletedOperations.length > 0) {
			const resp = await (params.fetch ?? fetch)(
				`${params.hubUrl ?? HUB_URL}/api/${repoId.type}s/${repoId.name}/batch`,
				{
					method: "POST",
					headers: {
						...(accessToken && { Authorization: `Bearer ${accessToken}` }),
						"Content-Type": "application/x-ndjson",
					},
					body: deletedOperations
						.map((operation) =>
							JSON.stringify({
								type: "deleteFile",
								path: operation.path,
							}),
						)
						.join("\n"),
					signal: abortSignal,
				},
			);

			if (!resp.ok) {
				throw await createApiError(resp);
			}

			const json = await resp.json();

			// Unlike additions and copies, failed deletions are thrown. At most 5 paths are listed in the message
			if (json.failed.length > 0) {
				const failedPaths = json.failed.slice(0, 5).map((f: { path: string }) => f.path);
				throw new Error(
					`Failed to delete ${json.failed.length} file(s): ${failedPaths.join(", ")}${json.failed.length > 5 ? "..." : ""}, request ID: ${resp.headers.get("X-Request-Id")}`,
				);
			}
		}

		abortSignal?.throwIfAborted();
	} catch (err) {
		// For parallel requests, cancel them all if one fails
		abortController.abort();
		throw err;
	} finally {
		// Do not keep the generator reachable from a long-lived caller signal once the commit is over
		callerSignal?.removeEventListener("abort", forwardAbort);
	}
}

/**
 * Runs {@link commitIter} to completion, discarding the progress events.
 *
 * "fileProgress" events in the "error" state are counted along the way. If there is at least one when the
 * iterator is done, an error listing up to 5 of the failed paths is thrown.
 *
 * @returns undefined for bucket uploads, CommitOutput otherwise
 * @throws Error when at least one file was reported in the "error" state
 */
export async function commit(params: CommitParams): Promise<CommitOutput | undefined> {
	const progress = commitIter(params);
	const reportedPaths: string[] = [];
	let errorCount = 0;

	for (;;) {
		const step = await progress.next();

		if (step.done) {
			if (errorCount === 0) {
				return step.value;
			}
			throw new Error(
				`Failed to upload ${errorCount} file(s): ${reportedPaths.join(", ")}${errorCount > 5 ? "..." : ""}`,
			);
		}

		const event = step.value;
		if (event.event !== "fileProgress" || event.state !== "error") {
			continue;
		}

		errorCount += 1;
		// Only the first 5 failed paths are kept for the error message
		if (reportedPaths.length < 5) {
			reportedPaths.push(event.path);
		}
	}
}

/**
 * Converts an operation to the entry describing it in the NDJSON body of the commit request.
 *
 * - "addOrUpdate": a `file` entry with the whole content inlined as base64 (todo: handle LFS)
 * - "delete": a `deletedFile` entry
 *
 * @throws Error for "edit" operations, which are expected to have been converted to "addOrUpdate" beforehand
 * @throws TypeError for any other operation
 */
async function convertOperationToNdJson(operation: CommitBlobOperation): Promise<ApiCommitOperation> {
	if (operation.operation === "addOrUpdate") {
		const bytes = new Uint8Array(await operation.content.arrayBuffer());

		return {
			key: "file",
			value: {
				content: base64FromBytes(bytes),
				path: operation.path,
				encoding: "base64",
			},
		};
	}

	// A "rename" operation could be handled here later. When the remote file is already LFS, it would have
	// to be renamed as LFS

	if (operation.operation === "delete") {
		return {
			key: "deletedFile",
			value: {
				path: operation.path,
			},
		};
	}

	if (operation.operation === "edit") {
		// Note: By the time we get here, splice operations should have been converted to addOrUpdate operations with SplicedBlob
		// But we handle this case for completeness
		throw new Error(
			"Edit operations should be converted to addOrUpdate operations before reaching convertOperationToNdJson",
		);
	}

	throw new TypeError("Unknown operation: " + (operation as { operation: string }).operation);
}
