//socket tài khoản

import _ from 'lodash';
import io from 'socket.io-client';

import config from './combineConfig';
import reduxStore, { dispatch } from './redux';
import { inquiryService, userService } from './services';
import * as actions from './store/actions';
import { CommonUtils, getValueFromSessionStorage, sessionKeyFactory, SocketStatus, Random } from "./utils";
import { logService } from './services';
import { emitter } from 'utils/EventEmitter';
import { queueProcessCommon } from './queueProcess'
const queueProcess = new queueProcessCommon()
// "CI";
// "OD";
// "SE";
// "PP";
// ///Thay doi thong tin tai khoan
// "CF";
// "logout";
// ///Thay doi ve lenh va vi the
// "DO";
// ///Thay doi ve tien
// "DT";
// ///Cap nhat so lenh dieu kien co so
// "COD";

export let sidSocket = undefined
let queueTime = 0 // time call func queue

const timeoutEmit = 500; // timeout phản hồi từ server

const TabID = CommonUtils.getTabID()
let token = null;
let isReloadData = false
const maxReSubSocket = 5 // Tối đa resub
let countReSubSocket = {} // count theo từng key là idReSub

logService.debug({ type: "socket_pushacc.:infoBrowser", tabID: TabID, isInfoBrowser: true, logBy: "socket_pushacc.js" }) // log thông tin Browser
let listIdSubReTry = []

function logDebug(from, msg) {
    from = sidSocket ? sidSocket + " " + from : from
    CommonUtils.logDebug(from, msg)
}
logDebug("socket_pushacc.:logDebug", "START LOG")
const globalVar = window._env_
const TAG_NAME = 'socket_pushacc.js.:'
export const socketRoomName = {
    // 'instrument': "i",    //room publish thông tin mã chứng khoán
    // 'instrument_ol': "i_ol",    //room publish thông tin mã chứng khoán lô lẻ
    // 'trade': "t",         //room publish thông tin chi tiết khớp mã chứng khoán
    // 'trade_ol': "t_ol",         //room publish thông tin chi tiết khớp mã chứng khoán lô lẻ
    // 'orderBook10': "o10",   //room publish thông tin top10 giá mua giá bán mã chứng khoán
    // 'orderBook_ol10': "o_ol10",   //room publish thông tin top10 giá mua giá bán mã chứng khoán lô lẻ
    // 'orderBook3': "o3",    //room publish thông tin top3 giá mua giá bán mã chứng khoán
    // 'orderBook_ol3': "o_ol3",    //room publish thông tin top3 giá mua giá bán mã chứng khoán lô lẻ
    // 'orderBook': "o",    //Loại event publish thông tin top3 giá mua giá bán mã chứng khoán
    // 'orderBook_ol': "o_ol",    //Loại event publish thông tin top3 giá mua giá bán mã chứng khoán lô lẻ
    // 'exchange': "e",    //room subscribe thông tin mã chứng khoán theo sàn
    // 'exchange_ol': "e_ol",    //room subscribe thông tin mã chứng khoán lô lẻ theo sàn
    // 'index': "idx",          //room publish thông tin chỉ số
    // 'putthrough': "pth",  //room publish thông tin mua bán thỏa thuận theo sàn
    // 'ptmatch': "ptm",     //room publish thông tin mua bán thỏa thuận theo sàn

    // 'channel': "ch",      //room publish các loại thông tin cho các kênh kết nối dạng server2server

    'account': "acc",      //room publish tín hiệu thay đổi balance tiểu khoản
    'customer': "cus",    //room publish thông tin liên quan đến tài khoản (Vd: thông báo đã download báo cáo xong)
    'message': "msg",      //room publish thông điệp, cảnh báo liên quan tới tài khoản, hoặc các tiểu khoản
    'report': "rpt",      //Loại event report bắn chung vào room customer
    'message_analytic': "msg.analyt",      //room publish thông điệp tin phân tích do công ty ck soạn
    "signal": "signal", // Signal của datx
    "rating": "rating" // xếp hạng của datx
};
const socketAction = {    // socket action từ server trả về
    partial: "p",    // socket action partial
    update: "u",    // socket action update
    insert: "i",    // socket action insert
    delete: "d",    // socket action delete
    signal: "s",    // socket action signal
};
const SDK_INFO = {
    '__sails_io_sdk_version': '1.2.1',
    '__sails_io_sdk_platform': 'browser',
    '__sails_io_sdk_language': 'javascript',
    // clientid: config.api.CLIENT_ID,
    // clientsecret: config.api.CLIENT_SECRET
};
//socket tài khoản
const socket = io(globalVar.api.API_BASE_URL, {
    path: '/realtime/socket.io',
    // path: '/realtime/socket.io',
    transports: ['websocket'],
    query: SDK_INFO,
    autoConnect: false
});

const operationsQueue = [];
const subscribedTopics = [];
const registeredTopics = {};
const registeredCalledIds = {};
let statusConnectSocket = "connecting"

const reSubSocket = (topics, data) => {
    // return null; // Tạm chặn logic resub chờ check log server Socket vì sao trả ra chậm.
    if (statusConnectSocket !== "connected") {
        // reconnectSocketHandler()
        return null;
    }
    let idReSub = data && data.idSub || undefined
    if (!idReSub) return null;
    if (countReSubSocket[idReSub] > maxReSubSocket) {
        // reconnectSocketHandler()
        return null;
    }
    if (countReSubSocket[idReSub]) {
        countReSubSocket[idReSub] = countReSubSocket[idReSub] + 1
    } else {
        countReSubSocket[idReSub] = 1
    }
    logDebug("socket_pushacc.:reSubSocket().:=", { idReSub, countReSubSocket: countReSubSocket[idReSub], countReSubSocket })
    let _index = _.findIndex(listIdSubReTry, (item) => { return item === idReSub })
    if (_index == -1) {
        listIdSubReTry.push(idReSub)
    }
    let _subscribedTopics = Object.keys(registeredTopics) || []
    // logDebug("socket_pushacc.:reSubSocket().:topics=", { topics, _subscribedTopics })
    if (!registeredTopics || registeredTopics.length === 0) {
        return false;
    }
    let _topics = []
    _.map(topics, (topic) => {
        if (_subscribedTopics.indexOf(topic) > -1) {
            _topics.push(topic)
        }
    })
    if (_topics && _topics.length > 0) {
        _subscribeToTopics(_topics, idReSub, countReSubSocket[idReSub])
    }
    return null;
}

const _subscribeToTopics = (topics, idReSub, countReSub) => {
    let idSub = idReSub || new Date().getTime() + "_" + Random.randomIdChars(6)
    if (!topics || topics.length === 0) {
        return false;
    }

    if (!socket.connected) {
        //console.log(TAG_NAME + 'Socket not ready, topics will be subscribe later');
        return false;
    }

    let state, tabId, SID_socket;
    token = CommonUtils.getTokenCurr()
    tabId = TabID
    SID_socket = sessionStorage.getItem("sidSocketCurr")
    let timeSub = new Date().getTime()
    let data = {
        op: 'subscribe',
        args: topics,
        token: token,
        tabId: tabId,
        idSub: idSub,
        timeSub: timeSub,
        SID_socket,
        countReSub
    }
    logService.debug({ typeCall: "_subscribeToTopics.:START", data, logBy: "socket_pushacc.js" })
    timeSub = new Date().getTime()
    data.timeSub = timeSub
    // https://socket.io/docs/v4/client-api/
    socket.timeout(timeoutEmit).emit("get", {
        url: '/client/send',
        method: 'get',
        headers: {

        },
        data: data
    }, (err, response) => {
        let timeRes = new Date().getTime() - timeSub
        data.timeRes = timeRes
        data.isReloadData = isReloadData
        if (err) {
            state = reduxStore.getState();
            let IsActiveTabBrowser = state.app.isActiveTabBrowser
            data.IsActiveTabBrowser = IsActiveTabBrowser
            // the server did not acknowledge the event in the given delay
            logDebug("socket_acc.:subscribe.:err=", { err, data: data })
            logService.error({ typeCall: "_subscribeToTopics", err: err, data: data.SID_socket, logBy: "socket_pushacc.js" })
            // reconnectSocketHandler(0)
            reSubSocket(topics, data)
            return false;
        }
        if (response.statusCode === 200) {
            //console.log(TAG_NAME + 'Subscribed to topics ' + topics.join(', '));
        } else {
            //console.log(TAG_NAME + 'Fail to subscribe to topics ' + topics.join(', '));
        }
    }
    );

    return true;
};

const _unsubscribeFromTopics = (topics, idReSub) => {
    let idSub = idReSub || new Date().getTime() + "_" + Random.randomIdChars(6)
    if (!topics || topics.length === 0) {
        return;
    }

    if (!socket.connected) {
        //console.log(TAG_NAME + 'Socket not ready, unsubscribe is skipped');
        return;
    }
    let state, tabId, SID_socket;
    token = CommonUtils.getTokenCurr()
    let timeSub = new Date().getTime()
    let data = {
        op: 'unsubscribe',
        args: topics,
        token: token,
        tabId: tabId,
        idSub: idSub,
        timeSub: timeSub,
        SID_socket,
    }
    logService.debug({ typeCall: "_unsubscribeFromTopics.:START", data, logBy: "socket_pushacc.js" })
    timeSub = new Date().getTime()
    data.timeSub = timeSub
    socket.timeout(timeoutEmit).emit("get", {
        url: '/client/send',
        method: 'get',
        headers: {},
        data: data
    }, (err, response) => {
        let timeRes = new Date().getTime() - timeSub
        data.timeRes = timeRes
        data.isReloadData = isReloadData
        if (err) {
            state = reduxStore.getState();
            let IsActiveTabBrowser = state.app.isActiveTabBrowser
            data.IsActiveTabBrowser = IsActiveTabBrowser
            // the server did not acknowledge the event in the given delay
            logDebug("socket_acc.:unsubscribe.:err=", { err, data: data })
            logService.error({ typeCall: "_unsubscribeFromTopics", err: err, data: data.SID_socket, logBy: "socket_pushacc.js" })
            // reconnectSocketHandler(0)
            // reSubSocket(topics, data)
            return false;
        }
        if (response.statusCode === 200) {
            //console.log(TAG_NAME + 'Subscribed to topics ' + topics.join(', '));
        } else {
            //console.log(TAG_NAME + 'Fail to subscribe to topics ' + topics.join(', '));
            logService.error({ typeCall: "_unsubscribeFromTopics", tabId: TabID, sidSocket: sidSocket, token: token, idSub: idSub, response: response, timeRes: timeRes, logBy: "socket_pushacc.js" })
        }
    }
    );
};

// Clear callerId with empty topic and clear topic with empty callerId
const _prunce = () => {
    const willBeDeleteTopics = [];
    _.forIn(registeredTopics, (callerIds, topic) => {
        if (!callerIds || callerIds.length === 0) {
            willBeDeleteTopics.push(topic);
        }
    });
    _.forEach(willBeDeleteTopics, (topic) => {
        delete registeredTopics[topic];
    });

    const willBeDeleteCallerIds = [];
    _.forIn(registeredCalledIds, (topics, callerId) => {
        if (!topics || topics.length === 0) {
            willBeDeleteCallerIds.push(callerId);
        }
    });
    _.forEach(willBeDeleteCallerIds, (callerId) => {
        delete registeredCalledIds[callerId];
    });
};

const _clearSubscribedTopics = () => {
    subscribedTopics.length = 0;
};

const _registerTopic = (topic, callerId) => {
    if (registeredTopics.hasOwnProperty(topic)) {
        const callerIds = registeredTopics[topic];
        if (!_.includes(callerIds, callerId)) {
            callerIds.push(callerId);
        }
    } else {
        registeredTopics[topic] = [callerId];
    }

    if (registeredCalledIds.hasOwnProperty(callerId)) {
        const topics = registeredCalledIds[callerId];
        if (!_.includes(topics, topic)) {
            topics.push(topic);
        }
    } else {
        registeredCalledIds[callerId] = [topic];
    }
};

const _unregisterTopic = (topic, callerId) => {
    if (registeredTopics.hasOwnProperty(topic)) {
        const callerIds = registeredTopics[topic];
        _.remove(callerIds, (element) => {
            return callerId === element;
        });
    }

    if (registeredCalledIds.hasOwnProperty(callerId)) {
        const topics = registeredCalledIds[callerId];
        _.remove(topics, (element) => {
            return topic === element;
        });
    }
};

const _unregisterTopics = (topics, callerId, dontCallApply) => {
    if (topics && topics.length > 0) {
        for (let i = 0; i < topics.length; i++) {
            _unregisterTopic(topics[i], callerId);
        }
    }

    _prunce();

    if (!dontCallApply) {
        _apply();
    }
};

const _unregisterCallerId = (callerId, dontCallApply) => {
    if (registeredCalledIds.hasOwnProperty(callerId)) {
        const topics = registeredCalledIds[callerId];
        if (topics && topics.length > 0) {
            _unregisterTopics([...topics], callerId, dontCallApply);
        }
    }
};

const _registerTopics = (topics, callerId) => {
    // Unregister all topic previously registered by this callerId
    _unregisterCallerId(callerId, true);

    if (topics && topics.length > 0) {
        for (let i = 0; i < topics.length; i++) {
            _registerTopic(topics[i], callerId);
        }
    }

    _apply();
};

const _apply = () => {
    const needToSubscribeTopics = [];
    _.forIn(registeredTopics, (callerIds, topic) => {
        if (callerIds.length > 0) {
            needToSubscribeTopics.push(topic);
        }
    });

    const willBeSubscribeTopics = _.difference(needToSubscribeTopics, subscribedTopics);
    const willBeUnSubscribeTopics = _.difference(subscribedTopics, needToSubscribeTopics);

    if (_subscribeToTopics(willBeSubscribeTopics)) {
        _.forEach(willBeSubscribeTopics, (topic) => {
            if (!_.includes(subscribedTopics, topic)) {
                subscribedTopics.push(topic);
            }
        });
    }

    _unsubscribeFromTopics(willBeUnSubscribeTopics);
    _.forEach(willBeUnSubscribeTopics, (topic) => {
        if (_.includes(subscribedTopics, topic)) {
            _.remove(subscribedTopics, (element) => {
                return element === topic;
            });
        }
    });
};

const _scheduleToExecuteOperations = () => {
    let timer = setTimeout(() => { _executeOperations(); clearTimeout(timer) }, 100);
};

const _executeOperation = (operation) => {
    const { action, data } = operation;
    switch (action) {
        case 're-subscribe':
            _apply();
            break;
        case 'clear-subscribed':
            _clearSubscribedTopics();
            break;
        case 'register': {
            const { topics, callerId } = data;
            _registerTopics(topics, callerId);
            break;
        }
        case 'unregister': {
            const { topics, callerId } = data;
            _unregisterTopics(topics, callerId);
            break;
        }
        case 'unregister-callerId': {
            const { callerId } = data;
            _unregisterCallerId(callerId, false);
            break;
        }
        default:
            //console.log(TAG_NAME + 'Unknown socket action ' + action);
            break;
    }
};

const _executeOperations = () => {
    while (operationsQueue.length > 0) {
        const operation = operationsQueue.shift();
        _executeOperation(operation);
    }
    _scheduleToExecuteOperations();
};

const _requestReSubscribeToAllSubscribedTopics = () => {
    let context = {
        action: 're-subscribe'
    }
    queueProcess.add(_executeOperation, context, queueTime)
};

const _requestClearSubscribedTopics = () => {
    let context = {
        action: 'clear-subscribed'
    }
    queueProcess.add(_executeOperation, context, queueTime)
};

// Fired upon a connection including a successful reconnection
socket.on('connect', () => {
    //console.log(TAG_NAME + 'Socket connected');

    _requestReSubscribeToAllSubscribedTopics();

    dispatch(actions.setSocketConnectStatus(SocketStatus.CONNECTED));

});

// Fired upon a disconnection including a abnormal disconnection
socket.on('disconnect', () => {
    //console.log(TAG_NAME + 'Socket disconnected');
    _requestClearSubscribedTopics();
    dispatch(actions.setSocketConnectStatus(SocketStatus.DISCONNECTED));
});


const timeSkip = config.app.TIME_SKIP_EVENT_ACCOUNT || 500; // defaut 500ms
let timerAccount = {
    'OD': {}, 'CFMOD': {}, 'CI': {}, 'SE': {}, 'CA': {}, 'P': {}, 'PP': {}, 'DO': {}, 'DT': {}, 'COD': {}
};
socket.on(socketRoomName.account, (message) => {
    if (message.d.length > 0) {
        let { ACC: accountId, DT: type } = message.d[0];
        dispatch(actions.onSocketAccountAction(accountId, type));
        // if (type == 'OD') {
        // OFF nén tín hiệu
        // if (!timerAccount[type][accountId]) {
        //     timerAccount[type][accountId] = setTimeout(function () {
        //         dispatch(actions.onSocketAccountAction(accountId, type));
        //         clearTimeout(timerAccount[type][accountId])
        //         timerAccount[type][accountId] = undefined
        //     }, timeSkip)
        // }
        // }
    }
});

let timerCustomer = { 'OD': undefined };
socket.on(socketRoomName.customer, (message) => {
    if (message.d.length > 0) {
        const { DT: type } = message.d[0];
        if (type == 'CF') dispatch(actions.changeCustomerInfoEventtype(message));
        else if (type == 'AFU') {
            userService.getPermissionInfo().then((data) => { }).catch(error => { }) //giang.ngo: gọi lại hàm getPermissionInfo để cập nhật lại thông tin permissioninfo của tài khoản
            dispatch(actions.changeAccountInfoEventtype(message));
        } else {
            /**
             * xử lí các event theo custid
             * example: reload so lenh moi gioi
             * case1: type=OD
             * ...
             */
            // if (type == 'OD') {
            let { CUSTID: custid } = message.d[0];
            dispatch(actions.onSocketCustomerAction(custid, type));
            // OFF nén tín hiệu
            // if (!timerCustomer[type]) {
            //     timerCustomer[type] = setTimeout(function () {
            //         dispatch(actions.onSocketCustomerAction(custid, type));
            //         clearTimeout(timerCustomer[type])
            //         timerCustomer[type] = undefined
            //     }, timeSkip)
            // }
            // }
        };
    }
});

socket.on(socketRoomName.report, (message) => {
    let reportKeys = getValueFromSessionStorage(sessionKeyFactory.reportKeys);
    if (reportKeys) {
        reportKeys = JSON.parse(reportKeys);
    } else reportKeys = {};
    if (message.d.length > 0) {
        const { autoid, fekey } = message.d[0];
        if (autoid) {
            if (fekey != null && !reportKeys[fekey]) return;
            delete reportKeys[fekey];
            inquiryService.getReport(autoid);
        }
    }
});

socket.on("logout", (message) => {
    const token = CommonUtils.getTokenCurr()
    const { CLIENTID, REASON, ACCESS_TOKEN } = message;
    if (CLIENTID == "*" || CLIENTID == globalVar.api.CLIENT_ID) {
        if (REASON == "KILL_SESSION") {
            //ko đá bản thân
            if (token !== ACCESS_TOKEN) {
                dispatch(actions.logoutByOther());
            };
        } else if (REASON == "CHANGE_PASS") {
            dispatch(actions.changePassWordInfoEventtype(message))
        }
    }
});
socket.on(socketRoomName.message, (message) => {
    // dispatch(actions.updateAccountNotifyList());
    dispatch(actions.updateAccountNotifyCounter());
});
socket.on(socketRoomName.message_analytic, (message) => {
    // Trường hợp xóa tin không hiển thị thông báo nhận thông điệp
    const state = reduxStore.getState();
    if (message.d && message.d.length > 0 && state.user.isLoggedIn) {
        dispatch(actions.toastToNoticeWhenReceiveMessage());
    }
    // dispatch(actions.updateAnalyticNotifyList());
});

export const on = (eventName, eventHandler) => {
    socket.on(eventName, (data) => {
        eventHandler(data);
    })
};

export const connect = () => {
    dispatch(actions.setSocketConnectStatus(SocketStatus.CONNECTING));
    statusConnectSocket = "connected"
    //console.log(TAG_NAME + 'Socket connecting');
    socket.open();
};

export const disconnect = () => {
    statusConnectSocket = "disconnect"
    logDebug(`socket_pushacc.:msg.:${"disconnect"}`)
    logService.debug({ typeCall: "disconnect", tabId: TabID, sidSocket: sidSocket, token: token, logBy: "socket_pushacc.js" })
    dispatch(actions.setSocketConnectStatus(SocketStatus.DISCONNECTED));
    return socket.close();
};


socket.on('connect_error', err => {
    statusConnectSocket = "connect_error"
    // dispatch(actions.setSocketMarketConnectFirstTime(false));
    dispatch(actions.setSocketConnectStatus(SocketStatus.ERROR));
    logDebug(`socket_pushacc.:on.:${"connect_error"}.:err=`, { err })
    logService.error({ typeOn: "connect_error", tabId: TabID, sidSocket: sidSocket, token: token, err: err, logBy: "socket_pushacc.js" })
});

socket.on('connect_failed', err => {
    statusConnectSocket = "connect_failed"
    dispatch(actions.setSocketConnectStatus(SocketStatus.ERROR));
    logDebug(`socket_pushacc.:on.:${"connect_failed.:"}.:err=`, { err })
    logService.error({ typeOn: "connect_failed", tabId: TabID, sidSocket: sidSocket, token: token, err: err, logBy: "socket_pushacc.js" })
});

socket.io.on("reconnect_attempt", (attempt) => {
    statusConnectSocket = "reconnect_attempt"
    dispatch(actions.setSocketConnectStatus(SocketStatus.WARNING));
    logDebug(`socket_pushacc.:on.:${"reconnect_attempt"}.:attempt=`, { attempt })
    logService.error({ typeOn: "reconnect_attempt", tabId: TabID, sidSocket: sidSocket, token: token, attempt_err: attempt, logBy: "socket_pushacc.js" })
});

socket.io.on("reconnect_error", (error) => {
    statusConnectSocket = "reconnect_error"
    dispatch(actions.setSocketConnectStatus(SocketStatus.ERROR));
    logDebug(`socket_pushacc.:on.:${"reconnect_error"}.:err=`, { error })
    logService.error({ typeOn: "reconnect_error", tabId: TabID, sidSocket: sidSocket, token: token, err: error, logBy: "socket_pushacc.js" })
});

socket.io.on("reconnect_failed", (err) => {
    statusConnectSocket = "reconnect_failed"
    dispatch(actions.setSocketConnectStatus(SocketStatus.ERROR));
    logDebug(`socket_pushacc.:on.:${"reconnect_failed"}.:err=`, { err })
    logService.error({ typeOn: "reconnect_failed", tabId: TabID, sidSocket: sidSocket, token: token, err: err, logBy: "socket_pushacc.js" })
});

socket.on('reconnecting', () => {
    statusConnectSocket = "reconnecting"
    dispatch(actions.setSocketConnectStatus(SocketStatus.CONNECTING));
    logDebug(`socket_pushacc.:msg.:${"reconnecting"}`)
    logService.debug({ typeOn: "reconnecting", tabId: TabID, sidSocket: sidSocket, token: token, logBy: "socket_pushacc.js" })
});

const timerReconnect = 100
const maxRetryReconnectSocket = 20
let countRetryReconnectSocket = 0
function reconnectSocketHandler() {
    countRetryReconnectSocket++
    console.debug("reconnectSocketHandler.:", countRetryReconnectSocket, statusConnectSocket)
    let timerInc = timerReconnect * countRetryReconnectSocket
    if (countRetryReconnectSocket > 0 && countRetryReconnectSocket <= 5) {
        // ưu tiên 5 lần đầu connect nhanh 
        timerInc = 50
    }
    if (countRetryReconnectSocket <= maxRetryReconnectSocket) {
        // giảm tốc độ của việc reconect
        let timer = setTimeout(function () {
            disconnect()
            connect()
            clearTimeout(timer)
        }, timerInc)
    }
}

export const registerTopics = (topics, callerId) => {
    if (!callerId) {
        console.error('CallerId need when register interested topics');
        return;
    }
    let context = {
        action: 'register',
        data: {
            topics: topics,
            callerId: callerId
        }
    }
    queueProcess.add(_executeOperation, context, queueTime)
};

export const unregisterTopics = (topics, callerId) => {
    if (!callerId) {
        console.error('CallerId need when unregister interested topics');
        return;
    }
    let context = {
        action: 'unregister',
        data: {
            topics: topics,
            callerId: callerId
        }
    }
    queueProcess.add(_executeOperation, context, queueTime)
};

// export const unregisterTradeTopic = (symbol, callerId) => {
//     unregisterTopics([socketRoomName.trade + ':' + symbol], callerId);
// };

export const unregisterAnalyticMessage = (language, custid, callerId) => {
    unregisterTopics([socketRoomName.message_analytic + ':' + language, socketRoomName.message_analytic + ':' + language + ':' + custid], callerId);
};

export const unregisterCallerId = (callerId) => {
    if (!callerId) {
        console.error('CallerId need when unregister interested topics');
        return;
    }
    let context = {
        action: 'unregister-callerId',
        data: {
            callerId: callerId
        }
    }
    queueProcess.add(_executeOperation, context, queueTime)
};

export const registerAccountTopics = (accountId, callerId) => {
    registerTopics([socketRoomName.account + ':' + accountId], callerId);
};
export const registerAllAccountTopics = (accountIds, callerId) => {
    var subcribeAccounts = []
    accountIds.forEach(accountId => {
        subcribeAccounts.push([socketRoomName.account + ':' + accountId])
    })
    registerTopics(subcribeAccounts, callerId);
};
export const registerCustomerTopics = (custid, callerId) => {
    registerTopics([socketRoomName.customer + ':' + custid], callerId);
};
export const registerAccountMessageTopics = (custid, callerId) => {
    registerTopics([socketRoomName.message + ':' + custid], callerId);
};
export const registerAnalyticMessageTopics = (language, custid, callerId) => {
    registerTopics([socketRoomName.message_analytic + ':' + language, socketRoomName.message_analytic + ':' + language + ':' + custid], callerId);
};

export const registerDatxAllSignalInfo = (callerId) => {
    registerTopics([socketRoomName.signal + ':x_buy', socketRoomName.signal + ':x_speculator', socketRoomName.signal + ':x_canslim', socketRoomName.signal + ':x_darvas'], callerId);
}

export const registerDatxRatingInfo = (callerId) => {
    registerTopics([socketRoomName.signal + ':market'], callerId);
}


// setInterval(() => {
//     let message={
//         "a": "open",
//         "d": [
//             {
//                 "open_date": "2024-04-08T10:10:24",
//                 "closed_price": 0,
//                 "symbol": "MBB",
//                 "event_type": "x_buy",
//                 "sector_key": "banks",
//                 "stop_loss_price": 31.8,
//                 "signal_id": Random.randomComponentId(),
//                 "open_price": 35.3,
//                 "status": "open",
//                 "take_profit_price": 40.6
//             }
//         ]
//     }
//     const { a: action, d: data, k: keys } = message;
//     let convertData = CommonUtils.convertSignalDataDatx({ ...data[0], status: action });
//     emitter.emit('DATX_SIGNAL', { action: action, data: convertData });
// }, 1000);


socket.on('signal', (message) => {
    const { a: action, d: data, type } = message;

    if (type && type === 'signal') {
        if (data && data.length > 0) {
            let convertData = CommonUtils.convertSignalDataDatx({ ...data[0], status: action });
            emitter.emit('DATX_SIGNAL', { action: action, data: convertData });
        }
    }

    if (type && type === "sector-rating") {
        emitter.emit('DATX_SECTOR_RATING');
    }

    if (type && type === "stock-rating") {
        emitter.emit('DATX_STOCK_RATING');
    }

    if (type && type === "market-us") { // Trạng thái thị trường cơ sở
        emitter.emit('DATX_MARKET_US');
    }

    if (type && type === "market-de") { // Trạng thái thị trường phái sinh
        emitter.emit('DATX_MARKET_DE');
    }
});

_scheduleToExecuteOperations();