/*
 * Decompiled with CFR 0.152.
 */
package zmq;

import zmq.Address;
import zmq.Ctx;
import zmq.Dist;
import zmq.FQ;
import zmq.IOThread;
import zmq.Msg;
import zmq.Options;
import zmq.Pipe;
import zmq.SessionBase;
import zmq.SocketBase;
import zmq.Trie;

public class XSub
extends SocketBase {
    private final FQ fq;
    private final Dist dist;
    private final Trie subscriptions;
    private boolean has_message;
    private Msg message;
    private boolean more;
    private static Trie.ITrieHandler send_subscription = new Trie.ITrieHandler(){

        @Override
        public void added(byte[] data_, int size, Object arg_) {
            Pipe pipe = (Pipe)arg_;
            Msg msg = new Msg(size + 1);
            msg.put((byte)1);
            msg.put(data_, 1, size);
            boolean sent = pipe.write(msg);
            if (!sent) {
                msg.close();
            }
        }
    };

    public XSub(Ctx parent_, int tid_, int sid_) {
        super(parent_, tid_, sid_);
        this.options.type = 10;
        this.has_message = false;
        this.more = false;
        this.options.linger = 0;
        this.fq = new FQ();
        this.dist = new Dist();
        this.subscriptions = new Trie();
    }

    @Override
    protected void xattach_pipe(Pipe pipe_, boolean icanhasall_) {
        assert (pipe_ != null);
        this.fq.attach(pipe_);
        this.dist.attach(pipe_);
        this.subscriptions.apply(send_subscription, pipe_);
        pipe_.flush();
    }

    @Override
    protected void xread_activated(Pipe pipe_) {
        this.fq.activated(pipe_);
    }

    @Override
    protected void xwrite_activated(Pipe pipe_) {
        this.dist.activated(pipe_);
    }

    @Override
    protected void xterminated(Pipe pipe_) {
        this.fq.terminated(pipe_);
        this.dist.terminated(pipe_);
    }

    @Override
    protected void xhiccuped(Pipe pipe_) {
        this.subscriptions.apply(send_subscription, pipe_);
        pipe_.flush();
    }

    @Override
    protected boolean xsend(Msg msg_) {
        byte[] data = msg_.data();
        if (data.length < 1 || data[0] != 0 && data[0] != 1) {
            throw new IllegalArgumentException("subscription flag");
        }
        if (data[0] == 1) {
            this.subscriptions.add(data, 1);
            return this.dist.send_to_all(msg_);
        }
        if (this.subscriptions.rm(data, 1)) {
            return this.dist.send_to_all(msg_);
        }
        return true;
    }

    @Override
    protected boolean xhas_out() {
        return true;
    }

    @Override
    protected Msg xrecv() {
        Msg msg_ = null;
        if (this.has_message) {
            msg_ = this.message;
            this.has_message = false;
            this.more = msg_.has_more();
            return msg_;
        }
        block0: while (true) {
            if ((msg_ = this.fq.recv(this.errno)) == null) {
                return null;
            }
            if (this.more || !this.options.filter || this.match(msg_)) {
                this.more = msg_.has_more();
                return msg_;
            }
            do {
                if (!msg_.has_more()) continue block0;
                msg_ = this.fq.recv(this.errno);
            } while ($assertionsDisabled || msg_ != null);
            break;
        }
        throw new AssertionError();
    }

    @Override
    protected boolean xhas_in() {
        if (this.more) {
            return true;
        }
        if (this.has_message) {
            return true;
        }
        block0: while (true) {
            this.message = this.fq.recv(this.errno);
            if (this.message == null) {
                return false;
            }
            if (!this.options.filter || this.match(this.message)) {
                this.has_message = true;
                return true;
            }
            do {
                if (!this.message.has_more()) continue block0;
                this.message = this.fq.recv(this.errno);
            } while ($assertionsDisabled || this.message != null);
            break;
        }
        throw new AssertionError();
    }

    private boolean match(Msg msg_) {
        return this.subscriptions.check(msg_.data());
    }

    public static class XSubSession
    extends SessionBase {
        public XSubSession(IOThread io_thread_, boolean connect_, SocketBase socket_, Options options_, Address addr_) {
            super(io_thread_, connect_, socket_, options_, addr_);
        }
    }
}

