package io.rsocket.broker.rsocket;

import io.rsocket.Payload;
import io.rsocket.RSocket;
import io.rsocket.broker.frames.Address;
import java.util.function.Function;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

/* loaded from: input_file:io/rsocket/broker/rsocket/RoutingRSocket.class */
public class RoutingRSocket implements RSocket {
    private final RSocketLocator rSocketLocator;
    private final Function<Payload, Address> addressExtractor;

    public RoutingRSocket(RSocketLocator rSocketLocator, Function<Payload, Address> function) {
        this.rSocketLocator = rSocketLocator;
        this.addressExtractor = function;
    }

    public Mono<Void> fireAndForget(Payload payload) {
        try {
            return locate(payload).fireAndForget(payload);
        } catch (Throwable th) {
            payload.release();
            return Mono.error(handleError(th));
        }
    }

    public Mono<Payload> requestResponse(Payload payload) {
        try {
            return locate(payload).requestResponse(payload);
        } catch (Throwable th) {
            payload.release();
            return Mono.error(handleError(th));
        }
    }

    public Mono<Void> metadataPush(Payload payload) {
        try {
            return locate(payload).metadataPush(payload);
        } catch (Throwable th) {
            payload.release();
            return Mono.error(handleError(th));
        }
    }

    public Flux<Payload> requestStream(Payload payload) {
        try {
            return locate(payload).requestStream(payload);
        } catch (Throwable th) {
            payload.release();
            return Flux.error(handleError(th));
        }
    }

    public Flux<Payload> requestChannel(Publisher<Payload> publisher) {
        return Flux.from(publisher).switchOnFirst((signal, flux) -> {
            if (!signal.hasValue()) {
                return flux;
            }
            Payload payload = (Payload) signal.get();
            try {
                return locate(payload).requestChannel(flux);
            } catch (Throwable th) {
                payload.release();
                return Flux.error(handleError(th));
            }
        });
    }

    private RSocket locate(Payload payload) {
        Address apply = this.addressExtractor.apply(payload);
        if (this.rSocketLocator.supports(apply.getRoutingType())) {
            return this.rSocketLocator.locate(apply);
        }
        throw new IllegalStateException("No RSocketLocator for RoutingType " + apply.getRoutingType());
    }

    private Throwable handleError(Throwable th) {
        return th;
    }
}
