/*
 * Decompiled with CFR 0.152.
 */
package de.themoep.connectorplugin.lib.lettuce.core;

import de.themoep.connectorplugin.lib.lettuce.core.AbstractRedisClient;
import de.themoep.connectorplugin.lib.lettuce.core.ClientOptions;
import de.themoep.connectorplugin.lib.lettuce.core.CommandListenerWriter;
import de.themoep.connectorplugin.lib.lettuce.core.ConnectionBuilder;
import de.themoep.connectorplugin.lib.lettuce.core.ConnectionFuture;
import de.themoep.connectorplugin.lib.lettuce.core.ConnectionState;
import de.themoep.connectorplugin.lib.lettuce.core.RedisChannelWriter;
import de.themoep.connectorplugin.lib.lettuce.core.RedisCommandTimeoutException;
import de.themoep.connectorplugin.lib.lettuce.core.RedisConnectionException;
import de.themoep.connectorplugin.lib.lettuce.core.RedisURI;
import de.themoep.connectorplugin.lib.lettuce.core.SslConnectionBuilder;
import de.themoep.connectorplugin.lib.lettuce.core.StatefulRedisConnectionImpl;
import de.themoep.connectorplugin.lib.lettuce.core.api.StatefulRedisConnection;
import de.themoep.connectorplugin.lib.lettuce.core.codec.RedisCodec;
import de.themoep.connectorplugin.lib.lettuce.core.codec.StringCodec;
import de.themoep.connectorplugin.lib.lettuce.core.internal.ExceptionFactory;
import de.themoep.connectorplugin.lib.lettuce.core.internal.Futures;
import de.themoep.connectorplugin.lib.lettuce.core.internal.LettuceAssert;
import de.themoep.connectorplugin.lib.lettuce.core.internal.LettuceStrings;
import de.themoep.connectorplugin.lib.lettuce.core.protocol.CommandExpiryWriter;
import de.themoep.connectorplugin.lib.lettuce.core.protocol.CommandHandler;
import de.themoep.connectorplugin.lib.lettuce.core.protocol.DefaultEndpoint;
import de.themoep.connectorplugin.lib.lettuce.core.protocol.Endpoint;
import de.themoep.connectorplugin.lib.lettuce.core.protocol.PushHandler;
import de.themoep.connectorplugin.lib.lettuce.core.pubsub.PubSubCommandHandler;
import de.themoep.connectorplugin.lib.lettuce.core.pubsub.PubSubEndpoint;
import de.themoep.connectorplugin.lib.lettuce.core.pubsub.StatefulRedisPubSubConnection;
import de.themoep.connectorplugin.lib.lettuce.core.pubsub.StatefulRedisPubSubConnectionImpl;
import de.themoep.connectorplugin.lib.lettuce.core.resource.ClientResources;
import de.themoep.connectorplugin.lib.lettuce.core.sentinel.StatefulRedisSentinelConnectionImpl;
import de.themoep.connectorplugin.lib.lettuce.core.sentinel.api.StatefulRedisSentinelConnection;
import de.themoep.connectorplugin.lib.netty.util.internal.logging.InternalLogger;
import de.themoep.connectorplugin.lib.netty.util.internal.logging.InternalLoggerFactory;
import de.themoep.connectorplugin.lib.reactor.core.publisher.Mono;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Supplier;

/**
 * Client that opens standalone, Pub/Sub and Sentinel connections to Redis.
 *
 * <p>A client may be created with a default {@link RedisURI}; the no-URI factory methods use the shared
 * {@code EMPTY_URI} marker instead, in which case the {@code connect*} overloads that do not take a
 * {@link RedisURI} argument are rejected (see {@link #checkForRedisURI()}).
 */
public class RedisClient extends AbstractRedisClient {

    private static final InternalLogger logger = InternalLoggerFactory.getInstance(RedisClient.class);

    /** Marker instance, compared by identity, meaning "no default URI was supplied". */
    private static final RedisURI EMPTY_URI = new RedisURI();

    /** Default target used by the {@code connect*} overloads that take no {@link RedisURI}. */
    private final RedisURI redisURI;

    /**
     * Creates a client using the given resources and default URI. The URI's timeout becomes the client's
     * default timeout.
     *
     * @param clientResources resources handed to the super class; the factory methods of this class pass
     *        {@code null} when no resources were supplied
     * @param redisURI default URI, must not be {@code null}
     */
    protected RedisClient(ClientResources clientResources, RedisURI redisURI) {
        super(clientResources);
        assertNotNull(redisURI);
        this.redisURI = redisURI;
        setDefaultTimeout(redisURI.getTimeout());
    }

    /**
     * Creates a client without client resources and without a default URI.
     */
    protected RedisClient() {
        this(null, EMPTY_URI);
    }

    /**
     * Creates a client without a default URI; a {@link RedisURI} has to be passed to the connect methods.
     *
     * @return a new client
     */
    public static RedisClient create() {
        return new RedisClient(null, EMPTY_URI);
    }

    /**
     * Creates a client whose default target is {@code redisURI}.
     *
     * @param redisURI default URI, must not be {@code null}
     * @return a new client
     */
    public static RedisClient create(RedisURI redisURI) {
        assertNotNull(redisURI);
        return new RedisClient(null, redisURI);
    }

    /**
     * Creates a client whose default target is parsed from {@code uri} via {@link RedisURI#create(String)}.
     *
     * @param uri textual URI, must not be empty
     * @return a new client
     */
    public static RedisClient create(String uri) {
        LettuceAssert.notEmpty((CharSequence) uri, "URI must not be empty");
        return new RedisClient(null, RedisURI.create(uri));
    }

    /**
     * Creates a client using the given resources and no default URI.
     *
     * @param clientResources client resources, must not be {@code null}
     * @return a new client
     */
    public static RedisClient create(ClientResources clientResources) {
        assertNotNull(clientResources);
        return new RedisClient(clientResources, EMPTY_URI);
    }

    /**
     * Creates a client using the given resources whose default target is parsed from {@code uri}.
     *
     * @param clientResources client resources, must not be {@code null}
     * @param uri textual URI, must not be empty
     * @return a new client
     */
    public static RedisClient create(ClientResources clientResources, String uri) {
        assertNotNull(clientResources);
        LettuceAssert.notEmpty((CharSequence) uri, "URI must not be empty");
        return create(clientResources, RedisURI.create(uri));
    }

    /**
     * Creates a client using the given resources and default URI.
     *
     * @param clientResources client resources, must not be {@code null}
     * @param redisURI default URI, must not be {@code null}
     * @return a new client
     */
    public static RedisClient create(ClientResources clientResources, RedisURI redisURI) {
        assertNotNull(clientResources);
        assertNotNull(redisURI);
        return new RedisClient(clientResources, redisURI);
    }

    /**
     * Opens a connection to the default URI using the codec from {@link #newStringStringCodec()}.
     *
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public StatefulRedisConnection<String, String> connect() {
        return connect(newStringStringCodec());
    }

    /**
     * Opens a connection to the default URI using {@code codec} and the client's default timeout.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec) {
        checkForRedisURI();
        return getConnection(connectStandaloneAsync(codec, this.redisURI, getDefaultTimeout()));
    }

    /**
     * Opens a connection to {@code redisURI} using the codec from {@link #newStringStringCodec()} and the
     * URI's own timeout.
     *
     * @param redisURI target, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public StatefulRedisConnection<String, String> connect(RedisURI redisURI) {
        assertNotNull(redisURI);
        return getConnection(connectStandaloneAsync(newStringStringCodec(), redisURI, redisURI.getTimeout()));
    }

    /**
     * Opens a connection to {@code redisURI} using {@code codec} and the URI's own timeout.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @param redisURI target, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public <K, V> StatefulRedisConnection<K, V> connect(RedisCodec<K, V> codec, RedisURI redisURI) {
        assertNotNull(redisURI);
        return getConnection(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
    }

    /**
     * Asynchronously opens a connection to {@code redisURI}. A failure is re-wrapped by
     * {@link #transformAsyncConnectionException(ConnectionFuture)}.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @param redisURI target, must not be {@code null}
     * @return future completing with the connection
     */
    public <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectAsync(RedisCodec<K, V> codec, RedisURI redisURI) {
        assertNotNull(redisURI);
        return transformAsyncConnectionException(connectStandaloneAsync(codec, redisURI, redisURI.getTimeout()));
    }

    /**
     * Builds the endpoint, writer chain and connection object for a standalone connection and starts the
     * asynchronous connect. The connection object is closed if the connect fails.
     */
    private <K, V> ConnectionFuture<StatefulRedisConnection<K, V>> connectStandaloneAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
        assertNotNull(codec);
        checkValidRedisURI(redisURI);
        logger.debug("Trying to get a Redis connection for: {}", redisURI);

        DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
        RedisChannelWriter writer = decorateWriter(endpoint);
        StatefulRedisConnectionImpl<K, V> connection = newStatefulRedisConnection(writer, endpoint, codec, timeout);

        ConnectionFuture<StatefulRedisConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
                () -> new CommandHandler(getOptions(), getResources(), endpoint));

        // Side-effect only: the original future (not the whenComplete stage) is what callers receive.
        future.whenComplete((conn, throwable) -> {
            if (throwable != null) {
                connection.close();
            }
        });
        return future;
    }

    /**
     * Applies {@code redisURI} to the connection state, configures a {@link ConnectionBuilder} and
     * initializes the channel.
     *
     * @return future that completes with {@code connection} once channel initialization completes
     */
    @SuppressWarnings("unchecked")
    private <K, V, S> ConnectionFuture<S> connectStatefulAsync(StatefulRedisConnectionImpl<K, V> connection, Endpoint endpoint, RedisURI redisURI, Supplier<CommandHandler> commandHandlerSupplier) {
        ConnectionBuilder builder = newConnectionBuilder(redisURI);

        ConnectionState state = connection.getConnectionState();
        state.apply(redisURI);
        state.setDb(redisURI.getDatabase());

        builder.connection(connection);
        builder.clientOptions(getOptions());
        builder.clientResources(getResources());
        builder.commandHandler(commandHandlerSupplier).endpoint(endpoint);
        this.connectionBuilder(getSocketAddressSupplier(redisURI), builder, redisURI);
        builder.connectionInitializer(createHandshake(state));

        ConnectionFuture<?> future = initializeChannelAsync(builder);
        // Callers choose S as an interface implemented by the concrete connection they passed in.
        return future.thenApply(channelHandler -> (S) connection);
    }

    /**
     * Opens a Pub/Sub connection to the default URI using the codec from {@link #newStringStringCodec()}.
     *
     * <p>Unlike {@link #connect()}, this overload does not call {@link #checkForRedisURI()}; the URI is
     * only validated by {@code checkValidRedisURI} inside the asynchronous connect.
     *
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public StatefulRedisPubSubConnection<String, String> connectPubSub() {
        return getConnection(connectPubSubAsync(newStringStringCodec(), this.redisURI, getDefaultTimeout()));
    }

    /**
     * Opens a Pub/Sub connection to {@code redisURI} using the codec from {@link #newStringStringCodec()}
     * and the URI's own timeout.
     *
     * @param redisURI target, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public StatefulRedisPubSubConnection<String, String> connectPubSub(RedisURI redisURI) {
        assertNotNull(redisURI);
        return getConnection(connectPubSubAsync(newStringStringCodec(), redisURI, redisURI.getTimeout()));
    }

    /**
     * Opens a Pub/Sub connection to the default URI using {@code codec} and the client's default timeout.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public <K, V> StatefulRedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec) {
        checkForRedisURI();
        return getConnection(connectPubSubAsync(codec, this.redisURI, getDefaultTimeout()));
    }

    /**
     * Opens a Pub/Sub connection to {@code redisURI} using {@code codec} and the URI's own timeout.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @param redisURI target, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public <K, V> StatefulRedisPubSubConnection<K, V> connectPubSub(RedisCodec<K, V> codec, RedisURI redisURI) {
        assertNotNull(redisURI);
        return getConnection(connectPubSubAsync(codec, redisURI, redisURI.getTimeout()));
    }

    /**
     * Asynchronously opens a Pub/Sub connection to {@code redisURI}. A failure is re-wrapped by
     * {@link #transformAsyncConnectionException(ConnectionFuture)}.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @param redisURI target, must not be {@code null}
     * @return future completing with the connection
     */
    public <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubAsync(RedisCodec<K, V> codec, RedisURI redisURI) {
        assertNotNull(redisURI);
        return transformAsyncConnectionException(connectPubSubAsync(codec, redisURI, redisURI.getTimeout()));
    }

    /**
     * Builds the Pub/Sub endpoint, writer chain and connection object and starts the asynchronous connect.
     * The connection object is closed if the connect fails.
     */
    private <K, V> ConnectionFuture<StatefulRedisPubSubConnection<K, V>> connectPubSubAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
        assertNotNull(codec);
        checkValidRedisURI(redisURI);

        PubSubEndpoint<K, V> endpoint = new PubSubEndpoint<>(getOptions(), getResources());
        RedisChannelWriter writer = decorateWriter(endpoint);
        StatefulRedisPubSubConnectionImpl<K, V> connection = newStatefulRedisPubSubConnection(endpoint, writer, codec, timeout);

        ConnectionFuture<StatefulRedisPubSubConnection<K, V>> future = connectStatefulAsync(connection, endpoint, redisURI,
                () -> new PubSubCommandHandler<>(getOptions(), getResources(), codec, endpoint));

        return future.whenComplete((conn, throwable) -> {
            if (throwable != null) {
                // The result argument is null when the stage failed, so close the connection object
                // created above rather than dereferencing the (null) result.
                connection.close();
            }
        });
    }

    /**
     * Opens a Sentinel connection to the default URI using the codec from {@link #newStringStringCodec()}.
     *
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public StatefulRedisSentinelConnection<String, String> connectSentinel() {
        return connectSentinel(newStringStringCodec());
    }

    /**
     * Opens a Sentinel connection to the default URI using {@code codec} and the client's default timeout.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec) {
        checkForRedisURI();
        return getConnection(connectSentinelAsync(codec, this.redisURI, getDefaultTimeout()));
    }

    /**
     * Opens a Sentinel connection to {@code redisURI} using the codec from {@link #newStringStringCodec()}
     * and the URI's own timeout.
     *
     * @param redisURI target, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public StatefulRedisSentinelConnection<String, String> connectSentinel(RedisURI redisURI) {
        assertNotNull(redisURI);
        return getConnection(connectSentinelAsync(newStringStringCodec(), redisURI, redisURI.getTimeout()));
    }

    /**
     * Opens a Sentinel connection to {@code redisURI} using {@code codec} and the URI's own timeout.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @param redisURI target, must not be {@code null}
     * @return the connection obtained from the asynchronous connect via {@code getConnection}
     */
    public <K, V> StatefulRedisSentinelConnection<K, V> connectSentinel(RedisCodec<K, V> codec, RedisURI redisURI) {
        assertNotNull(redisURI);
        return getConnection(connectSentinelAsync(codec, redisURI, redisURI.getTimeout()));
    }

    /**
     * Asynchronously opens a Sentinel connection to {@code redisURI}. A failure is re-wrapped by
     * {@link #transformAsyncConnectionException(CompletionStage, RedisURI)}.
     *
     * @param codec codec for keys and values, must not be {@code null}
     * @param redisURI target, must not be {@code null}
     * @return future completing with the connection
     */
    public <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectSentinelAsync(RedisCodec<K, V> codec, RedisURI redisURI) {
        assertNotNull(redisURI);
        return transformAsyncConnectionException(connectSentinelAsync(codec, redisURI, redisURI.getTimeout()), redisURI);
    }

    /**
     * Connects to a Sentinel. If the URI lists no sentinels but carries a host or socket, that address is
     * used directly. Otherwise the listed sentinels are tried in order, each one only after the previous
     * attempt failed; the failures of all attempts are attached as suppressed exceptions to the final
     * {@link RedisConnectionException}.
     */
    private <K, V> CompletableFuture<StatefulRedisSentinelConnection<K, V>> connectSentinelAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout) {
        assertNotNull(codec);
        checkValidRedisURI(redisURI);
        logger.debug("Trying to get a Redis Sentinel connection for one of: {}", redisURI.getSentinels());

        boolean hasDirectAddress = LettuceStrings.isNotEmpty(redisURI.getHost()) || !LettuceStrings.isEmpty(redisURI.getSocket());
        if (redisURI.getSentinels().isEmpty() && hasDirectAddress) {
            return doConnectSentinelAsync(codec, redisURI, timeout, redisURI.getClientName()).toCompletableFuture();
        }

        List<RedisURI> sentinels = redisURI.getSentinels();
        LinkedBlockingQueue<Throwable> exceptionCollector = new LinkedBlockingQueue<>();
        validateUrisAreOfSameConnectionType(sentinels);

        Mono<StatefulRedisSentinelConnection<K, V>> connectionLoop = null;
        for (RedisURI uri : sentinels) {
            Mono<StatefulRedisSentinelConnection<K, V>> connectionMono = Mono
                    .fromCompletionStage(() -> doConnectSentinelAsync(codec, uri, timeout, redisURI.getClientName()))
                    .onErrorMap(CompletionException.class, Throwable::getCause)
                    .onErrorMap(e -> new RedisConnectionException("Cannot connect Redis Sentinel at " + uri, e))
                    .doOnError(exceptionCollector::add);

            // Chain the attempts: the next sentinel is only subscribed to when the chain so far errored.
            connectionLoop = connectionLoop == null ? connectionMono : connectionLoop.onErrorResume(t -> connectionMono);
        }

        if (connectionLoop == null) {
            return Mono.<StatefulRedisSentinelConnection<K, V>>error(
                    new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels())).toFuture();
        }

        return connectionLoop.onErrorMap(e -> {
            RedisConnectionException ex = new RedisConnectionException("Cannot connect to a Redis Sentinel: " + redisURI.getSentinels(), e);
            for (Throwable collected : exceptionCollector) {
                // The terminal error is already the cause; do not list it a second time.
                if (collected != e) {
                    ex.addSuppressed(collected);
                }
            }
            return ex;
        }).toFuture();
    }

    /**
     * Connects to a single Sentinel node. {@code clientName} is applied only when the node's URI did not
     * set a client name itself. On failure a warning is logged and the connection object is closed.
     */
    private <K, V> ConnectionFuture<StatefulRedisSentinelConnection<K, V>> doConnectSentinelAsync(RedisCodec<K, V> codec, RedisURI redisURI, Duration timeout, String clientName) {
        ConnectionBuilder builder = newConnectionBuilder(redisURI);
        builder.clientOptions(ClientOptions.copyOf(getOptions()));
        builder.clientResources(getResources());

        DefaultEndpoint endpoint = new DefaultEndpoint(getOptions(), getResources());
        RedisChannelWriter writer = decorateWriter(endpoint);
        StatefulRedisSentinelConnectionImpl<K, V> connection = newStatefulRedisSentinelConnection(writer, codec, timeout);

        ConnectionState state = connection.getConnectionState();
        state.apply(redisURI);
        if (LettuceStrings.isEmpty(state.getClientName())) {
            state.setClientName(clientName);
        }
        builder.connectionInitializer(createHandshake(state));

        logger.debug("Connecting to Redis Sentinel, address: {}", redisURI);
        builder.endpoint(endpoint)
                .commandHandler(() -> new CommandHandler(getOptions(), getResources(), endpoint))
                .connection(connection);
        this.connectionBuilder(getSocketAddressSupplier(redisURI), builder, redisURI);

        ConnectionFuture<?> sync = initializeChannelAsync(builder);
        return sync.thenApply(ignore -> (StatefulRedisSentinelConnection<K, V>) connection).whenComplete((ignore, e) -> {
            if (e != null) {
                logger.warn("Cannot connect Redis Sentinel at " + redisURI + ": " + e.toString());
                connection.close();
            }
        });
    }

    /**
     * Sets the client options by delegating to the super class.
     *
     * @param clientOptions options passed through unchanged
     */
    @Override
    public void setOptions(ClientOptions clientOptions) {
        super.setOptions(clientOptions);
    }

    /**
     * Factory hook for the Pub/Sub connection object; subclasses may override it to supply their own type.
     */
    protected <K, V> StatefulRedisPubSubConnectionImpl<K, V> newStatefulRedisPubSubConnection(PubSubEndpoint<K, V> endpoint, RedisChannelWriter channelWriter, RedisCodec<K, V> codec, Duration timeout) {
        return new StatefulRedisPubSubConnectionImpl<K, V>(endpoint, channelWriter, codec, timeout);
    }

    /**
     * Factory hook for the Sentinel connection object; subclasses may override it to supply their own type.
     */
    protected <K, V> StatefulRedisSentinelConnectionImpl<K, V> newStatefulRedisSentinelConnection(RedisChannelWriter channelWriter, RedisCodec<K, V> codec, Duration timeout) {
        return new StatefulRedisSentinelConnectionImpl<K, V>(channelWriter, codec, timeout);
    }

    /**
     * Factory hook for the standalone connection object; subclasses may override it to supply their own type.
     */
    protected <K, V> StatefulRedisConnectionImpl<K, V> newStatefulRedisConnection(RedisChannelWriter channelWriter, PushHandler pushHandler, RedisCodec<K, V> codec, Duration timeout) {
        return new StatefulRedisConnectionImpl<K, V>(channelWriter, pushHandler, codec, timeout);
    }

    /**
     * Provides the socket address to connect to, evaluated lazily on subscription. When the URI names a
     * Sentinel master id and lists sentinels, the address is looked up through Sentinel; otherwise the
     * URI is resolved with the socket address resolver of the client resources.
     *
     * @param redisURI target URI
     * @return deferred address lookup; errors with {@link RedisConnectionException} when the Sentinel
     *         lookup yields no address
     */
    protected Mono<SocketAddress> getSocketAddress(RedisURI redisURI) {
        return Mono.defer(() -> {
            if (redisURI.getSentinelMasterId() != null && !redisURI.getSentinels().isEmpty()) {
                logger.debug("Connecting to Redis using Sentinels {}, MasterId {}", (Object) redisURI.getSentinels(), (Object) redisURI.getSentinelMasterId());
                return lookupRedis(redisURI).switchIfEmpty(Mono.error(new RedisConnectionException("Cannot provide redisAddress using sentinel for masterId " + redisURI.getSentinelMasterId())));
            }
            return Mono.fromCallable(() -> getResources().socketAddressResolver().resolve(redisURI));
        });
    }

    /**
     * Supplies the codec used by the {@code String}-typed connect overloads.
     *
     * @return {@link StringCodec#UTF8}
     */
    protected RedisCodec<String, String> newStringStringCodec() {
        return StringCodec.UTF8;
    }

    /**
     * Rejects a sentinel list in which some entries carry a socket and some carry a host.
     *
     * @throws RedisConnectionException if both kinds are present
     */
    private static void validateUrisAreOfSameConnectionType(List<RedisURI> redisUris) {
        boolean unixDomainSocket = false;
        boolean inetSocket = false;
        for (RedisURI sentinel : redisUris) {
            if (sentinel.getSocket() != null) {
                unixDomainSocket = true;
            }
            if (sentinel.getHost() != null) {
                inetSocket = true;
            }
        }
        if (unixDomainSocket && inetSocket) {
            throw new RedisConnectionException("You cannot mix unix domain socket and IP socket URI's");
        }
    }

    /** Wraps {@link #getSocketAddress(RedisURI)} with a debug log of each resolved address. */
    private Mono<SocketAddress> getSocketAddressSupplier(RedisURI redisURI) {
        return getSocketAddress(redisURI).doOnNext(addr -> logger.debug("Resolved SocketAddress {} using {}", addr, (Object) redisURI));
    }

    /**
     * Asks Sentinel for the master address of the URI's master id. A temporary Sentinel connection is
     * opened for the lookup and closed via {@code closeAsync} on completion, error and cancellation.
     * Any error raised by the lookup (not only a timeout) is reported as a timeout exception carrying the
     * original error as suppressed.
     */
    private Mono<SocketAddress> lookupRedis(RedisURI sentinelUri) {
        Duration timeout = getDefaultTimeout();
        Mono<StatefulRedisSentinelConnection<String, String>> sentinelConnection = Mono
                .fromCompletionStage(() -> connectSentinelAsync(newStringStringCodec(), sentinelUri, timeout));

        return Mono.usingWhen(sentinelConnection, c -> {
            String sentinelMasterId = sentinelUri.getSentinelMasterId();
            return c.reactive().getMasterAddrByName(sentinelMasterId).map(it -> {
                if (!(it instanceof InetSocketAddress)) {
                    return it;
                }
                // Re-resolve host/port through the configured resolver instead of using the address as reported.
                InetSocketAddress isa = (InetSocketAddress) it;
                SocketAddress resolved = getResources().socketAddressResolver().resolve(RedisURI.create(isa.getHostString(), isa.getPort()));
                logger.debug("Resolved Master {} SocketAddress {}:{} to {}", sentinelMasterId, isa.getHostString(), isa.getPort(), resolved);
                return resolved;
            }).timeout(timeout).onErrorMap(e -> {
                RedisCommandTimeoutException ex = ExceptionFactory.createTimeoutException("Cannot obtain master using SENTINEL MASTER", timeout);
                ex.addSuppressed(e);
                return ex;
            });
        }, c -> Mono.fromCompletionStage(c::closeAsync), (c, ex) -> Mono.fromCompletionStage(c::closeAsync), c -> Mono.fromCompletionStage(c::closeAsync));
    }

    /**
     * Re-wraps a connect failure using {@code RedisConnectionException.create} with the future's remote
     * address; a successful result is passed through unchanged.
     */
    private static <T> ConnectionFuture<T> transformAsyncConnectionException(ConnectionFuture<T> future) {
        return future.thenCompose((v, e) -> {
            if (e != null) {
                return Futures.failed(RedisConnectionException.create(future.getRemoteAddress(), e));
            }
            return CompletableFuture.completedFuture(v);
        });
    }

    /**
     * Re-wraps a connect failure using {@code RedisConnectionException.create} with the textual form of
     * {@code target}; a successful result is passed through unchanged.
     */
    private static <T> CompletableFuture<T> transformAsyncConnectionException(CompletionStage<T> future, RedisURI target) {
        return ConnectionFuture.from(null, future.toCompletableFuture()).thenCompose((v, e) -> {
            if (e != null) {
                return Futures.failed(RedisConnectionException.create(target.toString(), e));
            }
            return CompletableFuture.completedFuture(v);
        }).toCompletableFuture();
    }

    /**
     * Validates that the URI is usable: without sentinels it needs a host or socket; with sentinels it
     * needs a master id and every sentinel needs a host or socket.
     *
     * @throws IllegalArgumentException if one of these requirements is not met
     */
    private static void checkValidRedisURI(RedisURI redisURI) {
        LettuceAssert.notNull((Object) redisURI, "A valid RedisURI is required");

        if (redisURI.getSentinels().isEmpty()) {
            if (LettuceStrings.isEmpty(redisURI.getHost()) && LettuceStrings.isEmpty(redisURI.getSocket())) {
                throw new IllegalArgumentException("RedisURI for Redis Standalone does not contain a host or a socket");
            }
            return;
        }

        if (LettuceStrings.isEmpty(redisURI.getSentinelMasterId())) {
            throw new IllegalArgumentException("RedisURI for Redis Sentinel requires a masterId");
        }
        for (RedisURI sentinel : redisURI.getSentinels()) {
            if (LettuceStrings.isEmpty(sentinel.getHost()) && LettuceStrings.isEmpty(sentinel.getSocket())) {
                throw new IllegalArgumentException("RedisURI for Redis Sentinel does not contain a host or a socket");
            }
        }
    }

    /** Asserts via {@link LettuceAssert} that {@code codec} is not {@code null}. */
    private static <K, V> void assertNotNull(RedisCodec<K, V> codec) {
        LettuceAssert.notNull(codec, "RedisCodec must not be null");
    }

    /** Asserts via {@link LettuceAssert} that {@code redisURI} is not {@code null}. */
    private static void assertNotNull(RedisURI redisURI) {
        LettuceAssert.notNull((Object) redisURI, "RedisURI must not be null");
    }

    /** Asserts via {@link LettuceAssert} that {@code clientResources} is not {@code null}. */
    private static void assertNotNull(ClientResources clientResources) {
        LettuceAssert.notNull((Object) clientResources, "ClientResources must not be null");
    }

    /**
     * Verifies that this client was created with a default URI (identity check against the
     * {@code EMPTY_URI} marker) and that this URI is valid.
     */
    private void checkForRedisURI() {
        LettuceAssert.assertState(this.redisURI != EMPTY_URI, "RedisURI is not available. Use RedisClient(Host), RedisClient(Host, Port) or RedisClient(RedisURI) to construct your client.");
        checkValidRedisURI(this.redisURI);
    }

    /**
     * Wraps {@code writer} with a {@link CommandExpiryWriter} and/or {@link CommandListenerWriter} when the
     * respective {@code isSupported} check passes for the current options and command listeners.
     */
    private RedisChannelWriter decorateWriter(RedisChannelWriter writer) {
        RedisChannelWriter decorated = writer;
        if (CommandExpiryWriter.isSupported(getOptions())) {
            decorated = new CommandExpiryWriter(decorated, getOptions(), getResources());
        }
        if (CommandListenerWriter.isSupported(getCommandListeners())) {
            decorated = new CommandListenerWriter(decorated, getCommandListeners());
        }
        return decorated;
    }

    /** Creates an SSL-configured builder when {@code redisURI.isSsl()} is set, a plain builder otherwise. */
    private static ConnectionBuilder newConnectionBuilder(RedisURI redisURI) {
        if (redisURI.isSsl()) {
            SslConnectionBuilder sslConnectionBuilder = SslConnectionBuilder.sslConnectionBuilder();
            sslConnectionBuilder.ssl(redisURI);
            return sslConnectionBuilder;
        }
        return ConnectionBuilder.connectionBuilder();
    }
}

