package io.rsocket.broker.spring.cluster;

import io.rsocket.RSocket;
import io.rsocket.broker.common.Id;
import io.rsocket.broker.frames.BrokerInfo;
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/rsocket/broker/spring/cluster/AbstractConnections.class */
/**
 * Registry of per-broker connection handles, keyed by the broker id taken from
 * {@link BrokerInfo#getBrokerId()}.
 *
 * <p>Each stored handle is wrapped in a {@link BrokerInfoEntry} together with the
 * {@link BrokerInfo} it was registered under. Additions and removals are published through
 * the {@link #joinEvents()} and {@link #leaveEvents()} streams. Emission results are not
 * inspected, so event delivery is best effort.
 *
 * @param <T> the connection handle type; subclasses map it to an {@link RSocket} through
 *     {@link #getRSocket(Object)}
 */
public abstract class AbstractConnections<T> {
    protected final Logger logger = LoggerFactory.getLogger(getClass());
    protected final Sinks.Many<BrokerInfoEntry<T>> joinEvents = Sinks.many().multicast().directBestEffort();
    protected final Sinks.Many<BrokerInfo> leaveEvents = Sinks.many().multicast().directBestEffort();
    protected final Map<Id, BrokerInfoEntry<T>> connections = new ConcurrentHashMap<>();

    /**
     * Immutable pairing of a connection handle with the {@link BrokerInfo} it belongs to.
     *
     * @param <T> the connection handle type
     */
    public static class BrokerInfoEntry<T> {
        final T value;
        final BrokerInfo brokerInfo;

        /**
         * @param t the connection handle
         * @param brokerInfo the broker the handle is connected to
         */
        public BrokerInfoEntry(T t, BrokerInfo brokerInfo) {
            this.value = t;
            this.brokerInfo = brokerInfo;
        }

        /** @return the connection handle */
        public T getValue() {
            return this.value;
        }

        /** @return the broker this entry was registered under */
        public BrokerInfo getBrokerInfo() {
            return this.brokerInfo;
        }
    }

    /**
     * @param brokerInfo the broker to look up
     * @return {@code true} if a connection is registered under the broker's id
     */
    public boolean contains(BrokerInfo brokerInfo) {
        return this.connections.containsKey(brokerInfo.getBrokerId());
    }

    /**
     * @return the live {@code values()} view of the backing map (not a snapshot)
     */
    public Collection<BrokerInfoEntry<T>> entries() {
        return this.connections.values();
    }

    /**
     * @param brokerInfo the broker to look up
     * @return the handle registered under the broker's id, or {@code null} if there is none
     */
    public T get(BrokerInfo brokerInfo) {
        BrokerInfoEntry<T> entry = this.connections.get(brokerInfo.getBrokerId());
        return entry == null ? null : entry.value;
    }

    /**
     * Registers {@code t} under the id of {@code brokerInfo}, replacing any existing entry.
     *
     * <p>A join event is emitted only when the broker id was not registered before. A close
     * hook is registered for {@code t} in both cases, so that a replacement connection is
     * also removed once its own RSocket closes.
     *
     * @param brokerInfo the broker the connection belongs to
     * @param t the connection handle
     * @return the handle that was replaced, or {@code null} if the broker id was new
     */
    public T put(BrokerInfo brokerInfo, T t) {
        this.logger.debug("adding {} RSocket {}", brokerInfo, t);
        BrokerInfoEntry<T> entry = new BrokerInfoEntry<>(t, brokerInfo);
        BrokerInfoEntry<T> previous = this.connections.put(brokerInfo.getBrokerId(), entry);
        if (previous == null) {
            // Emit the join before wiring the close hook: if the RSocket is already closed
            // the hook may run synchronously, and the leave must not precede the join.
            this.joinEvents.tryEmitNext(entry);
        }
        registerCleanup(brokerInfo, t);
        return previous == null ? null : previous.value;
    }

    /**
     * Removes the entry registered under the id of {@code brokerInfo} and emits a leave event
     * carrying the {@link BrokerInfo} stored in the removed entry.
     *
     * @param brokerInfo the broker to remove
     * @return the removed handle, or {@code null} if nothing was registered
     */
    public T remove(BrokerInfo brokerInfo) {
        BrokerInfoEntry<T> removed = this.connections.remove(brokerInfo.getBrokerId());
        if (removed == null) {
            return null;
        }
        this.leaveEvents.tryEmitNext(removed.getBrokerInfo());
        return removed.value;
    }

    /**
     * Resolves the {@link RSocket} behind a connection handle.
     *
     * @param t the connection handle
     * @return a {@link Mono} producing the handle's RSocket
     */
    protected abstract Mono<RSocket> getRSocket(T t);

    /**
     * Subscribes to the {@code onClose()} signal of the RSocket behind {@code t} and, when it
     * terminates for any reason, removes the matching entry from the registry.
     *
     * @param brokerInfo the broker the connection belongs to
     * @param t the connection handle whose closure triggers the removal
     */
    protected void registerCleanup(BrokerInfo brokerInfo, T t) {
        getRSocket(t)
                .flatMap(rSocket -> rSocket.onClose().doFinally(signalType -> removeIfCurrent(brokerInfo, t)))
                .subscribe();
    }

    /**
     * Removes the entry for {@code brokerInfo} only if it still holds {@code t}, emitting a
     * leave event only when an entry was actually removed. This keeps the closure of a
     * replaced connection from evicting its replacement, and avoids a second leave event
     * after an explicit {@link #remove(BrokerInfo)}.
     */
    private void removeIfCurrent(BrokerInfo brokerInfo, T t) {
        Id brokerId = brokerInfo.getBrokerId();
        BrokerInfoEntry<T> current = this.connections.get(brokerId);
        if (current == null || current.value != t) {
            // Already removed, or replaced by a different connection for the same broker.
            return;
        }
        // Two-argument remove: only succeeds if the id is still mapped to this very entry.
        if (this.connections.remove(brokerId, current)) {
            this.logger.debug("removing connection {}", brokerInfo);
            this.leaveEvents.tryEmitNext(brokerInfo);
        }
    }

    /**
     * @return a stream that first emits the entries currently held in the registry and then
     *     the join events published afterwards
     */
    public Flux<BrokerInfoEntry<T>> joinEvents() {
        return Flux.mergeSequential(Flux.fromIterable(this.connections.values()), this.joinEvents.asFlux());
    }

    /**
     * @return a stream of the {@link BrokerInfo} of every connection removed from the registry
     */
    public Flux<BrokerInfo> leaveEvents() {
        return this.leaveEvents.asFlux();
    }
}
