package io.rsocket.broker.spring.cluster;

import io.rsocket.broker.RoutingTable;
import io.rsocket.broker.common.WellKnownKey;
import io.rsocket.broker.frames.BrokerFrame;
import io.rsocket.broker.frames.BrokerInfo;
import io.rsocket.broker.frames.RouteJoin;
import io.rsocket.broker.spring.BrokerProperties;
import io.rsocket.exceptions.ApplicationErrorException;
import io.rsocket.exceptions.RejectedSetupException;
import java.net.URI;
import java.time.Duration;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.handler.annotation.MessageMapping;
import org.springframework.messaging.rsocket.RSocketRequester;
import org.springframework.messaging.rsocket.annotation.ConnectMapping;
import org.springframework.stereotype.Controller;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;

/**
 * Messaging controller for broker-to-broker (cluster) traffic.
 *
 * <p>It handles the setup frame of an incoming broker connection, the exchange of
 * {@link BrokerInfo} between brokers, notifications about remote brokers, and
 * {@link RouteJoin} announcements that are recorded in the {@link RoutingTable}.
 */
@Controller
public class ClusterController {
    /** Route on which brokers exchange their {@link BrokerInfo}. */
    private static final String BROKER_INFO_ROUTE = "cluster.broker-info";

    /** Delay applied to each connect event before the local {@link BrokerInfo} is sent back. */
    private static final Duration CONNECT_EVENT_DELAY = Duration.ofSeconds(1L);

    private final BrokerProperties properties;
    private final BrokerConnections brokerConnections;
    private final RoutingTable routingTable;
    private final Consumer<BrokerProperties.Broker> connectionEventPublisher;
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Sinks.Many<BrokerInfo> connectEvents = Sinks.many().multicast().directBestEffort();

    /**
     * Creates the controller and subscribes to the internal connect-event stream.
     *
     * <p>Each connect event is delayed by {@link #CONNECT_EVENT_DELAY} and then answered by
     * sending the local {@link BrokerInfo} over the requester registered for that broker.
     * The stream is subscribed exactly once, so every element is processed in isolation:
     * a failure for one broker is logged and skipped rather than terminating the
     * subscription (which would silently drop all later connect events).
     *
     * @param brokerProperties local broker configuration (id, proxy URI, cluster URI)
     * @param brokerConnections registry of requesters keyed by remote {@link BrokerInfo}
     * @param routingTable table that receives {@link RouteJoin} entries
     * @param consumer callback invoked with the details of a remote broker to connect to
     */
    public ClusterController(BrokerProperties brokerProperties, BrokerConnections brokerConnections, RoutingTable routingTable, Consumer<BrokerProperties.Broker> consumer) {
        this.properties = brokerProperties;
        this.brokerConnections = brokerConnections;
        this.routingTable = routingTable;
        this.connectionEventPublisher = consumer;
        this.connectEvents.asFlux()
                .delayElements(CONNECT_EVENT_DELAY)
                .flatMap(connected -> Mono.defer(() -> announceToConnectedBroker(connected))
                        // Contain both synchronous and asynchronous failures to this element.
                        .onErrorResume(failure -> {
                            this.logger.warn("unable to send BrokerInfo to {}", connected, failure);
                            return Mono.empty();
                        }))
                .subscribe();
    }

    /**
     * Handles the setup frame of an incoming connection.
     *
     * <p>Frames that are not a {@link BrokerInfo}, and frames carrying the local broker id,
     * are accepted without further action. A second connection from an already registered
     * broker is rejected. Otherwise the requester is registered and a connect event is emitted.
     *
     * @param brokerFrame setup payload of the connecting peer
     * @param rSocketRequester requester bound to the new connection
     * @return an empty {@link Mono}, or an error {@link Mono} carrying a
     *     {@link RejectedSetupException} for a duplicate connection
     */
    @ConnectMapping
    public Mono<Void> onConnect(BrokerFrame brokerFrame, RSocketRequester rSocketRequester) {
        if (!(brokerFrame instanceof BrokerInfo)) {
            return Mono.empty();
        }
        BrokerInfo brokerInfo = (BrokerInfo) brokerFrame;
        if (brokerInfo.getBrokerId().equals(this.properties.getBrokerId())) {
            return Mono.empty();
        }
        this.logger.info("received connection from {}", brokerInfo);
        if (this.brokerConnections.contains(brokerInfo)) {
            return Mono.error(new RejectedSetupException("Duplicate connection from " + brokerInfo));
        }
        this.brokerConnections.put(brokerInfo, rSocketRequester);
        Sinks.EmitResult emitResult = this.connectEvents.tryEmitNext(brokerInfo);
        if (emitResult.isFailure()) {
            // Best-effort sink: surface dropped events instead of losing them silently.
            this.logger.warn("connect event for {} was not emitted: {}", brokerInfo, emitResult);
        }
        return Mono.empty();
    }

    /**
     * Receives the {@link BrokerInfo} of a remote broker and forwards its cluster and proxy
     * URIs to the connection event publisher.
     *
     * <p>NOTE(review): {@link URI#create(String)} rejects {@code null} and malformed input, so
     * a {@link BrokerInfo} lacking either tag would fail here; confirm senders always set both.
     *
     * @param brokerInfo description of the remote broker
     */
    @MessageMapping({"cluster.remote-broker-info"})
    public void brokerInfoUpdate(BrokerInfo brokerInfo) {
        this.logger.info("received remote BrokerInfo {}", brokerInfo);
        BrokerProperties.Broker broker = new BrokerProperties.Broker();
        broker.setCluster(URI.create(brokerInfo.getTags().get(WellKnownKey.BROKER_CLUSTER_URI)));
        broker.setProxy(URI.create(brokerInfo.getTags().get(WellKnownKey.BROKER_PROXY_URI)));
        this.connectionEventPublisher.accept(broker);
    }

    /**
     * Handles a {@link BrokerInfo} sent by a peer broker.
     *
     * <p>If a connection for that broker is already registered, the local {@link BrokerInfo}
     * is returned directly. Otherwise the requester is registered and the local
     * {@link BrokerInfo} is sent back to the peer over it.
     *
     * @param brokerInfo description of the sending broker
     * @param rSocketRequester requester bound to the sender's connection
     * @return the local {@link BrokerInfo}
     */
    @MessageMapping({BROKER_INFO_ROUTE})
    public Mono<BrokerInfo> brokerInfo(BrokerInfo brokerInfo, RSocketRequester rSocketRequester) {
        this.logger.info("received brokerInfo from {}", brokerInfo);
        if (this.brokerConnections.contains(brokerInfo)) {
            this.logger.debug("connection for broker already exists {}", brokerInfo);
            return Mono.just(getLocalBrokerInfo());
        }
        this.brokerConnections.put(brokerInfo, rSocketRequester);
        return sendBrokerInfo(rSocketRequester, brokerInfo);
    }

    /**
     * Sends the local {@link BrokerInfo} to a broker that has just connected.
     *
     * @param connected the broker named in the connect event
     * @return the send operation, or an empty {@link Mono} when no requester is available
     */
    private Mono<BrokerInfo> announceToConnectedBroker(BrokerInfo connected) {
        RSocketRequester requester = this.brokerConnections.get(connected);
        if (requester == null) {
            // Presumably the connection was removed during the delay; verify against
            // BrokerConnections that a missing entry is reported as null.
            this.logger.debug("no connection for broker {}, skipping BrokerInfo", connected);
            return Mono.empty();
        }
        return sendBrokerInfo(requester, connected);
    }

    /**
     * Sends the local {@link BrokerInfo} on {@link #BROKER_INFO_ROUTE} and waits for the reply.
     *
     * @param rSocketRequester requester used to reach the remote broker
     * @param brokerInfo the remote broker (currently unused by the request itself)
     * @return the local {@link BrokerInfo}, emitted once the remote broker has replied
     */
    private Mono<BrokerInfo> sendBrokerInfo(RSocketRequester rSocketRequester, BrokerInfo brokerInfo) {
        BrokerInfo localBrokerInfo = getLocalBrokerInfo();
        return rSocketRequester.route(BROKER_INFO_ROUTE)
                .data(localBrokerInfo)
                .retrieveMono(BrokerInfo.class)
                // The reply content is discarded; callers get the local info back.
                .map(reply -> localBrokerInfo);
    }

    /**
     * Builds the {@link BrokerInfo} describing this broker: its id tagged with the proxy URI
     * and the cluster URI taken from the broker properties.
     *
     * @return a newly built {@link BrokerInfo} for the local broker
     */
    private BrokerInfo getLocalBrokerInfo() {
        return BrokerInfo.from(this.properties.getBrokerId())
                .with(WellKnownKey.BROKER_PROXY_URI, this.properties.getUri().toString())
                .with(WellKnownKey.BROKER_CLUSTER_URI, this.properties.getCluster().getUri().toString())
                .build();
    }

    /**
     * Records a {@link RouteJoin} announced by a connected broker.
     *
     * @param routeJoin the route being announced
     * @return the same {@link RouteJoin} once added to the routing table, or an error
     *     {@link Mono} carrying an {@link ApplicationErrorException} when no connection is
     *     registered for the announcing broker
     */
    @MessageMapping({"cluster.route-join"})
    private Mono<RouteJoin> routeJoin(RouteJoin routeJoin) {
        this.logger.info("received RouteJoin {}", routeJoin);
        // The lookup key is a BrokerInfo built from the broker id alone.
        BrokerInfo origin = BrokerInfo.from(routeJoin.getBrokerId()).build();
        if (!this.brokerConnections.contains(origin)) {
            return Mono.error(new ApplicationErrorException("No connection for broker " + origin));
        }
        this.routingTable.add(routeJoin);
        return Mono.just(routeJoin);
    }

    /**
     * Simple greeting endpoint.
     *
     * @param str name to greet
     * @return {@code "Hello "} followed by {@code str}
     */
    @MessageMapping({"hello"})
    public Mono<String> hello(String str) {
        return Mono.just("Hello " + str);
    }
}
