/*
 * Decompiled with CFR 0.152.
 */
package fr.esrf.TangoApi.events;

import fr.esrf.Tango.AttDataReady;
import fr.esrf.Tango.AttDataReadyHelper;
import fr.esrf.Tango.AttributeConfigHelper;
import fr.esrf.Tango.AttributeConfig_3Helper;
import fr.esrf.Tango.AttributeValueHelper;
import fr.esrf.Tango.AttributeValue_3Helper;
import fr.esrf.Tango.AttributeValue_4Helper;
import fr.esrf.Tango.DevError;
import fr.esrf.Tango.DevErrorListHelper;
import fr.esrf.Tango.DevFailed;
import fr.esrf.Tango.ErrSeverity;
import fr.esrf.TangoApi.ApiUtil;
import fr.esrf.TangoApi.AttributeInfoEx;
import fr.esrf.TangoApi.CallBack;
import fr.esrf.TangoApi.Database;
import fr.esrf.TangoApi.DeviceAttribute;
import fr.esrf.TangoApi.DeviceData;
import fr.esrf.TangoApi.DeviceProxy;
import fr.esrf.TangoApi.DeviceProxyFactory;
import fr.esrf.TangoApi.IORdump;
import fr.esrf.TangoApi.events.DbEventImportInfo;
import fr.esrf.TangoApi.events.EventCallBackStruct;
import fr.esrf.TangoApi.events.EventChannelStruct;
import fr.esrf.TangoApi.events.EventConsumer;
import fr.esrf.TangoApi.events.EventData;
import fr.esrf.TangoApi.events.EventQueue;
import fr.esrf.TangoApi.events.IEventConsumer;
import fr.esrf.TangoApi.events.KeepAliveThread;
import fr.esrf.TangoDs.Except;
import fr.esrf.TangoDs.TangoConst;
import java.util.Enumeration;
import org.jacorb.orb.policies.RelativeRoundtripTimeoutPolicy;
import org.omg.CORBA.IntHolder;
import org.omg.CORBA.ORB;
import org.omg.CORBA.Object;
import org.omg.CORBA.Policy;
import org.omg.CORBA.SetOverrideType;
import org.omg.CORBA.TCKind;
import org.omg.CORBA.TIMEOUT;
import org.omg.CORBA.TypeCode;
import org.omg.CORBA.TypeCodePackage.BadKind;
import org.omg.CORBA.UserException;
import org.omg.CosEventChannelAdmin.AlreadyConnected;
import org.omg.CosEventChannelAdmin.TypeError;
import org.omg.CosNotification.EventType;
import org.omg.CosNotification.StructuredEvent;
import org.omg.CosNotifyChannelAdmin.AdminLimitExceeded;
import org.omg.CosNotifyChannelAdmin.ClientType;
import org.omg.CosNotifyChannelAdmin.ConsumerAdmin;
import org.omg.CosNotifyChannelAdmin.EventChannel;
import org.omg.CosNotifyChannelAdmin.EventChannelHelper;
import org.omg.CosNotifyChannelAdmin.ProxySupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplier;
import org.omg.CosNotifyChannelAdmin.StructuredProxyPushSupplierHelper;
import org.omg.CosNotifyFilter.ConstraintExp;
import org.omg.CosNotifyFilter.Filter;
import org.omg.CosNotifyFilter.FilterFactory;
import org.omg.CosNotifyFilter.FilterNotFound;
import org.omg.CosNotifyFilter.InvalidConstraint;
import org.omg.CosNotifyFilter.InvalidGrammar;
import org.omg.PortableServer.POA;
import org.omg.PortableServer.POAHelper;
import org.omg.PortableServer.POAManager;

public class NotifdEventConsumer
extends EventConsumer
implements TangoConst,
Runnable,
IEventConsumer {
    // Lazily created singleton; assigned by create() / getInstance().
    private static NotifdEventConsumer instance = null;
    // CORBA notification-service handles, assigned by connectToNotificationDaemon().
    private EventChannel eventChannel;
    private ConsumerAdmin consumerAdmin;
    private ProxySupplier proxySupplier;
    // ORB obtained from ApiUtil.
    private ORB orb = ApiUtil.get_orb();
    // Thread executing run(); named and started by the constructor.
    private Thread runner = new Thread(this);
    // Set to true by run() just before orb.run(); read by the shutdown hook.
    private boolean orbRunning = false;

    /**
     * Returns the shared NotifdEventConsumer, building it on first use.
     * <p>
     * The method is synchronized on the class so that two threads racing on
     * their first call to it cannot both run the constructor, which starts a
     * thread and registers a JVM shutdown hook.
     *
     * @return the shared consumer instance
     * @throws DevFailed if the constructor fails
     */
    public static synchronized NotifdEventConsumer create() throws DevFailed {
        if (instance == null) {
            instance = new NotifdEventConsumer();
        }
        return instance;
    }

    /**
     * Returns the shared NotifdEventConsumer, building it on first use.
     * <p>
     * The method is synchronized on the class so that two threads racing on
     * their first call to it cannot both run the constructor, which starts a
     * thread and registers a JVM shutdown hook.
     *
     * @return the shared consumer instance
     * @throws DevFailed if the constructor fails
     */
    public static synchronized NotifdEventConsumer getInstance() throws DevFailed {
        if (instance == null) {
            instance = new NotifdEventConsumer();
        }
        return instance;
    }

    /**
     * Names the runner thread, registers a JVM shutdown hook and then starts
     * the runner thread.
     * <p>
     * The hook removes the heartbeat and event filters, disconnects the push
     * suppliers, stops the KeepAliveThread and, when the ORB loop was started
     * by {@link #run()}, shuts the ORB down and waits for the runner to end.
     *
     * @throws DevFailed declared for callers; nothing in this body throws it directly
     */
    private NotifdEventConsumer() throws DevFailed {
        this.runner.setName("NotifdEventConsumer");
        Runtime.getRuntime().addShutdownHook(new Thread() {

            @Override
            public void run() {
                System.out.println("======== Shutting down notifd event system =======");
                NotifdEventConsumer.this.cleanup_heartbeat_filters();
                NotifdEventConsumer.this.cleanup_event_filters();
                NotifdEventConsumer.this.cleanup_heartbeat_proxies();
                KeepAliveThread.getInstance().stopThread();
                if (NotifdEventConsumer.this.orbRunning) {
                    NotifdEventConsumer.this.orb.shutdown(true);
                    try {
                        NotifdEventConsumer.this.runner.join();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                        // Catching InterruptedException clears the flag; set it again
                        // so the interruption is not silently lost.
                        Thread.currentThread().interrupt();
                    }
                }
            }
        });
        this.runner.start();
    }

    /**
     * Body of the runner thread.
     * <p>
     * Does nothing when {@code ApiUtil.in_server()} is true. Otherwise it
     * activates the manager of the "RootPOA", marks the ORB as running, blocks
     * in {@code orb.run()} and destroys the ORB once that call returns. A
     * CORBA user exception is reported on stderr and terminates the JVM with
     * exit status 1.
     */
    @Override
    public void run() {
        try {
            if (ApiUtil.in_server()) {
                return;
            }
            synchronized (this) {
                POA rootPoa = POAHelper.narrow(this.orb.resolve_initial_references("RootPOA"));
                POAManager poaManager = rootPoa.the_POAManager();
                poaManager.activate();
            }
            this.orbRunning = true;
            this.orb.run();
            this.orb.destroy();
        } catch (UserException ex) {
            System.err.println("EventConsumer.run() : Unable to start orb");
            ex.printStackTrace();
            System.exit(1);
        }
    }

    /**
     * Stub implementation: always returns 0 and performs no subscription.
     * None of the arguments is used.
     *
     * @return always 0
     * @throws DevFailed declared by the signature; never thrown by this body
     */
    @Override
    public int subscribe_event(DeviceProxy device, int event, CallBack callback, int max_size, boolean stateless) throws DevFailed {
        return 0;
    }

    /**
     * Decodes the body of a notification into the matching API object.
     * <p>
     * A non-struct body is extracted as a DevError list. A struct body is
     * dispatched on its type name: AttDataReady is returned as is, the
     * AttributeConfig variants are wrapped in an AttributeInfoEx and the
     * AttributeValue variants in a DeviceAttribute. Any other struct name
     * yields a one-element DevError array (API_IncompatibleAttrDataType).
     *
     * @param notification the received structured event
     * @return an AttDataReady, AttributeInfoEx, DeviceAttribute, or the
     *         result of the DevError list extraction
     * @throws BadKind propagated from {@code TypeCode.name()}
     */
    private java.lang.Object extractAttributeObject(StructuredEvent notification) throws BadKind {
        TypeCode bodyType = notification.remainder_of_body.type();
        if (!bodyType.kind().equals(TCKind.tk_struct)) {
            // Anything that is not a struct carries an error list.
            return DevErrorListHelper.extract(notification.remainder_of_body);
        }

        String structName = bodyType.name();
        java.lang.Object extracted;
        if (structName.equals("AttDataReady")) {
            extracted = AttDataReadyHelper.extract(notification.remainder_of_body);
        } else if (structName.equals("AttributeConfig_3")) {
            extracted = new AttributeInfoEx(AttributeConfig_3Helper.extract(notification.remainder_of_body));
        } else if (structName.equals("AttributeConfig_2")) {
            extracted = new AttributeInfoEx(AttributeConfigHelper.extract(notification.remainder_of_body));
        } else if (structName.equals("AttributeValue_4")) {
            extracted = new DeviceAttribute(AttributeValue_4Helper.extract(notification.remainder_of_body));
        } else if (structName.equals("AttributeValue_3")) {
            extracted = new DeviceAttribute(AttributeValue_3Helper.extract(notification.remainder_of_body));
        } else if (structName.equals("AttributeValue")) {
            extracted = new DeviceAttribute(AttributeValueHelper.extract(notification.remainder_of_body));
        } else {
            DevError unknownStruct = new DevError("API_IncompatibleAttrDataType", ErrSeverity.ERR, "Unknown structure used to pass attribute value (Need compilation ?)", "EventConsumer::extractAttributeObject()");
            extracted = new DevError[]{unknownStruct};
        }
        return extracted;
    }

    /**
     * Looks up the callback structure registered for an event name.
     * <p>
     * Each key of {@code event_callback_map} is shortened to the text that
     * follows the first '/' found at or after index {@code "tango:// ".length()};
     * when no such '/' exists the whole key is used. The shortened key is
     * compared to {@code eventName} ignoring case.
     *
     * @param eventName name built as domain name + "." + event type
     * @return the first matching structure, or {@code null} when none matches
     */
    private EventCallBackStruct getEventCallBackStruct(String eventName) {
        final int searchFrom = "tango:// ".length();
        for (Enumeration names = event_callback_map.keys(); names.hasMoreElements(); ) {
            String fullName = (String) names.nextElement();
            int slash = fullName.indexOf('/', searchFrom);
            String shortened = fullName.substring(slash + 1);
            if (eventName.equalsIgnoreCase(shortened)) {
                return (EventCallBackStruct) event_callback_map.get(fullName);
            }
        }
        return null;
    }

    /**
     * Entry point for events pushed to this consumer.
     * <p>
     * Heartbeat events are forwarded to {@code push_structured_event_heartbeat}.
     * For any other event the matching callback structure is looked up, the
     * body is decoded and an EventData is either queued on the device event
     * queue or handed to the user callback. Events without a registered
     * callback structure are reported on stderr. Any exception is printed and
     * not propagated.
     *
     * @param notification the structured event received
     */
    @Override
    public void push_structured_event(StructuredEvent notification) {
        String domainName = notification.header.fixed_header.event_type.domain_name;
        String eventType = notification.header.fixed_header.event_name;
        try {
            if (eventType.equals("heartbeat")) {
                this.push_structured_event_heartbeat(domainName);
                return;
            }

            String eventName = domainName + "." + eventType;
            EventCallBackStruct subscription = this.getEventCallBackStruct(eventName);
            if (subscription == null) {
                System.err.println(eventName + " event not found");
                return;
            }

            CallBack userCallback = subscription.callback;
            DeviceAttribute attributeValue = null;
            AttributeInfoEx attributeConfig = null;
            AttDataReady dataReady = null;
            DevError[] errors = null;

            // Exactly one of the four slots is filled, depending on the payload type.
            java.lang.Object payload = this.extractAttributeObject(notification);
            if (payload instanceof AttributeInfoEx) {
                attributeConfig = (AttributeInfoEx) payload;
            } else if (payload instanceof AttDataReady) {
                dataReady = (AttDataReady) payload;
            } else if (payload instanceof DeviceAttribute) {
                attributeValue = (DeviceAttribute) payload;
            } else if (payload instanceof DevError[]) {
                errors = (DevError[]) payload;
            }

            EventData event_data = new EventData(subscription.device, domainName, eventType, subscription.event_type, 1, attributeValue, attributeConfig, dataReady, null, errors);
            if (subscription.use_ev_queue) {
                subscription.device.getEventQueue().insert_event(event_data);
            } else if (userCallback != null) {
                userCallback.push_event(event_data);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Removes and destroys the event filter of every callback handled by a
     * NotifdEventConsumer.
     * <p>
     * This runs from the JVM shutdown hook, so it is best effort: an entry
     * whose channel is unknown or whose push supplier is already null is
     * skipped, and a runtime failure on one entry is printed (as
     * {@code cleanup_heartbeat_proxies} does) instead of aborting the rest of
     * the shutdown sequence.
     */
    private void cleanup_event_filters() {
        Enumeration keys = event_callback_map.keys();
        while (keys.hasMoreElements()) {
            String name = (String) keys.nextElement();
            EventCallBackStruct callback_struct = (EventCallBackStruct) event_callback_map.get(name);
            if (callback_struct == null || !(callback_struct.consumer instanceof NotifdEventConsumer)) {
                continue;
            }
            try {
                EventChannelStruct ec_struct = (EventChannelStruct) channel_map.get(callback_struct.channel_name);
                if (ec_struct == null || ec_struct.structuredProxyPushSupplier == null) {
                    // Nothing to clean: channel never connected or supplier already released.
                    continue;
                }
                Filter filter = ec_struct.structuredProxyPushSupplier.get_filter(callback_struct.filter_id);
                ec_struct.structuredProxyPushSupplier.remove_filter(callback_struct.filter_id);
                filter.destroy();
            } catch (FilterNotFound ignored) {
                // The filter is already gone; nothing left to remove.
            } catch (RuntimeException e) {
                // Keep going with the remaining entries.
                System.out.println(e);
            }
        }
    }

    /**
     * Disconnects the push supplier of the channel behind every registered
     * callback and removes the callback from {@code event_callback_map}.
     * <p>
     * A supplier is disconnected once only: it is set to null afterwards, so
     * later callbacks on the same channel skip it. Any exception raised for
     * an entry is printed on stdout and the entry is still removed.
     */
    private void cleanup_heartbeat_proxies() {
        for (Enumeration names = event_callback_map.keys(); names.hasMoreElements(); ) {
            String eventKey = (String) names.nextElement();
            EventCallBackStruct subscription = (EventCallBackStruct) event_callback_map.get(eventKey);
            try {
                EventChannelStruct channel = (EventChannelStruct) channel_map.get(subscription.channel_name);
                if (channel.structuredProxyPushSupplier != null) {
                    channel.structuredProxyPushSupplier.disconnect_structured_push_supplier();
                    channel.structuredProxyPushSupplier = null;
                }
            } catch (Exception failure) {
                System.out.println(failure);
            }
            event_callback_map.remove(eventKey);
        }
    }

    /**
     * Obtains the import information (IOR, host, exported flag) of an event
     * channel.
     * <p>
     * Without a database the admin device is asked for its IOR through the
     * "QueryEventChannelIOR" command and the host is read from that IOR. With
     * a database the information cached on the admin device proxy is used
     * when present and exported; otherwise it is imported from the database.
     *
     * @param channelName name of the event channel (admin device name)
     * @param dbase       database to query, or {@code null} for a device without database
     * @param adminDevice ignored on entry; reassigned locally
     * @return the import information
     * @throws DevFailed raised through {@code Except.throw_event_system_failed}
     *                   when any of the calls above fails with DevFailed
     */
    private DbEventImportInfo getEventImportInfo(String channelName, Database dbase, DeviceProxy adminDevice) throws DevFailed {
        DbEventImportInfo importInfo = null;
        try {
            if (dbase == null) {
                importInfo = new DbEventImportInfo();
                adminDevice = DeviceProxyFactory.get(channelName);
                DeviceData reply = adminDevice.command_inout("QueryEventChannelIOR");
                importInfo.channel_ior = reply.extractString();
                importInfo.channel_exported = true;
                IORdump iorDump = new IORdump(null, importInfo.channel_ior);
                importInfo.host = iorDump.get_hostname();
            } else {
                adminDevice = DeviceProxyFactory.get(channelName, dbase.getUrl().getTangoHost());
                importInfo = adminDevice.get_evt_import_info();
                boolean cachedAndExported = importInfo != null && importInfo.channel_exported;
                if (!cachedAndExported) {
                    importInfo = dbase.import_event(channelName);
                }
            }
        } catch (DevFailed df) {
            if (dbase != null) {
                Except.throw_event_system_failed("API_NotificationServiceFailed", channelName + " has no event channel defined in the database " + dbase.getUrl().getTangoHost() + "\nMay be the server is not running.", "EventConsumer.connect_event_channel");
            }
            Except.throw_event_system_failed("API_NotificationServiceFailed", channelName + " did not returned event channel IOR\n" + " May be the server is not running.", "EventConsumer.connect_event_channel");
        }
        return importInfo;
    }

    /**
     * Makes sure the device is associated with an event channel, connecting
     * it when {@code device_channel_map} has no entry for the device name.
     *
     * @param device     device to check
     * @param attribute  attribute name forwarded to {@code connect}
     * @param deviceData data forwarded to {@code connect}
     * @param event_name event name forwarded to {@code connect}
     * @throws DevFailed if the connection attempt fails or leaves the device unmapped
     */
    @Override
    protected void checkDeviceConnection(DeviceProxy device, String attribute, DeviceData deviceData, String event_name) throws DevFailed {
        String deviceName = device.name();
        if (device_channel_map.containsKey(deviceName)) {
            return;
        }
        this.connect(device, attribute, event_name, deviceData);
        boolean nowMapped = device_channel_map.containsKey(deviceName);
        if (!nowMapped) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to connect to event channel for device", "EventConsumer.subscribe_event()");
        }
    }

    /**
     * Resolves the notification channel described by {@code received} and
     * stores the event channel, its default consumer admin and a structured
     * push supplier in the corresponding fields of this object.
     *
     * @param received import information holding the channel IOR and exported flag
     * @throws DevFailed raised through {@code Except.throw_event_system_failed}
     *                   when any step fails or yields {@code null}
     */
    private void connectToNotificationDaemon(DbEventImportInfo received) throws DevFailed {
        boolean channelUsable = false;
        if (received.channel_exported) {
            Object channelRef = this.orb.string_to_object(received.channel_ior);
            try {
                this.eventChannel = EventChannelHelper.narrow(channelRef);
                // NOTE(review): presumably expressed in 100 ns units (i.e. 3 s) - confirm.
                RelativeRoundtripTimeoutPolicy timeoutPolicy = new RelativeRoundtripTimeoutPolicy(30000000L);
                this.eventChannel._set_policy_override(new Policy[]{timeoutPolicy}, SetOverrideType.ADD_OVERRIDE);
            } catch (RuntimeException e) {
                Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to connect notification daemon (hint : make sure the notifd daemon is running on this host", "EventConsumer.connect_event_channel");
            }
            channelUsable = this.eventChannel != null;
        }
        if (!channelUsable) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to narrow EventChannel from notification daemon (hint : make sure the notifd daemon is running on this host", "EventConsumer.connect_event_channel");
        }

        try {
            this.consumerAdmin = this.eventChannel.default_consumer_admin();
        } catch (Exception e) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Received " + e.toString() + "\nduring eventChannel.default_consumer_admin() call", "EventConsumer.connect_event_channel");
        }
        if (this.consumerAdmin == null) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to get default consumer admin from notification daemon (hint : make sure the notifd daemon is running on this host", "EventConsumer.connect_event_channel");
        }

        IntHolder proxyId = new IntHolder();
        try {
            this.proxySupplier = this.consumerAdmin.obtain_notification_push_supplier(ClientType.STRUCTURED_EVENT, proxyId);
            if (this.proxySupplier == null) {
                Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to get a push supplier from notification daemon (hint : make sure the notifd daemon is running on this host", "EventConsumer.connect_event_channel");
            }
        } catch (TIMEOUT ex) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to get a push supplier due to a Timeout", "EventConsumer.connect_event_channel");
        } catch (AdminLimitExceeded ex) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to get a push supplier due to AdminLimitExceeded (hint : make sure the notifd daemon is running on this host", "EventConsumer.connect_event_channel");
        }
    }

    /**
     * Narrows the current proxy supplier to a structured push supplier and
     * connects this object to it as the push consumer.
     *
     * @param channelName channel name, used only in the error message
     * @return the connected supplier
     * @throws DevFailed raised through {@code Except.throw_event_system_failed}
     *                   when the narrow yields {@code null} or the connection fails
     */
    private StructuredProxyPushSupplier getStructuredProxyPushSupplier(String channelName) throws DevFailed {
        StructuredProxyPushSupplier pushSupplier = StructuredProxyPushSupplierHelper.narrow(this.proxySupplier);
        if (pushSupplier != null) {
            try {
                pushSupplier.connect_structured_push_consumer(this._this(this.orb));
            } catch (NullPointerException e) {
                e.printStackTrace();
                Except.throw_event_system_failed("API_NotificationServiceFailed", e + " detected when subscribing to " + channelName, "EventConsumer.connect_event_channel");
            } catch (AlreadyConnected ex) {
                Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to connect the push supplier due to CosEventChannelAdmin.AlreadyConnected.AlreadyConnected  exception", "EventConsumer.connect_event_channel");
            } catch (TypeError ex) {
                Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to connect the push supplier due to CosEventChannelAdmin.AlreadyConnected.TypeError  exception", "EventConsumer.connect_event_channel");
            }
            return pushSupplier;
        }
        Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to narrow the push supplier due to AdminLimitExceeded (hint : make sure the notifd daemon is running on this host", "EventConsumer.connect_event_channel");
        return null;
    }

    /**
     * Associates a device with the event channel of its admin device,
     * connecting to that channel first when {@code channel_map} does not know
     * it yet, and records the association in {@code device_channel_map}.
     *
     * @param device_proxy  device to connect
     * @param attributeName attribute name stored in the connection structure
     * @param eventName     event name stored in the connection structure
     * @param deviceData    data stored in the connection structure
     * @throws DevFailed if the admin device name cannot be read or the channel connection fails
     */
    private void connect(DeviceProxy device_proxy, String attributeName, String eventName, DeviceData deviceData) throws DevFailed {
        String deviceName = device_proxy.name();
        String channelName = null;
        try {
            // The channel is named after the admin device of the server.
            channelName = device_proxy.adm_name();
        } catch (DevFailed e) {
            Except.throw_event_system_failed("API_BadConfigurationProperty", "Can't subscribe to event for device " + deviceName + "\n Check that device server is running...", "NotifdEventConsumer.connect");
        }

        boolean channelKnown = channel_map.containsKey(channelName);
        // The database object is needed on both paths; null when the device runs without database.
        Database dbase = device_proxy.use_db() ? device_proxy.get_db_obj() : null;
        if (!channelKnown) {
            EventConsumer.ConnectionStructure connectionStructure = new EventConsumer.ConnectionStructure(device_proxy.get_tango_host(), channelName, deviceName, attributeName, eventName, dbase, deviceData, false);
            this.connect_event_channel(connectionStructure);
        }

        EventChannelStruct channel = (EventChannelStruct) channel_map.get(channelName);
        channel.adm_device_proxy = device_proxy.get_adm_dev();
        channel.use_db = device_proxy.use_db();
        channel.dbase = dbase;
        device_channel_map.put(deviceName, channelName);
    }

    /**
     * Connects (or reconnects) to the notification channel named in
     * {@code cs} and installs the heartbeat filter on it.
     * <p>
     * On reconnection the existing entry of {@code channel_map} is refreshed
     * and its previous heartbeat filter is removed when it can still be
     * found; otherwise a new entry is created and stored in {@code channel_map}.
     *
     * @param cs connection parameters; {@code cs.dbase} may be {@code null}
     *           for a device running without database
     * @throws DevFailed if the import information, the daemon connection or
     *                   the filter creation fails
     */
    @Override
    protected synchronized void connect_event_channel(EventConsumer.ConnectionStructure cs) throws DevFailed {
        // connect() passes a null database for devices not using one; in that case the
        // admin proxy is built from the channel name alone, as getEventImportInfo() does.
        DeviceProxy adminDevice = cs.dbase != null
                ? DeviceProxyFactory.get(cs.channelName, cs.dbase.getUrl().getTangoHost())
                : DeviceProxyFactory.get(cs.channelName);
        DbEventImportInfo received = this.getEventImportInfo(cs.channelName, cs.dbase, adminDevice);

        // Keep only the short host name (text before the first dot).
        int idx = received.host.indexOf('.');
        if (idx > 0) {
            received.host = received.host.substring(0, idx);
        }

        this.connectToNotificationDaemon(received);
        StructuredProxyPushSupplier structuredProxyPushSupplier = this.getStructuredProxyPushSupplier(cs.channelName);
        String constraint_expr = "$event_name == 'heartbeat'";

        if (cs.reconnect) {
            EventChannelStruct eventChannelStruct = (EventChannelStruct) channel_map.get(cs.channelName);
            eventChannelStruct.eventChannel = this.eventChannel;
            eventChannelStruct.structuredProxyPushSupplier = structuredProxyPushSupplier;
            eventChannelStruct.last_heartbeat = System.currentTimeMillis();
            eventChannelStruct.heartbeat_skipped = false;
            eventChannelStruct.host = received.host;
            eventChannelStruct.has_notifd_closed_the_connection = 0;
            try {
                int filter_id = eventChannelStruct.heartbeat_filter_id;
                Filter filter = eventChannelStruct.structuredProxyPushSupplier.get_filter(filter_id);
                eventChannelStruct.structuredProxyPushSupplier.remove_filter(filter_id);
                filter.destroy();
            } catch (FilterNotFound ignored) {
                // No previous heartbeat filter on this supplier; a new one is added below.
            }
            eventChannelStruct.heartbeat_filter_id = this.add_filter_for_channel(eventChannelStruct, constraint_expr);
            this.setEventChannelTimeoutMillis(eventChannelStruct.eventChannel, 3000);
        } else {
            EventChannelStruct newEventChannelStruct = new EventChannelStruct();
            newEventChannelStruct.eventChannel = this.eventChannel;
            newEventChannelStruct.structuredProxyPushSupplier = structuredProxyPushSupplier;
            newEventChannelStruct.last_heartbeat = System.currentTimeMillis();
            newEventChannelStruct.heartbeat_skipped = false;
            newEventChannelStruct.adm_device_proxy = adminDevice;
            newEventChannelStruct.host = received.host;
            newEventChannelStruct.has_notifd_closed_the_connection = 0;
            newEventChannelStruct.consumer = this;
            newEventChannelStruct.heartbeat_filter_id = this.add_filter_for_channel(newEventChannelStruct, constraint_expr);
            channel_map.put(cs.channelName, newEventChannelStruct);
            this.setEventChannelTimeoutMillis(newEventChannelStruct.eventChannel, 3000);
        }
    }

    /**
     * Creates an "EXTENDED_TCL" filter holding a single constraint and adds
     * it to the push supplier of the given channel.
     *
     * @param event_channel_struct channel whose filter factory and push supplier are used
     * @param constraint_expr      constraint expression of the filter
     * @return the id returned by {@code add_filter}, or -1 when no filter was created
     * @throws DevFailed raised through {@code Except.throw_event_system_failed}
     *                   on InvalidGrammar or InvalidConstraint
     */
    int add_filter_for_channel(EventChannelStruct event_channel_struct, String constraint_expr) throws DevFailed {
        Filter created = null;
        try {
            FilterFactory factory = event_channel_struct.eventChannel.default_filter_factory();
            created = factory.create_filter("EXTENDED_TCL");
        } catch (InvalidGrammar ex) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Caught Invalid Grammar exception while creating heartbeat filter : check filter", "EventConsumer.add_filter_for_channel");
        }

        // One constraint, not restricted to any event type.
        ConstraintExp constraint = new ConstraintExp();
        constraint.event_types = new EventType[0];
        constraint.constraint_expr = constraint_expr;
        ConstraintExp[] constraints = new ConstraintExp[]{constraint};

        if (created == null) {
            return -1;
        }
        try {
            created.add_constraints(constraints);
            return event_channel_struct.structuredProxyPushSupplier.add_filter(created);
        } catch (InvalidConstraint ex) {
            // The filter was never attached; release it before reporting.
            created.destroy();
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Caught InvalidConstraint exception while adding constraint for heartbeat : check filter", "EventConsumer.add_filter_for_channel");
        }
        return -1;
    }

    /**
     * Builds the filter constraint selecting one event of one attribute.
     * <p>
     * The result has the form
     * {@code $domain_name == '<device>/<attribute>' and $event_name == '<event>'}
     * with device and attribute names lower-cased. When filters are given
     * they are concatenated as is and appended as
     * {@code  and ((<filters> ) and $forced_event > 0.5 )}.
     * <p>
     * Lower-casing uses {@code Locale.ROOT} so the expression does not depend
     * on the JVM default locale (for instance the Turkish dotless i).
     *
     * @param device_name device name
     * @param attribute   attribute name
     * @param event_name  event name, used unchanged
     * @param filters     additional filter fragments, may be {@code null} or empty
     * @return the constraint expression
     */
    private String buildConstraintExpr(String device_name, String attribute, String event_name, String[] filters) {
        StringBuilder expr = new StringBuilder();
        expr.append("$domain_name == '")
                .append(device_name.toLowerCase(java.util.Locale.ROOT))
                .append("/")
                .append(attribute.toLowerCase(java.util.Locale.ROOT))
                .append("'")
                .append(" and $event_name == '")
                .append(event_name)
                .append("'");
        if (filters != null && filters.length != 0) {
            expr.append(" and ((");
            for (String filter : filters) {
                expr.append(filter);
            }
            expr.append(" ) and $forced_event > 0.5 )");
        }
        return expr.toString();
    }

    /**
     * Tells whether the host read from the admin device IOR differs from the
     * host stored in the channel structure. Only the part before the first
     * dot of the IOR host name is compared.
     *
     * @param event_channel_struct channel to check
     * @return {@code true} when the hosts differ; {@code false} when they
     *         match or when reading the IOR fails with DevFailed
     */
    private boolean checkIfHostHasChanged(EventChannelStruct event_channel_struct) {
        try {
            IORdump iorDump = new IORdump(event_channel_struct.adm_device_proxy);
            String currentHost = iorDump.get_hostname();
            int firstDot = currentHost.indexOf('.');
            if (firstDot > 0) {
                currentHost = currentHost.substring(0, firstDot);
            }
            return !event_channel_struct.host.equals(currentHost);
        } catch (DevFailed devFailed) {
            // Host could not be determined: report "unchanged".
            return false;
        }
    }

    /**
     * Validates a subscription request and rejects duplicates.
     * <p>
     * The callback key is the lower-cased device name followed by "/", the
     * attribute name, "." and the event name.
     *
     * @param device     device to subscribe to
     * @param attribute  attribute name
     * @param event_name event name
     * @param callback   user callback, may be {@code null} when an event queue is used
     * @param max_size   event queue size; a negative value with a null callback is rejected
     * @param stateless  when true, pending (failed) subscriptions also count as duplicates
     * @throws DevFailed on invalid arguments, unnamed device or duplicate subscription
     */
    @Override
    protected void checkIfAlreadyConnected(DeviceProxy device, String attribute, String event_name, CallBack callback, int max_size, boolean stateless) throws DevFailed {
        boolean nowhereToDeliver = callback == null && max_size < 0;
        if (device == null || nowhereToDeliver) {
            Except.throw_wrong_syntax_exception("API_InvalidArgs", "Device or callback pointer NULL and  event queue not used !!", "EventConsumer.subscribe_event()");
        }
        if (device == null || device.name() == null) {
            Except.throw_event_system_failed("API_NotificationServiceFailed", "Failed to connect to device", "EventConsumer.subscribe_event()");
            return;
        }

        String callback_key = device.name().toLowerCase() + "/" + attribute + "." + event_name;
        if (event_callback_map.containsKey(callback_key)) {
            Except.throw_event_system_failed("API_MethodArgument", "Already connected to event " + callback_key, "EventConsumer.subscribe_event()");
        }
        if (stateless && failed_event_callback_map.containsKey(callback_key)) {
            Except.throw_event_system_failed("API_MethodArgument", "Already trying to connect to event " + callback_key, "EventConsumer.subscribe_event()");
        }
    }

    /**
     * Returns the name of the admin-device command used for subscriptions.
     * The same literal is sent by {@code reSubscribe}.
     *
     * @return the constant "EventSubscriptionChange"
     */
    @Override
    protected String getEventSubscriptionCommandName() {
        return "EventSubscriptionChange";
    }

    /**
     * Installs the event filter for a subscription and records the filter
     * constraint, the filter id and this consumer in the callback structure.
     *
     * @param callback_struct structure to complete
     * @param device_name     device name used in the constraint
     * @param attribute       attribute name used in the constraint
     * @param event_name      event name used in the constraint
     * @param filters         additional filter fragments, may be {@code null}
     * @param channel_struct  channel on which the filter is added
     * @throws DevFailed if the filter cannot be added
     */
    @Override
    protected void setAdditionalInfoToEventCallBackStruct(EventCallBackStruct callback_struct, String device_name, String attribute, String event_name, String[] filters, EventChannelStruct channel_struct) throws DevFailed {
        String expression = this.buildConstraintExpr(device_name, attribute, event_name, filters);
        // Add the filter first: nothing is recorded when this throws.
        int addedFilterId = this.add_filter_for_channel(channel_struct, expression);
        callback_struct.consumer = this;
        callback_struct.filter_id = addedFilterId;
        callback_struct.filter_constraint = expression;
    }

    /**
     * Delivers an "API_EventTimeout" error event to one subscriber.
     * <p>
     * When the channel belongs to a NotifdEventConsumer and the subscriber's
     * filter is not marked ok, the filter is first added again. The error
     * event is then queued on the device event queue or pushed to the user
     * callback. Nothing is done when {@code eventChannelStruct} is null, and
     * nothing is pushed when no queue is used and the callback is null (same
     * guard as in {@code push_structured_event}).
     *
     * @param eventChannelStruct channel that stopped responding, may be {@code null}
     * @param callbackStruct     subscriber to notify
     */
    private void pushServerNotRespondingException(EventChannelStruct eventChannelStruct, EventCallBackStruct callbackStruct) {
        if (eventChannelStruct == null) {
            return;
        }
        try {
            if (eventChannelStruct.consumer instanceof NotifdEventConsumer && !callbackStruct.filter_ok) {
                callbackStruct.filter_id = NotifdEventConsumer.getInstance().add_filter_for_channel(eventChannelStruct, callbackStruct.filter_constraint);
                callbackStruct.filter_ok = true;
            }
            CallBack callback = callbackStruct.callback;
            DevError[] errors = new DevError[]{new DevError()};
            errors[0].severity = ErrSeverity.ERR;
            errors[0].origin = "EventConsumer.KeepAliveThread";
            errors[0].reason = "API_EventTimeout";
            errors[0].desc = "Event channel is not responding any more, maybe the server or event system is down";
            String domain_name = callbackStruct.device.name() + "/" + callbackStruct.attr_name.toLowerCase();
            // NOTE(review): push_structured_event passes (event_type, 1) at these two positions
            // whereas this call passes (1, event_type) - confirm against the EventData constructor.
            EventData event_data = new EventData(eventChannelStruct.adm_device_proxy, domain_name, callbackStruct.event_name, 1, callbackStruct.event_type, null, null, null, null, errors);
            event_data.device = callbackStruct.device;
            event_data.name = callbackStruct.device.name();
            event_data.event = callbackStruct.event_name;
            if (callbackStruct.use_ev_queue) {
                EventQueue ev_queue = callbackStruct.device.getEventQueue();
                ev_queue.insert_event(event_data);
            } else if (callback != null) {
                callback.push_event(event_data);
            }
        } catch (DevFailed ignored) {
            // Best effort: the error event is dropped for this pass when it cannot be built.
        }
    }

    /**
     * Checks one event channel for a missed heartbeat and tries to recover.
     * <p>
     * When a heartbeat was skipped (or the channel is already flagged), the
     * notification daemon is probed, the channel is reconnected if the daemon
     * is considered failed, and every subscriber of the channel receives an
     * error event. Subscribers are then re-subscribed and, on success, their
     * attribute is read and pushed, except for the event named
     * {@code eventNames[6]} and while the daemon is still flagged as failed.
     * Otherwise the "connection closed" counter of the channel is reset.
     *
     * @param name               channel name
     * @param eventChannelStruct channel state
     */
    @Override
    protected void checkIfHeartbeatSkipped(String name, EventChannelStruct eventChannelStruct) {
        if (KeepAliveThread.heartbeatHasBeenSkipped(eventChannelStruct) || eventChannelStruct.heartbeat_skipped || eventChannelStruct.notifd_failed) {
            eventChannelStruct.heartbeat_skipped = true;
            DevError dev_error = null;
            try {
                // Remote call used as a liveness probe of the notification daemon.
                eventChannelStruct.eventChannel.MyFactory();
                if (this.checkIfHostHasChanged(eventChannelStruct)) {
                    eventChannelStruct.notifd_failed = true;
                }
            } catch (RuntimeException e1) {
                dev_error = new DevError();
                dev_error.severity = ErrSeverity.ERR;
                dev_error.origin = "NotifdEventConsumer.checkIfHeartbeatSkipped()";
                dev_error.reason = "API_EventException";
                dev_error.desc = "Connection failed with notify daemon";
                int pos = e1.toString().indexOf(":");
                if (pos > 0) {
                    dev_error.desc = dev_error.desc + "  (" + e1.toString().substring(0, pos) + ")";
                }
                eventChannelStruct.notifd_failed = true;
                try {
                    // dbase is null for a channel used without database (see connect());
                    // fall back to the channel name alone, as getEventImportInfo() does.
                    DeviceProxy channelAdmin = eventChannelStruct.dbase != null
                            ? DeviceProxyFactory.get(name, eventChannelStruct.dbase.getUrl().getTangoHost())
                            : DeviceProxyFactory.get(name);
                    // Drop the cached import information.
                    channelAdmin.set_evt_import_info(null);
                } catch (DevFailed e) {
                    System.err.println("API received a DevFailed :\t" + e.errors[0].desc);
                }
            }
            if (!eventChannelStruct.use_db) {
                eventChannelStruct.notifd_failed = true;
            }
            if (!eventChannelStruct.notifd_failed && eventChannelStruct.has_notifd_closed_the_connection >= 3) {
                eventChannelStruct.notifd_failed = true;
            }
            if (eventChannelStruct.notifd_failed) {
                eventChannelStruct.notifd_failed = !this.reconnect_to_channel(name);
                if (!eventChannelStruct.notifd_failed) {
                    this.reconnect_to_event(name);
                }
            }
            Enumeration<EventCallBackStruct> callback_structs = EventConsumer.getEventCallbackMap().elements();
            while (callback_structs.hasMoreElements()) {
                EventCallBackStruct callback_struct = callback_structs.nextElement();
                if (!callback_struct.channel_name.equals(name)) {
                    continue;
                }
                if (dev_error != null) {
                    this.pushReceivedException(eventChannelStruct, callback_struct, dev_error);
                } else {
                    this.pushServerNotRespondingException(eventChannelStruct, callback_struct);
                }
                if (callback_struct.event_name.equals(eventNames[6]) || eventChannelStruct.notifd_failed || !eventChannelStruct.consumer.reSubscribe(eventChannelStruct, callback_struct)) {
                    continue;
                }
                this.readAttributeAndPush(eventChannelStruct, callback_struct);
            }
        } else {
            eventChannelStruct.has_notifd_closed_the_connection = 0;
        }
    }

    /**
     * Sends a "subscribe" request for one event to the admin device of the
     * channel through the "EventSubscriptionChange" command. On success the
     * heartbeat-skipped flag is cleared and both structures receive the
     * current time as last subscription time.
     *
     * @param event_channel_struct channel whose admin device is called
     * @param callback_struct      subscription to renew
     * @return {@code true} on success, {@code false} when any exception occurs
     */
    @Override
    protected boolean reSubscribe(EventChannelStruct event_channel_struct, EventCallBackStruct callback_struct) {
        try {
            DeviceData argin = new DeviceData();
            String[] request = {
                callback_struct.device.name(),
                callback_struct.attr_name,
                "subscribe",
                callback_struct.event_name
            };
            argin.insert(request);
            event_channel_struct.adm_device_proxy.command_inout("EventSubscriptionChange", argin);
            event_channel_struct.heartbeat_skipped = false;
            long now = System.currentTimeMillis();
            event_channel_struct.last_subscribed = now;
            callback_struct.last_subscribed = now;
            return true;
        } catch (Exception e) {
            return false;
        }
    }

    /**
     * Re-installs the channel filter of every registered callback that belongs
     * to the given channel and still has a non-null {@code callback}.
     * <p>
     * For each such entry the filter is added again from its stored
     * {@code filter_constraint}; the new id is written to {@code filter_id}
     * and {@code filter_ok} records whether the call succeeded.
     *
     * @param name name of the event channel whose callbacks are processed
     */
    void reconnect_to_event(String name) {
        for (Enumeration<?> entries = event_callback_map.elements(); entries.hasMoreElements(); ) {
            EventCallBackStruct cb = (EventCallBackStruct) entries.nextElement();
            if (cb.channel_name.equals(name) && cb.callback != null) {
                boolean filterInstalled;
                try {
                    EventChannelStruct channel = (EventChannelStruct) channel_map.get(name);
                    cb.filter_id = this.add_filter_for_channel(channel, cb.filter_constraint);
                    filterInstalled = true;
                }
                catch (DevFailed e1) {
                    filterInstalled = false;
                }
                cb.filter_ok = filterInstalled;
            }
        }
    }

    /**
     * Overrides the round-trip timeout policy on the given event channel
     * reference.
     * <p>
     * The relative round-trip timeout is expressed in units of 100 ns, hence
     * the factor of 10000 per millisecond. The multiplication is carried out in
     * {@code long} arithmetic so that large values of {@code millis}
     * (above 214748) do not overflow the 32-bit {@code int} range before the
     * value reaches the policy.
     *
     * @param eventChannel channel reference on which the policy override is added
     * @param millis       timeout in milliseconds
     */
    private void setEventChannelTimeoutMillis(EventChannel eventChannel, int millis) {
        // 1 ms == 10000 * 100 ns; the long literal forces a 64-bit multiplication.
        long timeoutIn100ns = 10000L * millis;
        RelativeRoundtripTimeoutPolicy p = new RelativeRoundtripTimeoutPolicy(timeoutIn100ns);
        eventChannel._set_policy_override(new Policy[]{p}, SetOverrideType.ADD_OVERRIDE);
    }

    /**
     * Removes and destroys the heartbeat filter of every channel in
     * {@code channel_map} whose consumer is a {@link NotifdEventConsumer}.
     * <p>
     * The filter is looked up on the channel's structured proxy push supplier
     * by {@code heartbeat_filter_id}. A {@code FilterNotFound} raised for one
     * channel is ignored and processing continues with the next channel.
     */
    private void cleanup_heartbeat_filters() {
        for (Enumeration<?> keys = channel_map.keys(); keys.hasMoreElements(); ) {
            String channelName = (String) keys.nextElement();
            EventChannelStruct channel = (EventChannelStruct) channel_map.get(channelName);
            if (channel.consumer instanceof NotifdEventConsumer) {
                try {
                    int heartbeatId = channel.heartbeat_filter_id;
                    Filter heartbeatFilter = channel.structuredProxyPushSupplier.get_filter(heartbeatId);
                    channel.structuredProxyPushSupplier.remove_filter(heartbeatId);
                    heartbeatFilter.destroy();
                }
                catch (FilterNotFound ignored) {
                    // Nothing to remove for this channel; move on to the next one.
                }
            }
        }
    }

    /**
     * Removes and destroys the filter registered for the given callback on its
     * event channel.
     * <p>
     * Nothing is done when the callback's channel is not present in
     * {@code channel_map} or when the supplier returns no filter for the
     * stored {@code filter_id}.
     *
     * @param cb_struct callback whose {@code channel_name} and {@code filter_id}
     *                  identify the filter to remove
     * @throws DevFailed declared by this method; a {@code FilterNotFound} is
     *                   reported through {@code Except.throw_event_system_failed}
     *                   with reason {@code API_EventNotFound}
     */
    @Override
    protected void removeFilters(EventCallBackStruct cb_struct) throws DevFailed {
        try {
            EventChannelStruct channel = (EventChannelStruct) channel_map.get(cb_struct.channel_name);
            if (channel == null) {
                return;
            }
            StructuredProxyPushSupplier pushSupplier = channel.structuredProxyPushSupplier;
            Filter registered = pushSupplier.get_filter(cb_struct.filter_id);
            if (registered == null) {
                return;
            }
            pushSupplier.remove_filter(cb_struct.filter_id);
            registered.destroy();
        }
        catch (FilterNotFound e) {
            Except.throw_event_system_failed("API_EventNotFound", "Failed to unsubscribe event, caught exception while calling remove_filter() (hint: check notification daemon is running)", "EventConsumer.unsubscribe_event()");
        }
    }

    /**
     * Hook invoked for a callback that is being unsubscribed; this
     * implementation performs no action.
     *
     * @param callbackStruct the callback being unsubscribed (unused here)
     */
    // NOTE(review): presumably filter removal in removeFilters() is all this consumer needs — confirm.
    @Override
    protected void unsubscribeTheEvent(EventCallBackStruct callbackStruct) {
    }

    /**
     * Tries to re-establish the connection to the named event channel.
     * <p>
     * The first registered callback that belongs to the channel and has a
     * non-null {@code callback} is used: the channel's administration device is
     * pinged and {@code connect_event_channel} is called with a connection
     * structure built from that callback's device. Only that first matching
     * callback is ever tried.
     *
     * @param name name of the event channel to reconnect to
     * @return {@code false} if the attempt raised {@code DevFailed};
     *         {@code true} if it succeeded or if no matching callback exists
     */
    boolean reconnect_to_channel(String name) {
        for (Enumeration<?> entries = event_callback_map.elements(); entries.hasMoreElements(); ) {
            EventCallBackStruct cb = (EventCallBackStruct) entries.nextElement();
            if (!cb.channel_name.equals(name) || cb.callback == null) {
                continue;
            }
            // First usable callback found: the outcome of this single attempt is the result.
            try {
                EventChannelStruct channel = (EventChannelStruct) channel_map.get(name);
                channel.adm_device_proxy.ping();
                EventConsumer.ConnectionStructure connection =
                        new EventConsumer.ConnectionStructure(cb.device.get_tango_host(), name, channel.dbase, true);
                this.connect_event_channel(connection);
                return true;
            }
            catch (DevFailed e1) {
                return false;
            }
        }
        // No callback matched the channel name.
        return true;
    }
}

