package io.rsocket.broker.spring.cluster;

import io.netty.buffer.ByteBufAllocator;
import io.netty.buffer.Unpooled;
import io.rsocket.ConnectionSetupPayload;
import io.rsocket.RSocket;
import io.rsocket.broker.common.WellKnownKey;
import io.rsocket.broker.common.spring.ClientTransportFactory;
import io.rsocket.broker.common.spring.MimeTypes;
import io.rsocket.broker.frames.BrokerInfo;
import io.rsocket.broker.frames.BrokerInfoFlyweight;
import io.rsocket.broker.rsocket.RoutingRSocketFactory;
import io.rsocket.broker.rsocket.WeightedStatsAwareRSocket;
import io.rsocket.broker.spring.BrokerProperties;
import io.rsocket.core.DefaultConnectionSetupPayload;
import io.rsocket.frame.SetupFrameCodec;
import io.rsocket.loadbalance.WeightedStatsRequestInterceptor;
import io.rsocket.metadata.WellKnownMimeType;
import io.rsocket.transport.ClientTransport;
import io.rsocket.util.DefaultPayload;
import java.net.URI;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.ObjectProvider;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.RSocketStrategies;
import org.springframework.messaging.rsocket.annotation.support.RSocketMessageHandler;
import reactor.core.Disposable;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/* loaded from: input_file:io/rsocket/broker/spring/cluster/ClusterNodeConnectionManager.class */
public class ClusterNodeConnectionManager {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Sinks.Many<BrokerProperties.Broker> connectionEvents = Sinks.many().multicast().directBestEffort();
    private final BrokerProperties properties;
    private final BrokerConnections brokerConnections;
    private final ProxyConnections proxyConnections;
    private final RSocketMessageHandler messageHandler;
    private final RSocketStrategies strategies;
    private final RoutingRSocketFactory routingRSocketFactory;
    private final ObjectProvider<ClientTransportFactory> transportFactories;
    private final BrokerInfo localBrokerInfo;
    private RSocket rSocket;

    public ClusterNodeConnectionManager(BrokerProperties brokerProperties, BrokerConnections brokerConnections, ProxyConnections proxyConnections, RSocketMessageHandler rSocketMessageHandler, RSocketStrategies rSocketStrategies, RoutingRSocketFactory routingRSocketFactory, ObjectProvider<ClientTransportFactory> objectProvider) {
        this.properties = brokerProperties;
        this.brokerConnections = brokerConnections;
        this.proxyConnections = proxyConnections;
        this.messageHandler = rSocketMessageHandler;
        this.strategies = rSocketStrategies;
        this.routingRSocketFactory = routingRSocketFactory;
        this.transportFactories = objectProvider;
        setupRSocket();
        this.localBrokerInfo = getLocalBrokerInfo(brokerProperties);
        this.connectionEvents.asFlux().map(this::connect).subscribe();
    }

    public Sinks.Many<BrokerProperties.Broker> getConnectionEventPublisher() {
        return this.connectionEvents;
    }

    private Disposable connect(BrokerProperties.Broker broker) {
        this.logger.info("connecting to {}", broker);
        RSocketRequester connect = connect(broker.getCluster(), this.localBrokerInfo, null, this.rSocket);
        return connect.route("cluster.broker-info", new Object[0]).data(this.localBrokerInfo).retrieveMono(BrokerInfo.class).map(brokerInfo -> {
            this.brokerConnections.put(brokerInfo, connect);
            return brokerInfo;
        }).flatMap(brokerInfo2 -> {
            return connect(broker.getProxy(), null, this.localBrokerInfo, this.routingRSocketFactory.create()).rsocketClient().source().map(rSocket -> {
                this.proxyConnections.put(brokerInfo2, rSocket);
                return rSocket;
            });
        }).subscribe();
    }

    static BrokerInfo getLocalBrokerInfo(BrokerProperties brokerProperties) {
        return BrokerInfo.from(brokerProperties.getBrokerId()).with(WellKnownKey.BROKER_PROXY_URI, brokerProperties.getUri().toString()).with(WellKnownKey.BROKER_CLUSTER_URI, brokerProperties.getCluster().getUri().toString()).build();
    }

    private RSocketRequester connect(URI uri, Object obj, Object obj2, RSocket rSocket) {
        RSocketRequester.Builder dataMimeType = RSocketRequester.builder().rsocketStrategies(this.strategies).dataMimeType(MimeTypes.BROKER_FRAME_MIME_TYPE);
        if (obj != null) {
            dataMimeType.setupData(obj);
        }
        if (obj2 != null) {
            dataMimeType.setupMetadata(obj2, MimeTypes.BROKER_FRAME_MIME_TYPE);
        }
        dataMimeType.rsocketConnector(rSocketConnector -> {
            rSocketConnector.interceptors(interceptorRegistry -> {
                interceptorRegistry.forRequestsInResponder(rSocket2 -> {
                    WeightedStatsRequestInterceptor weightedStatsRequestInterceptor = new WeightedStatsRequestInterceptor();
                    interceptorRegistry.forRequester(rSocket2 -> {
                        return new WeightedStatsAwareRSocket(rSocket2, weightedStatsRequestInterceptor);
                    });
                    return weightedStatsRequestInterceptor;
                });
            }).acceptor((connectionSetupPayload, rSocket2) -> {
                return Mono.just(rSocket);
            });
        });
        return dataMimeType.transport((ClientTransport) this.transportFactories.orderedStream().filter(clientTransportFactory -> {
            return clientTransportFactory.supports(uri);
        }).findFirst().map(clientTransportFactory2 -> {
            return (ClientTransport) clientTransportFactory2.create(uri);
        }).orElseThrow(() -> {
            return new IllegalArgumentException("Unknown transport " + this.properties);
        }));
    }

    private void setupRSocket() {
        this.messageHandler.responder().accept(getConnectionSetupPayload(), new RSocket() { // from class: io.rsocket.broker.spring.cluster.ClusterNodeConnectionManager.1
        }).subscribe(rSocket -> {
            this.rSocket = rSocket;
        });
    }

    private ConnectionSetupPayload getConnectionSetupPayload() {
        return getConnectionSetupPayload(this.messageHandler.getRSocketStrategies().dataBufferFactory().getByteBufAllocator(), this.properties);
    }

    static DefaultConnectionSetupPayload getConnectionSetupPayload(ByteBufAllocator byteBufAllocator, BrokerProperties brokerProperties) {
        BrokerInfo localBrokerInfo = getLocalBrokerInfo(brokerProperties);
        return new DefaultConnectionSetupPayload(SetupFrameCodec.encode(byteBufAllocator, false, 1, 1, WellKnownMimeType.MESSAGE_RSOCKET_COMPOSITE_METADATA.getString(), MimeTypes.BROKER_FRAME_MIME_TYPE.toString(), DefaultPayload.create(BrokerInfoFlyweight.encode(byteBufAllocator, localBrokerInfo.getBrokerId(), localBrokerInfo.getTimestamp(), localBrokerInfo.getTags(), 0).retain(), Unpooled.EMPTY_BUFFER)));
    }
}
