/*
 * Decompiled with CFR 0.152.
 */
package fr.esrf.TangoApi.events;

import fr.esrf.Tango.DevError;
import fr.esrf.Tango.DevFailed;
import fr.esrf.Tango.DevVarLongStringArray;
import fr.esrf.TangoApi.ApiUtil;
import fr.esrf.TangoApi.CallBack;
import fr.esrf.TangoApi.Database;
import fr.esrf.TangoApi.DeviceData;
import fr.esrf.TangoApi.DeviceProxy;
import fr.esrf.TangoApi.events.EventCallBackStruct;
import fr.esrf.TangoApi.events.EventChannelStruct;
import fr.esrf.TangoApi.events.EventConsumer;
import fr.esrf.TangoApi.events.EventQueue;
import fr.esrf.TangoApi.events.IEventConsumer;
import fr.esrf.TangoApi.events.KeepAliveThread;
import fr.esrf.TangoApi.events.ZMQutils;
import fr.esrf.TangoApi.events.ZmqMainThread;
import fr.esrf.TangoDs.Except;
import fr.esrf.TangoDs.TangoConst;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Enumeration;
import org.omg.CosEventComm.Disconnected;
import org.omg.CosNotification.StructuredEvent;

public class ZmqEventConsumer
extends EventConsumer
implements TangoConst,
Runnable,
IEventConsumer {
    private static ZmqEventConsumer instance = null;
    private Thread runner;

    public static ZmqEventConsumer getInstance() throws DevFailed {
        if (instance == null) {
            instance = new ZmqEventConsumer();
        }
        return instance;
    }

    private ZmqEventConsumer() throws DevFailed {
        ZmqMainThread zmqMainThread = new ZmqMainThread(ZMQutils.getContext());
        zmqMainThread.start();
        this.addShutdownHook();
    }

    private void addShutdownHook() {
        this.runner = new Thread(this);
        this.runner.setName("ZmqEventConsumer");
        Runtime.getRuntime().addShutdownHook(new Thread(){

            @Override
            public void run() {
                System.out.println("======== Shutting down ZMQ event system ==========");
                KeepAliveThread.getInstance().stopThread();
                try {
                    ZmqEventConsumer.this.runner.join();
                }
                catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        });
        this.runner.start();
    }

    @Override
    public void run() {
    }

    @Override
    public int subscribe_event(DeviceProxy device, int event, CallBack callback, int max_size, boolean stateless) throws DevFailed {
        String event_name = eventNames[event];
        ApiUtil.printTrace("=============> subscribing for " + device.name() + "." + event_name);
        if (callback == null && max_size >= 0 && device.getEventQueue() == null) {
            if (max_size > 0) {
                device.setEventQueue(new EventQueue(max_size));
            } else {
                device.setEventQueue(new EventQueue());
            }
        }
        String deviceName = device.fullName();
        String callback_key = deviceName.toLowerCase();
        callback_key = callback_key + "." + event_name;
        try {
            ApiUtil.printTrace("calling callEventSubscriptionAndConnect() method");
            this.callEventSubscriptionAndConnect(device, event_name);
            ApiUtil.printTrace("call callEventSubscriptionAndConnect() method done");
        }
        catch (DevFailed e) {
            if (!stateless || e.errors[0].desc.equals("Command ZmqEventSubscriptionChange not found")) {
                throw e;
            }
            EventCallBackStruct new_event_callback_struct = new EventCallBackStruct(device, event_name, "", callback, max_size, ++subscribe_event_id, event, false);
            failed_event_callback_map.put(callback_key, new_event_callback_struct);
            return subscribe_event_id;
        }
        String channelName = (String)device_channel_map.get(deviceName);
        if (channelName == null) {
            int start = deviceName.indexOf(47, "tango:// ".length());
            deviceName = deviceName.substring(start + 1);
            channelName = (String)device_channel_map.get(deviceName);
        }
        EventChannelStruct event_channel_struct = (EventChannelStruct)channel_map.get(channelName);
        event_channel_struct.last_subscribed = System.currentTimeMillis();
        EventCallBackStruct failed_struct = (EventCallBackStruct)failed_event_callback_map.get(callback_key);
        int evnt_id = failed_struct == null ? ++subscribe_event_id : failed_struct.id;
        EventCallBackStruct new_event_callback_struct = new EventCallBackStruct(device, event_name, channelName, callback, max_size, evnt_id, event, true);
        new_event_callback_struct.consumer = this;
        event_callback_map.put(callback_key, new_event_callback_struct);
        if (event == 0 || event == 2 || event == 1 || event == 3 || event == 4 || event == 7 || event == 5) {
            new EventConsumer.PushAttrValueLater(new_event_callback_struct).start();
        }
        return evnt_id;
    }

    private void callEventSubscriptionAndConnect(DeviceProxy device, String eventType) throws DevFailed {
        String device_name = device.name();
        String[] info = new String[]{device_name, "", "subscribe", eventType, Integer.toString(device.get_idl_version())};
        DeviceData argin = new DeviceData();
        argin.insert(info);
        String cmdName = this.getEventSubscriptionCommandName();
        ApiUtil.printTrace(device.get_adm_dev().name() + ".command_inout(\"" + cmdName + "\") for " + device_name + eventType);
        DeviceData argout = device.get_adm_dev().command_inout(cmdName, argin);
        ApiUtil.printTrace("    command_inout done.");
        this.checkDeviceConnection(device, null, argout, eventType);
    }

    @Override
    protected String getEventSubscriptionCommandName() {
        return "ZmqEventSubscriptionChange";
    }

    @Override
    protected void checkIfAlreadyConnected(DeviceProxy device, String attribute, String event_name, CallBack callback, int max_size, boolean stateless) throws DevFailed {
    }

    @Override
    protected void setAdditionalInfoToEventCallBackStruct(EventCallBackStruct callback_struct, String device_name, String attribute, String event_name, String[] filters, EventChannelStruct channel_struct) throws DevFailed {
        ApiUtil.printTrace("-------------> Set as ZmqEventConsumer for " + device_name);
        callback_struct.consumer = this;
    }

    private void connect(DeviceProxy deviceProxy, String attributeName, String eventName, DeviceData deviceData) throws DevFailed {
        String deviceName = deviceProxy.fullName();
        String adm_name = null;
        int tangoVersion = deviceData.extractLongStringArray().lvalue[0];
        try {
            adm_name = deviceProxy.adm_name();
            if (tangoVersion >= 810) {
                adm_name = adm_name.toLowerCase();
            }
        }
        catch (DevFailed e) {
            Except.throw_event_system_failed("API_BadConfigurationProperty", "Can't subscribe to event for device " + deviceName + "\n Check that device server is running...", "ZmqEventConsumer.connect");
        }
        String channelName = adm_name;
        Database dbase = null;
        if (!channel_map.containsKey(channelName)) {
            if (deviceProxy.use_db()) {
                dbase = deviceProxy.get_db_obj();
            }
            EventConsumer.ConnectionStructure connectionStructure = new EventConsumer.ConnectionStructure(deviceProxy.get_tango_host(), channelName, deviceName, attributeName, eventName, dbase, deviceData, false);
            this.connect_event_channel(connectionStructure);
        } else if (deviceProxy.use_db()) {
            dbase = deviceProxy.get_db_obj();
            ZMQutils.connectEvent(deviceProxy.get_tango_host(), deviceName, attributeName, deviceData.extractLongStringArray(), eventName, false);
        }
        EventChannelStruct eventChannelStruct = (EventChannelStruct)channel_map.get(channelName);
        eventChannelStruct.adm_device_proxy = new DeviceProxy(adm_name);
        eventChannelStruct.use_db = deviceProxy.use_db();
        eventChannelStruct.dbase = dbase;
        eventChannelStruct.setTangoRelease(tangoVersion);
        device_channel_map.put(deviceName, channelName);
    }

    private DeviceData checkZmqAddress(DeviceData deviceData, DeviceProxy deviceProxy) throws DevFailed {
        DevVarLongStringArray lsa = deviceData.extractLongStringArray();
        try {
            InetAddress iadd = InetAddress.getByName(deviceProxy.get_host_name());
            String hostAddress = iadd.getHostAddress();
            if (!lsa.svalue[0].startsWith("tcp://" + hostAddress)) {
                String wrongAdd = lsa.svalue[0];
                int idx = lsa.svalue[0].lastIndexOf(58);
                if (idx > 0) {
                    lsa.svalue[0] = "tcp://" + hostAddress + lsa.svalue[0].substring(idx);
                    lsa.svalue[1] = "tcp://" + hostAddress + lsa.svalue[1].substring(idx);
                    System.out.println(wrongAdd + " ---> " + lsa.svalue[0]);
                    deviceData = new DeviceData();
                    deviceData.insert(lsa);
                }
            }
        }
        catch (UnknownHostException e) {
            Except.throw_exception("UnknownHostException", e.toString(), "ZmqEventConsumer.checkZmqAddress()");
        }
        return deviceData;
    }

    @Override
    protected void checkDeviceConnection(DeviceProxy deviceProxy, String attribute, DeviceData deviceData, String event_name) throws DevFailed {
        deviceData = this.checkZmqAddress(deviceData, deviceProxy);
        String deviceName = deviceProxy.fullName();
        ApiUtil.printTrace("checkDeviceConnection for " + deviceName);
        if (!device_channel_map.containsKey(deviceName)) {
            ApiUtil.printTrace("    Does NOT Exist");
            this.connect(deviceProxy, attribute, event_name, deviceData);
            if (!device_channel_map.containsKey(deviceName)) {
                Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to connect to event channel for device", "EventConsumer.subscribe_event()");
            }
        } else {
            ApiUtil.printTrace(deviceName + " already connected.");
            ZMQutils.connectEvent(deviceProxy.get_tango_host(), deviceName, attribute, deviceData.extractLongStringArray(), event_name, false);
        }
    }

    @Override
    protected synchronized void connect_event_channel(EventConsumer.ConnectionStructure cs) throws DevFailed {
        DeviceProxy adminDevice = new DeviceProxy(cs.channelName);
        cs.channelName = adminDevice.fullName();
        DevVarLongStringArray lsa = cs.deviceData.extractLongStringArray();
        ApiUtil.printTrace("connect_event_channel for " + cs.channelName);
        ZMQutils.connectHeartbeat(adminDevice.get_tango_host(), adminDevice.name(), lsa, false);
        ZMQutils.connectEvent(cs.tangoHost, cs.deviceName, cs.attributeName, lsa, cs.eventName, false);
        if (cs.reconnect) {
            EventChannelStruct eventChannelStruct = (EventChannelStruct)channel_map.get(cs.channelName);
            eventChannelStruct.last_heartbeat = System.currentTimeMillis();
            eventChannelStruct.heartbeat_skipped = false;
            eventChannelStruct.has_notifd_closed_the_connection = 0;
            eventChannelStruct.setTangoRelease(lsa.lvalue[0]);
            eventChannelStruct.setIdlVersion(lsa.lvalue[1]);
        } else {
            String[] tangoHosts;
            EventChannelStruct newEventChannelStruct = new EventChannelStruct();
            newEventChannelStruct.last_heartbeat = System.currentTimeMillis();
            newEventChannelStruct.heartbeat_skipped = false;
            newEventChannelStruct.adm_device_proxy = adminDevice;
            newEventChannelStruct.has_notifd_closed_the_connection = 0;
            newEventChannelStruct.consumer = this;
            newEventChannelStruct.zmqEndpoint = lsa.svalue[0];
            newEventChannelStruct.setTangoRelease(lsa.lvalue[0]);
            newEventChannelStruct.setIdlVersion(lsa.lvalue[1]);
            channel_map.put(cs.channelName, newEventChannelStruct);
            ApiUtil.printTrace("Adding " + cs.channelName + " to channel_map");
            for (String tangoHost : tangoHosts = adminDevice.get_db_obj().getPossibleTangoHosts()) {
                tangoHost = "tango://" + tangoHost;
                boolean found = false;
                for (String possibleTangoHost : possibleTangoHosts) {
                    if (!possibleTangoHost.equals(tangoHost)) continue;
                    found = true;
                }
                if (found) continue;
                possibleTangoHosts.add(tangoHost);
            }
        }
    }

    @Override
    protected boolean reSubscribe(EventChannelStruct channelStruct, EventCallBackStruct eventCallBackStruct) {
        boolean done = false;
        try {
            ApiUtil.printTrace("====================================================\n   Try to resubscribe " + eventCallBackStruct.channel_name);
            DevVarLongStringArray lsa = ZMQutils.getEventSubscriptionInfoFromAdmDevice(channelStruct.adm_device_proxy, eventCallBackStruct.device.name(), eventCallBackStruct.attr_name, eventCallBackStruct.event_name);
            String admDeviceName = channelStruct.adm_device_proxy.name();
            if (channelStruct.getTangoRelease() >= 810) {
                admDeviceName = admDeviceName.toLowerCase();
            }
            this.push_structured_event_heartbeat(admDeviceName);
            channelStruct.heartbeat_skipped = false;
            channelStruct.last_subscribed = System.currentTimeMillis();
            channelStruct.setTangoRelease(lsa.lvalue[0]);
            channelStruct.setIdlVersion(lsa.lvalue[1]);
            eventCallBackStruct.last_subscribed = channelStruct.last_subscribed;
            done = true;
        }
        catch (DevFailed e) {
            // empty catch block
        }
        return done;
    }

    @Override
    protected void removeFilters(EventCallBackStruct cb_struct) throws DevFailed {
    }

    @Override
    protected void checkIfHeartbeatSkipped(String name, EventChannelStruct channelStruct) {
        if (KeepAliveThread.heartbeatHasBeenSkipped(channelStruct)) {
            DevError dev_error = null;
            try {
                String admDeviceName = channelStruct.adm_device_proxy.fullName();
                if (channelStruct.getTangoRelease() >= 810) {
                    admDeviceName = admDeviceName.toLowerCase();
                }
                channelStruct.adm_device_proxy = new DeviceProxy(admDeviceName);
                channelStruct.adm_device_proxy.set_timeout_millis(300);
                channelStruct.adm_device_proxy.ping();
                this.reconnectToChannel(name);
            }
            catch (DevFailed e) {
                dev_error = e.errors[0];
            }
            Enumeration<EventCallBackStruct> callbackStructs = EventConsumer.getEventCallbackMap().elements();
            while (callbackStructs.hasMoreElements()) {
                EventCallBackStruct callbackStruct = callbackStructs.nextElement();
                if (!callbackStruct.channel_name.equals(name)) continue;
                if (dev_error != null) {
                    this.pushReceivedException(channelStruct, callbackStruct, dev_error);
                }
                if (!this.reconnectToEvent(channelStruct, callbackStruct) || callbackStruct.event_name.equals(eventNames[6])) continue;
                this.readAttributeAndPush(channelStruct, callbackStruct);
            }
        }
    }

    @Override
    protected void unsubscribeTheEvent(EventCallBackStruct callbackStruct) throws DevFailed {
        ZMQutils.disConnectEvent(callbackStruct.device.get_tango_host(), callbackStruct.device.name(), callbackStruct.attr_name, callbackStruct.device.get_idl_version(), callbackStruct.event_name);
    }

    @Override
    public void push_structured_event(StructuredEvent structuredEvent) throws Disconnected {
    }

    private boolean reconnectToEvent(EventChannelStruct channelStruct, EventCallBackStruct callBackStruct) {
        boolean reConnected;
        try {
            DevVarLongStringArray lsa = ZMQutils.getEventSubscriptionInfoFromAdmDevice(channelStruct.adm_device_proxy, callBackStruct.device.name(), callBackStruct.attr_name, callBackStruct.event_name);
            ZMQutils.connectEvent(callBackStruct.device.get_tango_host(), callBackStruct.device.name(), callBackStruct.attr_name, lsa, callBackStruct.event_name, true);
            reConnected = true;
        }
        catch (DevFailed e) {
            System.err.println(e.errors[0].desc);
            reConnected = false;
        }
        return reConnected;
    }

    private boolean reconnectToChannel(String name) {
        boolean reConnected = false;
        Enumeration callbackStructs = event_callback_map.elements();
        while (callbackStructs.hasMoreElements()) {
            EventCallBackStruct eventCallBackStruct = (EventCallBackStruct)callbackStructs.nextElement();
            if (!eventCallBackStruct.channel_name.equals(name) || eventCallBackStruct.callback == null) continue;
            try {
                EventChannelStruct channelStruct = (EventChannelStruct)channel_map.get(name);
                DevVarLongStringArray lsa = ZMQutils.getEventSubscriptionInfoFromAdmDevice(channelStruct.adm_device_proxy, eventCallBackStruct.device.name(), eventCallBackStruct.attr_name, eventCallBackStruct.event_name);
                ZMQutils.connectHeartbeat(channelStruct.adm_device_proxy.get_tango_host(), channelStruct.adm_device_proxy.name(), lsa, true);
                reConnected = true;
            }
            catch (DevFailed e1) {
                reConnected = false;
            }
            break;
        }
        return reConnected;
    }
}

