/*
 * P&F Demand Response Engine
 *
 * Interface to live data feeds.
 */

import Debug from 'debug';
const debug = Debug('pf-dre-ems-web:live');

const URL_WEBSOCKET = process.env.REACT_APP_WEBSOCKET_URL || '';
const DRE_WEBSOCKET_TOKEN = 'pf-dre-ems-web';

export type TMetrics = Record<string, any>;
type LiveDataCallback = (metrics: Record<string, TMetrics[]>) => void;

export type TSubscriptionID = number;

// Global unique ID for websocket subscriptions.
let idNextSubscription: TSubscriptionID = 1;

class LiveData {
	private subscribedDevices: Record<string, TSubscriptionID[]>;
	private callbacks: Record<TSubscriptionID, LiveDataCallback>;
	private socket!: WebSocket;

	// Are we currently connected to the Websocket?
	private connected: boolean;

	// Are we in the process of connecting to the Websocket?
	private connecting: boolean;

	constructor() {
		this.subscribedDevices = {};
		this.callbacks = {};

		this.connected = false;
		this.connecting = false;
	}

	connect() {
		if (this.connected || this.connecting) return;

		this.connecting = true;
		this.socket = new WebSocket(URL_WEBSOCKET);
		this.socket.addEventListener('open', event => {
			// Once the socket is connected, release any pending subscribeDevices()
			// calls.
			this.connecting = false;
			this.connected = true;
			this.onOpen(event);
		});
		this.socket.addEventListener('message', event => {
			this.onMessage(event);
		});
		this.socket.addEventListener('error', event => {
			// The 'close' event will get fired too.
		});
		this.socket.addEventListener('close', event => {
			this.connecting = false;
			this.connected = false;
			this.onClose(event);
		});
	}

	/**
	 * Handle a successful connection to the WebSocket server.
	 */
	async onOpen(event: Event) {
		// Request the latest data
		debug('Websocket connected');
		this.resubscribe();
	}

	async onClose(event: Event) {
		debug('Websocket connection lost, reconnecting');
		this.connect();
	}

	/**
	 * Process an incoming WebSocket message.
	 */
	async onMessage(event: MessageEvent<any>) {
		type MetricsMessage = Record<string, TMetrics[]>;
		const message = JSON.parse(event.data);
		switch (message.action ?? '_') {
			case 'subscribe':
				if (message.errors && message.errors.length > 0) {
					console.error('Websocket device subscription failed:', message.errors);
					// TODO: Pass error to UI somehow
				}
				break;
			case '_':
				let data: Record<TSubscriptionID, Record<string, TMetrics[]>> = {};
				for (const [deviceCode, metricList] of Object.entries(message as MetricsMessage)) {

					const subscriberList = [
						...this.subscribedDevices[deviceCode] ?? [],
						...this.subscribedDevices['*'] ?? [],
					];
					if (subscriberList.length === 0) continue;

					for (const metrics of metricList) {
						for (const idSubscription of subscriberList) {
							if (!data[idSubscription]) {
								data[idSubscription] = {};
							}
							if (!data[idSubscription][deviceCode]) {
								data[idSubscription][deviceCode] = [];
							}
							data[idSubscription][deviceCode].push(metrics);
						}
					}
				}
				for (const [idSubscription, devices] of Object.entries(data)) {
					this.callbacks[idSubscription as unknown as TSubscriptionID](devices);
				}
				break;
			default:
				debug('Unhandled message from websocket:', message);
				break;
		}
	}

	resubscribe() {
		const devices = Object.keys(this.subscribedDevices);
		if (devices.length > 0) {
			debug('Resubscribing to:', devices);
			this.socket.send(JSON.stringify({
				action: 'subscribe',
				token: DRE_WEBSOCKET_TOKEN,
				devices: devices,
			}));
		}
	}

	/**
	 * Request live updates for one or more devices.
	 */
	subscribeDevices(devices: string[], cb: LiveDataCallback): TSubscriptionID {
		const idSubscription = idNextSubscription++;

		// Reference count the subscriptions.
		let add = [];
		for (const d of devices) {
			if (this.subscribedDevices[d] === undefined) {
				this.subscribedDevices[d] = [];
				add.push(d);
			}
			this.subscribedDevices[d].push(idSubscription);
			this.callbacks[idSubscription] = cb;
		}

		if ((add.length > 0) && (this.connected)) {
			debug('Subscribing to:', add);
			this.socket.send(JSON.stringify({
				action: 'subscribe',
				token: DRE_WEBSOCKET_TOKEN,
				devices: add,
			}));
		} // else nothing to do or not connected yet, leave for resubscribe().

		// Ensure we're connected (no-op if we are connected).  If we weren't
		// connected, resubscribe() will handle the devices we just subbed to.
		this.connect();

		return idSubscription;
	}

	/**
	 * Cease receiving live updates for one or more devices.
	 */
	unsubscribeDevices(idSubscription: TSubscriptionID) {
		// Subtract from the reference count.
		let remove: string[] = [];
		for (const deviceCode of Object.keys(this.subscribedDevices)) {
			this.subscribedDevices[deviceCode] =
				this.subscribedDevices[deviceCode].filter(s => s !== idSubscription);

			if (this.subscribedDevices[deviceCode].length === 0) {
				remove.push(deviceCode);
				delete this.subscribedDevices[deviceCode];
			}
		}

		// Only unsubscribe once the reference count reaches zero.
		if ((remove.length > 0) && (this.connected)) {
			debug('Unsubscribing from:', remove);
			this.socket.send(JSON.stringify({
				action: 'unsubscribe',
				devices: remove,
			}));
		} // else not connected, nothing to do.
	}
}

const singletonLive = new LiveData();

export default singletonLive;
