package io.rsocket.broker.spring.cluster;

import io.rsocket.broker.RoutingTable;
import io.rsocket.broker.frames.RouteJoin;
import io.rsocket.broker.spring.BrokerProperties;
import io.rsocket.broker.spring.cluster.AbstractConnections;
import java.io.Closeable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.messaging.rsocket.RSocketRequester;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/broker/spring/cluster/RouteJoinListener.class */
/**
 * Listens for {@link RouteJoin} events on a {@link RoutingTable} and sends each one to every
 * broker entry currently exposed by {@link BrokerConnections}.
 *
 * <p>The subscription is created in the constructor and stays active until {@link #close()} is
 * called. A failure while sending to one broker is logged and skipped so that it cannot terminate
 * the subscription and silently stop all further forwarding.
 */
public class RouteJoinListener implements Closeable {
    private final Logger logger = LoggerFactory.getLogger(getClass());
    private final Disposable disposable;

    /**
     * Subscribes to the routing table's join events and forwards matching events to all brokers.
     *
     * @param brokerProperties supplies the broker id that join events are matched against
     * @param routingTable source of the join events
     * @param brokerConnections provides the broker entries each matching event is sent to
     */
    public RouteJoinListener(BrokerProperties brokerProperties, RoutingTable routingTable, BrokerConnections brokerConnections) {
        this.disposable = routingTable
                // The predicate accepts only events carrying this broker's own id
                // (presumably so that only locally originated joins are re-broadcast).
                .joinEvents(routeJoin -> brokerProperties.getBrokerId().equals(routeJoin.getBrokerId()))
                // The connection entries are read again for every event.
                .flatMap(routeJoin -> Flux.fromIterable(brokerConnections.entries())
                        .flatMap(brokerInfoEntry -> sendRouteJoinSafely(brokerInfoEntry, routeJoin)))
                .subscribe();
    }

    /**
     * Sends the event to one broker, turning any failure into a logged, empty completion.
     *
     * <p>{@code Mono.defer} is used so that an exception thrown synchronously while building the
     * request is handled the same way as an asynchronous error signal.
     *
     * @param brokerInfoEntry the target broker entry
     * @param routeJoin the event to send
     * @return the broker's reply, or an empty {@code Mono} if sending failed
     */
    private Mono<RouteJoin> sendRouteJoinSafely(AbstractConnections.BrokerInfoEntry<RSocketRequester> brokerInfoEntry, RouteJoin routeJoin) {
        return Mono.defer(() -> sendRouteJoin(brokerInfoEntry, routeJoin))
                .onErrorResume(error -> {
                    // An error signal is terminal in Reactor; swallow it here so one bad broker
                    // does not cancel forwarding to the others or end the outer subscription.
                    this.logger.warn("failed to send RouteJoin {} to {}", routeJoin, brokerInfoEntry.getBrokerInfo(), error);
                    return Mono.empty();
                });
    }

    /**
     * Sends the event to one broker on the {@code cluster.route-join} route and requests a single
     * {@link RouteJoin} reply.
     *
     * @param brokerInfoEntry the target broker entry; its value is the requester used for the call
     * @param routeJoin the event to send as request data
     * @return the reply decoded as a {@link RouteJoin}
     */
    private Mono<RouteJoin> sendRouteJoin(AbstractConnections.BrokerInfoEntry<RSocketRequester> brokerInfoEntry, RouteJoin routeJoin) {
        this.logger.info("sending RouteJoin {} to {}", routeJoin, brokerInfoEntry.getBrokerInfo());
        return brokerInfoEntry.getValue().route("cluster.route-join", new Object[0]).data(routeJoin).retrieveMono(RouteJoin.class);
    }

    /**
     * Cancels the join-event subscription. Calling this more than once is harmless because
     * {@code Disposable.dispose()} is documented as idempotent.
     */
    @Override
    public void close() {
        this.disposable.dispose();
    }
}
