"use strict";

// ---------------------------------------------------------------------------
// CommonJS / ES-module interop helpers. These look bundler-generated and are
// kept functionally unchanged.
// ---------------------------------------------------------------------------
var __create = Object.create;
var __defProp = Object.defineProperty;
var __getOwnPropDesc = Object.getOwnPropertyDescriptor;
var __getOwnPropNames = Object.getOwnPropertyNames;
var __getProtoOf = Object.getPrototypeOf;
var __hasOwnProp = Object.prototype.hasOwnProperty;

// Defines every key of `all` on `target` as an enumerable getter.
var __export = (target, all) => {
  for (var name in all)
    __defProp(target, name, { get: all[name], enumerable: true });
};

// Re-exposes the own properties of `from` on `to` through getters, skipping
// keys `to` already owns and the optional `except` key.
var __copyProps = (to, from, except, desc) => {
  if (from && typeof from === "object" || typeof from === "function") {
    for (let key of __getOwnPropNames(from))
      if (!__hasOwnProp.call(to, key) && key !== except)
        __defProp(to, key, { get: () => from[key], enumerable: !(desc = __getOwnPropDesc(from, key)) || desc.enumerable });
  }
  return to;
};

// Wraps a required module so it can be consumed like an ES module namespace
// (adds a `default` property unless the module is already flagged __esModule
// and `isNodeMode` is falsy).
var __toESM = (mod, isNodeMode, target) => (target = mod != null ? __create(__getProtoOf(mod)) : {}, __copyProps(
  isNodeMode || !mod || !mod.__esModule ? __defProp(target, "default", { value: mod, enumerable: true }) : target,
  mod
));

// Produces the CommonJS export object, flagged with `__esModule: true`.
var __toCommonJS = (mod) => __copyProps(__defProp({}, "__esModule", { value: true }), mod);

// src/index.ts
var src_exports = {};
__export(src_exports, {
  Client: () => Client
});
module.exports = __toCommonJS(src_exports);

// src/client.ts
var import_node_net = __toESM(require("net"), 1);
var import_debug = __toESM(require("debug"), 1);
var log = (0, import_debug.default)("callbag-net:client");

/**
 * Creates a callbag-style function backed by a `net.Socket`.
 *
 * The returned function dispatches on `signal`:
 *  - `0` (START): `payload` must look like a sink callbag. The sink is
 *    registered, the socket is created/connected if there is none yet, and the
 *    sink is greeted with `(0, talkback)`.
 *  - `1` (DATA): `payload` is written to the socket when it passes `isData`.
 *  - `2` (END): sinks are terminated and the socket is destroyed, provided
 *    `payload` is `undefined` or passes `isError`.
 *  - anything else: if `signal` itself is a one-argument function it is
 *    treated as a pullable source talkback and called with `1`.
 *
 * @param {object} socketOptions - Forwarded to `socket.connect()`.
 * @param {object} [options]
 * @param {Function} [options.connectionListener] - Forwarded as the second
 *   argument of the initial `socket.connect()` call.
 * @param {number} [options.retries=0] - Maximum reconnection attempts; `-1`
 *   means retry without limit.
 * @param {boolean} [options.reconnectOnEnd=false] - When true, the socket's
 *   "end" and "error" events first go through the reconnection logic instead
 *   of terminating the sinks immediately.
 * @returns {Function} A `(signal, payload)` function.
 * @throws {Error} On START when `payload` does not look like a callbag.
 */
function Client(socketOptions, { connectionListener = void 0, retries = 0, reconnectOnEnd = false } = {}) {
  let ready = false; // set once the socket has emitted "ready"; never reset
  let socket = null;
  let timeout = void 0; // handle of the pending reconnection timer
  let attempts = 0;
  let finished = false; // true once the retry budget has been used up
  const printOptions = JSON.stringify(socketOptions);
  const buffer = []; // payloads that could not be handed to sinks yet
  const sinks = [];

  return (signal, payload) => {
    switch (signal) {
      case 0 /* START */:
        if (!isSinkTalkback(signal, payload))
          throw new Error("SourceInitiator not passed to handshakeFactory");
        handshakeFactory()(signal, payload);
        break;
      case 1 /* DATA */:
        if (isData(payload))
          data(payload);
        break;
      case 2 /* END */:
        if (isError(payload) || payload === void 0)
          end(payload);
        break;
      default:
        if (likelyPullableSourceTalkback(signal))
          pull(signal);
        break;
    }

    // Builds the START handler: registers the sink, makes sure a socket
    // exists, and greets the sink with its talkback.
    function handshakeFactory() {
      return (_start, sink) => {
        addSink(sink);
        connect();
        sink(0 /* START */, talkback);

        // The sink receives this function itself. Previously it received a
        // zero-argument factory, so a sink calling `talkback(2)` only got a
        // new function back and was never actually unsubscribed.
        function talkback(signal2, error) {
          if (isEndSignal(signal2)) {
            removeSink(sink);
            if (error)
              log("Received an error from sink", error);
          }
          if (sinks.length === 0) {
            disconnect();
          }
        }
      };
    }

    // Writes an upstream payload to the socket, if there is one.
    function data(payload2) {
      if (socket)
        socket.write(payload2);
    }

    // Upstream finished: forward END to the sinks and drop the socket.
    function end(error) {
      sendToSinks(2 /* END */, error);
      disconnect();
    }

    // Asks a pullable source for its next value.
    function pull(talkback) {
      talkback(1 /* DATA */);
    }
  };

  /** Sends `(0, sourceTalkback)` to a sink. */
  function handshakeBack(sink, sourceTalkback) {
    log("Sending START handshake to sink");
    sink(0 /* START */, sourceTalkback);
  }

  /** Sends `(1, data)` to a sink. */
  function deliver(sink, data) {
    log(`Sending DATA to sink`, data == null ? void 0 : data.toString(), data);
    sink(1 /* DATA */, data);
  }

  /** Sends END to a sink, with the error only when there is one. */
  function terminate(sink, error) {
    log(`Sending END sink`, error == null ? void 0 : error.toString());
    if (error)
      sink(2 /* END */, error);
    else
      sink(2 /* END */);
  }

  /** Registers a sink. */
  function addSink(sink) {
    log("Adding sink");
    sinks.push(sink);
  }

  /** Unregisters a sink; a no-op when the sink is not registered. */
  function removeSink(sink) {
    log("Removing sink");
    const index = sinks.indexOf(sink);
    if (index !== -1)
      sinks.splice(index, 1);
  }

  /**
   * Fans a signal out to every registered sink. While there is no socket or
   * it has not emitted "ready" yet, truthy payloads are pushed onto `buffer`
   * instead.
   */
  function sendToSinks(signal, payload) {
    if (!(socket && ready)) {
      if (payload)
        buffer.push(payload);
      return;
    }
    // Iterate over a snapshot: a sink may unsubscribe (mutating `sinks`)
    // while it is being called, which would otherwise skip its neighbour.
    [...sinks].forEach((sink) => {
      if (isStartSignal(signal) && likelySourceTalkBack(payload))
        handshakeBack(sink, payload);
      else if (isDataSignal(signal) && isData(payload))
        deliver(sink, payload);
      else if (isEndSignal(signal) && (isError(payload) || payload === void 0))
        terminate(sink, payload);
    });
  }

  /** Creates the socket, wires its event handlers and connects. No-op when a socket already exists. */
  function connect() {
    if (socket)
      return;
    socket = new import_node_net.default.Socket();

    socket.on("connect", (error) => {
      clearTimeout(timeout);
      if (error)
        throw new Error(`Error connecting with options: ${printOptions} ${error}`);
      log(`Connected with: ${printOptions}`);
      // Drain the buffer (last in, first out; stops at the first falsy
      // entry). Functions and error-like entries are discarded; binary data
      // is written to the socket.
      let payload;
      while (payload = buffer.pop()) {
        if (likelySourceTalkBack(payload))
          continue;
        if (isError(payload))
          continue;
        if (isData(payload) && socket)
          socket.write(payload);
      }
    });

    socket.on("ready", () => {
      log("socket ready");
      ready = true;
    });

    socket.on("data", (chunk) => {
      log(`Received from: ${printOptions}:`, chunk.toString());
      sendToSinks(1 /* DATA */, chunk);
    });

    socket.on("end", () => {
      log("socket end");
      if (reconnectOnEnd) {
        tryReconnect(() => {
          sendToSinks(2 /* END */);
          disconnect();
        });
      } else {
        log(`Disconnecting from ${printOptions} on end`);
        sendToSinks(2 /* END */);
        disconnect();
      }
    });

    socket.on("close", (error) => {
      log("socket close", error ? error : "");
      tryReconnect(() => {
        sendToSinks(2 /* END */);
        disconnect();
      });
      // NOTE(review): END is sent here whether or not a reconnection was
      // scheduled above, and the "close" argument is stringified before being
      // forwarded. Confirm both are intended.
      sendToSinks(2 /* END */, error ? error.toString() : void 0);
    });

    socket.on("error", (error) => {
      log("socket error", error);
      if (reconnectOnEnd) {
        tryReconnect(() => {
          log(error);
          sendToSinks(2 /* END */, error ? error.toString() : void 0);
          disconnect();
        });
      } else {
        log(`Disconnecting from ${printOptions}`, error);
        sendToSinks(2 /* END */);
        disconnect();
      }
    });

    socket.connect(socketOptions, connectionListener);
  }

  /** Destroys the socket, if any. The `socket` variable itself is left in place. */
  function disconnect() {
    log("disconnect called");
    if (socket)
      socket.destroy();
  }

  /**
   * Schedules a reconnection when allowed; otherwise runs `callback` (if
   * given), destroys the socket and forgets it.
   *
   * Reconnecting requires at least one registered sink, and either
   * `retries === -1` or an unspent retry budget.
   */
  function tryReconnect(callback) {
    const shouldReconnect = () => sinks.length > 0 && (retries === -1 || !finished && attempts < retries);
    if (retries === -1)
      finished = false;
    if (shouldReconnect()) {
      log(`Attempting reconnection to ${printOptions} ${retries === -1 ? "for eternity" : `: attempt ${attempts + 1} of ${retries}`}`);
      reconnect();
    } else {
      log(`Ending reconnection attempts to ${printOptions}`);
      if (callback)
        callback();
      disconnect();
      socket = null;
    }
  }

  /**
   * Arms a one-second timer that reconnects the existing socket and counts
   * the attempt. A successful connection resets the attempt counter.
   */
  function reconnect() {
    clearTimeout(timeout);
    timeout = setTimeout(() => {
      if (!socket)
        return;
      if (sinks.length === 0) {
        // Every sink left while the retry was pending: release the socket
        // instead of reconnecting so that a later subscriber starts fresh.
        disconnect();
        socket = null;
        return;
      }
      socket.connect(socketOptions, () => {
        attempts = 0;
      });
    }, 1e3);
    attempts++;
    if (attempts >= retries && retries !== -1) {
      disconnect();
      finished = true;
    }
  }
}

/** @returns {boolean} True for the START signal (`0`). */
function isStartSignal(signal) {
  return signal === 0;
}

/** @returns {boolean} True for the DATA signal (`1`). */
function isDataSignal(signal) {
  return signal === 1;
}

/** @returns {boolean} True for the END signal (`2`). */
function isEndSignal(signal) {
  return signal === 2;
}

/** @returns {boolean} True when `fn` is a function declaring two or three parameters. */
function likelyCallbag(fn) {
  return isFunction(fn) && (fn.length === 2 || fn.length === 3);
}

/** @returns {boolean} True when `fn` is a function declaring exactly one parameter. */
function likelySourceTalkBack(fn) {
  return isFunction(fn) && fn.length === 1;
}

/** Alias of `likelySourceTalkBack`. */
function likelyPullableSourceTalkback(fn) {
  return likelySourceTalkBack(fn);
}

/** @returns {boolean} True when `signal` is START and `payload` looks like a callbag. */
function isSinkTalkback(signal, payload) {
  return isStartSignal(signal) && likelyCallbag(payload);
}

/** @returns {boolean} True when `fn` is a function. */
function isFunction(fn) {
  return typeof fn === "function";
}

/** @returns {boolean} True for primitive strings and `String` objects. */
function isString(string) {
  return typeof string === "string" || string instanceof String;
}

/** @returns {boolean} True for `Error` instances and for strings (error messages). */
function isError(payload) {
  return payload instanceof Error || isString(payload);
}

/**
 * Tells whether `payload` can be treated as socket data: a `Buffer`, a
 * `Uint8Array`, or a primitive string.
 *
 * The previous check ended in `Uint8Array.name === "Uint8Array"`, which is
 * always true, so every payload was accepted. Strings remain accepted because
 * `socket.write` takes them and the old check let them through; the buffer
 * drain in `connect` tests `isError` first, so strings are still skipped there.
 *
 * @param {*} payload
 * @returns {boolean}
 */
function isData(payload) {
  return Buffer.isBuffer(payload) || payload instanceof Uint8Array || typeof payload === "string";
}

// Annotate the CommonJS export names for ESM import in node:
0 && (module.exports = {
  Client
});
//# sourceMappingURL=index.cjs.map