import { Router, Request, Response } from 'express';

// internal interface used to manage the content associated with a topic
interface Topic {
	// id of the most recently published event; new topics start at -1 (no event published yet)
	lastEventId: number;
	// body of the most recently published message; undefined until the first publish
	data: string | undefined;
	// responses of the currently subscribed clients; events are written to each of them
	subscribers: Response[];
}

// internal interface used to manage a dictionary of topics keyed on topic name
interface Topics {
	[id: string]: Topic;
}

/**
 * Finds the topic registered under the given name, creating and registering
 * an empty one (no event yet, no subscribers) when it does not exist.
 *
 * Only own properties of the dictionary count as topics, so names that collide
 * with members inherited from Object.prototype (e.g. "toString", "__proto__")
 * are handled like any other name.
 * @param topics Dictionary of topics keyed on topic name; mutated when a topic is created.
 * @param topic Name of the topic to look up.
 * @returns The existing or newly created topic.
 */
function getTopic(topics: Topics, topic: string): Topic {
	if (Object.prototype.hasOwnProperty.call(topics, topic)) {
		return topics[topic];
	}

	const created: Topic = { lastEventId: -1, data: undefined, subscribers: [] };

	// defineProperty rather than plain assignment: assigning to "__proto__" would
	// replace the dictionary's prototype instead of storing an entry
	Object.defineProperty(topics, topic, {
		value: created,
		writable: true,
		enumerable: true,
		configurable: true
	});

	return created;
}

/**
 * Queues a single server-sent event to be written to a client.
 *
 * The write is deferred with setImmediate, so this function returns before
 * anything is written to the response.
 *
 * A payload containing line breaks is emitted as one `data:` line per payload
 * line. Writing it raw would let the text after a line break be parsed as a new
 * field, and a blank line would end the event early.
 * @param client Response of the subscribed client to write to.
 * @param eventId Value sent in the event's `id:` field.
 * @param data Message payload; may span multiple lines.
 */
function sendEvent(client: Response, eventId: number, data: string): void {
	// an event stream accepts CRLF, CR or LF as a line terminator, so split on all three
	const payload = data
		.split(/\r\n|\r|\n/)
		.map((line) => `data:${line}`)
		.join('\n');
	const frame = `id:${eventId}\n${payload}\n\n`;

	setImmediate(() => {
		client.write(frame);
	});
}

/**
 * Creates an instance of a message broker server.
 * Many message broker servers may be created, each bound to a different base url.
 *
 * The request url (relative to the router's mount point) is used as the topic name:
 * a GET subscribes the caller to the topic as an event stream, and a POST publishes
 * the request body to every current subscriber of the topic.
 * @param cacheLastMessage When true, subscribers will receive the last message upon subscription.
 * @returns Returns an express Router for use within an express application.
 */
export function server(cacheLastMessage: boolean = false): Router {
	const router = Router();
	const topics: Topics = {};

	// GET method is used to subscribe by EventSource clients
	router.get('*', (req: Request, res: Response) => {
		const topic = getTopic(topics, req.url);

		// remove the subscription when the connection closes
		req.on('close', () => {
			const index = topic.subscribers.indexOf(res);

			// splice(-1, 1) would evict the last subscriber, so only remove when found
			if (index !== -1) {
				topic.subscribers.splice(index, 1);
			}
		});

		// create the subscription
		topic.subscribers.push(res);

		// set the response headers to specify this is an event stream
		res.set({ 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', 'Connection': 'keep-alive' });

		// send the headers now; otherwise nothing reaches the client until the first event is written
		res.flushHeaders();

		// update the client with the last message if required
		// (compare against undefined so that a cached empty message is still replayed)
		if (cacheLastMessage && topic.data !== undefined) {
			sendEvent(res, topic.lastEventId, topic.data);
		}
	});

	// POST method is used to publish the request body as a new event on the topic
	router.post('*', (req: Request, res: Response) => {
		const topic = getTopic(topics, req.url);
		const body: Buffer[] = [];

		// read the post body
		req.on('data', (chunk: Buffer): void => {
			body.push(chunk);
		});

		req.on('end', (): void => {
			// update the topic with the new event details
			const data = Buffer.concat(body).toString();
			topic.data = data;

			// wrap back to 0 at MAX_SAFE_INTEGER; beyond it, adding 1 no longer yields a new value
			topic.lastEventId = topic.lastEventId >= Number.MAX_SAFE_INTEGER ? 0 : topic.lastEventId + 1;

			// queue dispatch of event to all current subscribers
			for (const subscriber of topic.subscribers) {
				sendEvent(subscriber, topic.lastEventId, data);
			}
		});

		// send response to publisher
		// NOTE: this acknowledges the request immediately, before the body has been read and dispatched
		res.sendStatus(200);
	});

	return router;
}