package io.smallrye.reactive.messaging.amqp;

import io.smallrye.common.annotation.CheckReturnValue;
import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.groups.UniOnFailure;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPExceptions;
import io.smallrye.reactive.messaging.amqp.i18n.AMQPLogging;
import io.vertx.mutiny.amqp.AmqpClient;
import io.vertx.mutiny.amqp.AmqpConnection;
import io.vertx.mutiny.core.Context;
import io.vertx.mutiny.core.Vertx;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.stream.Collectors;

/* loaded from: input_file:io/smallrye/reactive/messaging/amqp/ConnectionHolder.class */
public class ConnectionHolder {
    private final AmqpClient client;
    private final AmqpConnectorCommonConfiguration configuration;
    private final AtomicReference<CurrentConnection> holder = new AtomicReference<>();
    private final Vertx vertx;
    private Consumer<Throwable> callback;

    /* loaded from: input_file:io/smallrye/reactive/messaging/amqp/ConnectionHolder$CurrentConnection.class */
    private static class CurrentConnection {
        final AmqpConnection connection;
        final Context context;

        private CurrentConnection(AmqpConnection amqpConnection, Context context) {
            this.connection = amqpConnection;
            this.context = context;
        }
    }

    public ConnectionHolder(AmqpClient amqpClient, AmqpConnectorCommonConfiguration amqpConnectorCommonConfiguration, Vertx vertx) {
        this.client = amqpClient;
        this.configuration = amqpConnectorCommonConfiguration;
        this.vertx = vertx;
    }

    public Context getContext() {
        CurrentConnection currentConnection = this.holder.get();
        if (currentConnection != null) {
            return currentConnection.context;
        }
        return null;
    }

    @CheckReturnValue
    public Uni<Boolean> isConnected() {
        AmqpConnection amqpConnection;
        CurrentConnection currentConnection = this.holder.get();
        if (currentConnection != null && (amqpConnection = currentConnection.connection) != null) {
            Uni item = Uni.createFrom().item(() -> {
                return Boolean.valueOf(!amqpConnection.isDisconnected());
            });
            Context context = currentConnection.context;
            Objects.requireNonNull(context);
            return item.runSubscriptionOn(context::runOnContext);
        }
        return Uni.createFrom().item(false);
    }

    public static List<String> capabilities(AmqpConnection amqpConnection) {
        return (List) Arrays.stream(amqpConnection.getDelegate().unwrap().getRemoteOfferedCapabilities()).map((v0) -> {
            return v0.toString();
        }).collect(Collectors.toList());
    }

    public static boolean supportAnonymousRelay(AmqpConnection amqpConnection) {
        return capabilities(amqpConnection).contains("ANONYMOUS-RELAY");
    }

    public Vertx getVertx() {
        return this.vertx;
    }

    public int getHealthTimeout() {
        return this.configuration.getHealthTimeout().intValue();
    }

    public synchronized void onFailure(Consumer<Throwable> consumer) {
        this.callback = consumer;
    }

    @CheckReturnValue
    public Uni<AmqpConnection> getOrEstablishConnection() {
        return Uni.createFrom().item(() -> {
            CurrentConnection currentConnection = this.holder.get();
            if (currentConnection == null || currentConnection.connection == null || currentConnection.connection.isDisconnected()) {
                return null;
            }
            return currentConnection.connection;
        }).onItem().ifNull().switchTo(() -> {
            Integer reconnectInterval = this.configuration.getReconnectInterval();
            Integer reconnectAttempts = this.configuration.getReconnectAttempts();
            CurrentConnection currentConnection = this.holder.get();
            if (currentConnection != null && currentConnection.connection != null && !currentConnection.connection.isDisconnected()) {
                return Uni.createFrom().item(currentConnection.connection);
            }
            UniOnFailure onFailure = this.client.connect().onSubscription().invoke(uniSubscription -> {
                AMQPLogging.log.establishingConnection();
            }).onItem().transform(amqpConnection -> {
                AMQPLogging.log.connectionEstablished();
                this.holder.set(new CurrentConnection(amqpConnection, Vertx.currentContext()));
                amqpConnection.exceptionHandler(th -> {
                    Consumer<Throwable> consumer;
                    this.holder.set(null);
                    AMQPLogging.log.connectionFailure(th);
                    synchronized (this) {
                        consumer = this.callback;
                    }
                    if (consumer != null) {
                        consumer.accept(th);
                    }
                });
                if (!amqpConnection.isDisconnected() && this.holder.get() != null) {
                    return amqpConnection;
                }
                this.holder.set(null);
                throw AMQPExceptions.ex.illegalStateConnectionDisconnected();
            }).onFailure();
            AMQPLogging aMQPLogging = AMQPLogging.log;
            Objects.requireNonNull(aMQPLogging);
            return onFailure.invoke(aMQPLogging::unableToConnectToBroker).onFailure().retry().withBackOff(Duration.ofSeconds(1L), Duration.ofSeconds(reconnectInterval.intValue())).atMost(reconnectAttempts.intValue()).onFailure().invoke(th -> {
                this.holder.set(null);
                AMQPLogging.log.unableToRecoverFromConnectionDisruption(th);
            });
        });
    }

    public static CompletionStage<Void> runOnContext(Context context, Runnable runnable) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (Vertx.currentContext() == context) {
            runnable.run();
            completableFuture.complete(null);
        } else {
            context.runOnContext(() -> {
                runnable.run();
                completableFuture.complete(null);
            });
        }
        return completableFuture;
    }

    public static CompletionStage<Void> runOnContextAndReportFailure(Context context, Throwable th, Runnable runnable) {
        CompletableFuture completableFuture = new CompletableFuture();
        if (Vertx.currentContext() == context) {
            runnable.run();
            completableFuture.completeExceptionally(th);
        } else {
            context.runOnContext(() -> {
                runnable.run();
                completableFuture.completeExceptionally(th);
            });
        }
        return completableFuture;
    }
}
