/*
 * Decompiled with CFR 0.152.
 */
package zmq;

import java.util.ArrayDeque;
import java.util.Deque;
import zmq.Address;
import zmq.Blob;
import zmq.Ctx;
import zmq.Dist;
import zmq.IOThread;
import zmq.Msg;
import zmq.Mtrie;
import zmq.Options;
import zmq.Pipe;
import zmq.SessionBase;
import zmq.SocketBase;

/**
 * Socket that keeps a trie of per-pipe subscriptions, sends outgoing messages
 * to the pipes marked as matching, and queues inbound (un)subscription
 * messages so they can be handed out one at a time through {@link #xrecv()}.
 *
 * <p>Queued messages are stored as two parallel deques: {@code pending_data}
 * holds the payload and {@code pending_flags} the flags for the entry at the
 * same position. Both are always appended to and polled together.
 */
public class XPub
extends SocketBase {
    /** Value this class writes to {@code options.type} in its constructor. */
    private static final int SOCKET_TYPE_XPUB = 9;
    /**
     * Socket type for which pipe termination does not queue unsubscriptions.
     * NOTE(review): presumably the plain PUB type (ZMQ_PUB) - confirm.
     */
    private static final int SOCKET_TYPE_PUB = 1;
    /** Option id accepted by {@link #xsetsockopt(int, Object)}; it drives {@code verbose}. */
    private static final int OPTION_VERBOSE = 40;
    /**
     * Error code stored in {@code errno} when {@link #xrecv()} has nothing queued.
     * NOTE(review): presumably EAGAIN - confirm against the project's error table.
     */
    private static final int ERRNO_NOTHING_PENDING = 35;
    /** First payload byte of a message that removes a subscription from the trie. */
    private static final byte UNSUBSCRIBE_PREFIX = 0;
    /** First payload byte of a message that adds a subscription to the trie. */
    private static final byte SUBSCRIBE_PREFIX = 1;

    private final Mtrie subscriptions;
    private final Dist dist;
    /** When true, subscribe messages are queued even if the trie call returned false. */
    boolean verbose;
    /** True while a multi-part outgoing message is in progress (set from the last sent part). */
    private boolean more;
    private final Deque<Blob> pending_data;
    private final Deque<Integer> pending_flags;

    /** Trie callback used by {@link #xsend(Msg)}: marks the given pipe as matching in {@code dist}. */
    private static final Mtrie.IMtrieHandler mark_as_matching = new Mtrie.IMtrieHandler(){

        @Override
        public void invoke(Pipe pipe_, byte[] data, int size, Object arg_) {
            XPub owner = (XPub)arg_;
            owner.dist.match(pipe_);
        }
    };

    /**
     * Trie callback used by {@link #xterminated(Pipe)}: unless the owning
     * socket's type is {@code SOCKET_TYPE_PUB}, queues an unsubscription
     * message made of the unsubscribe prefix byte followed by the first
     * {@code size} bytes of {@code data_}.
     */
    private static final Mtrie.IMtrieHandler send_unsubscription = new Mtrie.IMtrieHandler(){

        @Override
        public void invoke(Pipe pipe_, byte[] data_, int size, Object arg_) {
            XPub owner = (XPub)arg_;
            if (owner.options.type == SOCKET_TYPE_PUB) {
                return;
            }
            byte[] frame = new byte[size + 1];
            frame[0] = UNSUBSCRIBE_PREFIX;
            System.arraycopy(data_, 0, frame, 1, size);
            owner.pending_data.add(Blob.createBlob(frame, false));
            owner.pending_flags.add(0);
        }
    };

    /**
     * Creates the socket, sets its type and starts with empty subscription
     * and pending-message state.
     */
    public XPub(Ctx parent_, int tid_, int sid_) {
        super(parent_, tid_, sid_);
        this.options.type = SOCKET_TYPE_XPUB;
        this.verbose = false;
        this.more = false;
        this.subscriptions = new Mtrie();
        this.dist = new Dist();
        this.pending_data = new ArrayDeque<Blob>();
        this.pending_flags = new ArrayDeque<Integer>();
    }

    /**
     * Registers a new pipe with the distributor and immediately drains any
     * messages already readable on it.
     *
     * @param pipe_       the pipe being attached; must not be null
     * @param icanhasall_ when true, the pipe is added to the trie with a null prefix
     */
    @Override
    protected void xattach_pipe(Pipe pipe_, boolean icanhasall_) {
        assert (pipe_ != null);
        this.dist.attach(pipe_);
        if (icanhasall_) {
            this.subscriptions.add(null, pipe_);
        }
        // Process whatever the peer has already written to the pipe.
        this.xread_activated(pipe_);
    }

    /**
     * Reads every message currently available on the pipe.
     *
     * <p>A non-empty message whose first byte is the subscribe or unsubscribe
     * prefix is applied to the trie and queued (with flags 0) only when this
     * socket's type is {@code SOCKET_TYPE_XPUB} and either the trie call
     * returned true or it is a subscribe message and {@code verbose} is set.
     * Any other message is queued unchanged together with its own flags.
     */
    @Override
    protected void xread_activated(Pipe pipe_) {
        Msg incoming;
        while ((incoming = pipe_.read()) != null) {
            byte[] payload = incoming.data();
            int length = incoming.size();
            boolean isSubscriptionCommand = length > 0
                    && (payload[0] == UNSUBSCRIBE_PREFIX || payload[0] == SUBSCRIBE_PREFIX);

            if (!isSubscriptionCommand) {
                // Not a (un)subscription: pass it through with its original flags.
                this.pending_data.add(Blob.createBlob(payload, true));
                this.pending_flags.add(incoming.flags());
                continue;
            }

            boolean trieResult;
            if (payload[0] == UNSUBSCRIBE_PREFIX) {
                trieResult = this.subscriptions.rm(payload, 1, pipe_);
            } else {
                trieResult = this.subscriptions.add(payload, 1, pipe_);
            }

            boolean shouldQueue = this.options.type == SOCKET_TYPE_XPUB
                    && (trieResult || (payload[0] == SUBSCRIBE_PREFIX && this.verbose));
            if (shouldQueue) {
                this.pending_data.add(Blob.createBlob(payload, true));
                this.pending_flags.add(0);
            }
        }
    }

    /** Forwards the write-activation notification for the pipe to the distributor. */
    @Override
    protected void xwrite_activated(Pipe pipe_) {
        this.dist.activated(pipe_);
    }

    /**
     * Handles the single option this class understands.
     *
     * @param option_ option id; only {@code OPTION_VERBOSE} is accepted
     * @param optval_ cast to {@link Integer}; {@code verbose} becomes true iff it equals 1
     * @return true when the option was handled, false for any other option id
     */
    @Override
    public boolean xsetsockopt(int option_, Object optval_) {
        if (option_ == OPTION_VERBOSE) {
            this.verbose = ((Integer)optval_).intValue() == 1;
            return true;
        }
        return false;
    }

    /**
     * Removes the pipe from the trie, invoking {@code send_unsubscription} as
     * the callback, then tells the distributor that the pipe is gone.
     */
    @Override
    protected void xterminated(Pipe pipe_) {
        this.subscriptions.rm(pipe_, send_unsubscription, this);
        this.dist.terminated(pipe_);
    }

    /**
     * Sends one message part to the matching pipes.
     *
     * <p>Matching against the trie happens only when no multi-part message is
     * in progress; the match is cleared again after a part without the "more"
     * indication has been sent.
     *
     * @return false when the distributor reports the send failed (in which
     *         case the multi-part state is left untouched), true otherwise
     */
    @Override
    protected boolean xsend(Msg msg_) {
        boolean hasMoreParts = msg_.hasMore();
        if (!this.more) {
            this.subscriptions.match(msg_.data(), msg_.size(), mark_as_matching, this);
        }
        boolean delivered = this.dist.send_to_matching(msg_);
        if (!delivered) {
            return false;
        }
        if (!hasMoreParts) {
            this.dist.unmatch();
        }
        this.more = hasMoreParts;
        return true;
    }

    /** Reports whether the distributor can currently accept output. */
    @Override
    protected boolean xhas_out() {
        return this.dist.has_out();
    }

    /**
     * Hands out the oldest queued message, if any.
     *
     * @return a new message built from the queued payload with the queued
     *         flags applied, or null (after setting {@code errno}) when the
     *         queue is empty
     */
    @Override
    protected Msg xrecv() {
        if (this.pending_data.isEmpty()) {
            this.errno.set(ERRNO_NOTHING_PENDING);
            return null;
        }
        Blob payload = this.pending_data.pollFirst();
        Msg result = new Msg(payload.data());
        // Flags are polled only after the message was built, keeping the deques in step.
        int queuedFlags = this.pending_flags.pollFirst();
        result.setFlags(queuedFlags);
        return result;
    }

    /** Reports whether at least one queued message is waiting for {@link #xrecv()}. */
    @Override
    protected boolean xhas_in() {
        return !this.pending_data.isEmpty();
    }

    /** Session type for this socket; adds nothing beyond {@link SessionBase}. */
    public static class XPubSession
    extends SessionBase {
        public XPubSession(IOThread io_thread_, boolean connect_, SocketBase socket_, Options options_, Address addr_) {
            super(io_thread_, connect_, socket_, options_, addr_);
        }
    }
}

