/*
 * Decompiled with CFR 0.152.
 */
package de.themoep.connectorplugin.lib.lettuce.core.masterreplica;

import de.themoep.connectorplugin.lib.lettuce.core.ConnectionFuture;
import de.themoep.connectorplugin.lib.lettuce.core.RedisClient;
import de.themoep.connectorplugin.lib.lettuce.core.RedisURI;
import de.themoep.connectorplugin.lib.lettuce.core.api.StatefulRedisConnection;
import de.themoep.connectorplugin.lib.lettuce.core.codec.RedisCodec;
import de.themoep.connectorplugin.lib.lettuce.core.event.jfr.EventRecorder;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.MasterReplicaChannelWriter;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.MasterReplicaConnectionProvider;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.MasterReplicaConnector;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.MasterReplicaTopologyChangedEvent;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.MasterReplicaTopologyRefresh;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.ReplicaTopologyProvider;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
import de.themoep.connectorplugin.lib.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnectionImpl;
import de.themoep.connectorplugin.lib.lettuce.core.models.role.RedisNodeDescription;
import de.themoep.connectorplugin.lib.reactor.core.publisher.Mono;
import de.themoep.connectorplugin.lib.reactor.util.function.Tuple2;
import de.themoep.connectorplugin.lib.reactor.util.function.Tuples;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.function.Predicate;

/**
 * {@link MasterReplicaConnector} that starts from a single seed {@link RedisURI}, asks the seed
 * node for the topology, connects to the upstream (master) node if the seed is not upstream
 * itself, and then assembles a {@link StatefulRedisMasterReplicaConnection}.
 *
 * <p>Every connection opened during bootstrap is tracked in {@code initialConnections} so that
 * all of them can be closed if any bootstrap step fails.
 *
 * @param <K> key type of the codec
 * @param <V> value type of the codec
 */
class AutodiscoveryConnector<K, V>
implements MasterReplicaConnector<K, V> {
    private final RedisClient redisClient;
    private final RedisCodec<K, V> codec;
    private final RedisURI redisURI;
    /** Connections opened while bootstrapping, keyed by the URI they were opened against. */
    private final Map<RedisURI, StatefulRedisConnection<?, ?>> initialConnections = new ConcurrentHashMap<>();

    /**
     * @param redisClient client used to open connections and to obtain resources/options
     * @param codec codec applied to every connection opened by this connector
     * @param redisURI seed node used for topology discovery
     */
    AutodiscoveryConnector(RedisClient redisClient, RedisCodec<K, V> codec, RedisURI redisURI) {
        this.redisClient = redisClient;
        this.codec = codec;
        this.redisURI = redisURI;
    }

    /**
     * Connects to the seed node, discovers the topology, resolves the upstream connection and
     * builds the master/replica connection.
     *
     * <p>On any failure, {@code closeAsync()} is invoked on every connection recorded so far and
     * the failure is propagated. An {@link ExecutionException} is replaced by its cause.
     *
     * @return future completing with the master/replica connection
     */
    @Override
    public CompletableFuture<StatefulRedisMasterReplicaConnection<K, V>> connectAsync() {
        ConnectionFuture<StatefulRedisConnection<K, V>> seedConnection = redisClient.connectAsync(codec, redisURI);

        Mono<StatefulRedisMasterReplicaConnection<K, V>> connect = Mono.fromCompletionStage(seedConnection)
                .flatMap(nodeConnection -> {
                    // Record the seed connection first so the error path below can close it.
                    initialConnections.put(redisURI, nodeConnection);
                    ReplicaTopologyProvider topologyProvider = new ReplicaTopologyProvider(nodeConnection, redisURI);
                    return Mono.fromCompletionStage(topologyProvider.getNodesAsync())
                            .flatMap(nodes -> getMasterConnectionAndUri(nodes, Tuples.of(redisURI, nodeConnection), codec));
                })
                .flatMap(connectionAndUri -> initializeConnection(codec, connectionAndUri));

        return connect.onErrorResume(failure -> {
            // closeAsync() is invoked for every tracked connection while this chain is built;
            // the chain itself only sequences the completions before re-emitting the failure.
            Mono<Void> close = Mono.empty();
            for (StatefulRedisConnection<?, ?> connection : initialConnections.values()) {
                close = close.then(Mono.fromFuture(connection.closeAsync()));
            }
            return close.then(Mono.error(failure));
        }).onErrorMap(ExecutionException.class, Throwable::getCause).toFuture();
    }

    /**
     * Resolves the connection to the upstream node.
     *
     * @param nodes topology as reported by the seed node
     * @param connectionTuple seed URI paired with its already established connection
     * @param codec codec used when a new connection to the upstream node has to be opened
     * @return {@code connectionTuple} when the seed node is upstream; otherwise the upstream
     *         node's URI paired with a newly opened connection, which is also recorded in
     *         {@code initialConnections}
     * @throws IllegalStateException if the seed node or an upstream node is not present in
     *         {@code nodes}
     */
    private Mono<Tuple2<RedisURI, StatefulRedisConnection<K, V>>> getMasterConnectionAndUri(List<RedisNodeDescription> nodes, Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionTuple, RedisCodec<K, V> codec) {
        RedisNodeDescription connectedNode = getConnectedNode(redisURI, nodes);
        if (connectedNode.getRole().isUpstream()) {
            return Mono.just(connectionTuple);
        }

        RedisNodeDescription master = lookupMaster(nodes);
        ConnectionFuture<StatefulRedisConnection<K, V>> masterConnection = redisClient.connectAsync(codec, master.getUri());
        return Mono.just(master.getUri())
                .zipWith(Mono.fromCompletionStage(masterConnection))
                .doOnNext(uriAndConnection -> initialConnections.put(uriAndConnection.getT1(), uriAndConnection.getT2()));
    }

    /**
     * Refreshes the topology through the given upstream connection and builds the
     * master/replica connection on top of the discovered nodes.
     *
     * @param codec codec for the resulting connection
     * @param connectionAndUri upstream URI paired with its connection
     * @return the configured master/replica connection
     */
    private Mono<StatefulRedisMasterReplicaConnection<K, V>> initializeConnection(RedisCodec<K, V> codec, Tuple2<RedisURI, StatefulRedisConnection<K, V>> connectionAndUri) {
        ReplicaTopologyProvider topologyProvider = new ReplicaTopologyProvider(connectionAndUri.getT2(), connectionAndUri.getT1());
        MasterReplicaTopologyRefresh refresh = new MasterReplicaTopologyRefresh(redisClient, topologyProvider);
        // NOTE(review): initialConnections is declared with wildcard value types; confirm this
        // matches the provider's constructor parameter type. Call left exactly as found.
        MasterReplicaConnectionProvider<K, V> connectionProvider = new MasterReplicaConnectionProvider<K, V>(this.redisClient, codec, this.redisURI, this.initialConnections);

        return refresh.getNodes(redisURI).map(nodes -> {
            EventRecorder.getInstance().record(new MasterReplicaTopologyChangedEvent(redisURI, nodes));
            connectionProvider.setKnownNodes(nodes);

            MasterReplicaChannelWriter channelWriter = new MasterReplicaChannelWriter(connectionProvider, redisClient.getResources());
            StatefulRedisMasterReplicaConnectionImpl<K, V> connection = new StatefulRedisMasterReplicaConnectionImpl<>(channelWriter, codec, redisURI.getTimeout());
            connection.setOptions(redisClient.getOptions());
            return connection;
        });
    }

    /**
     * @return the first node in {@code nodes} whose role is upstream
     * @throws IllegalStateException if no node is upstream
     */
    private static RedisNodeDescription lookupMaster(List<RedisNodeDescription> nodes) {
        return findFirst(nodes, candidate -> candidate.getRole().isUpstream())
                .orElseThrow(() -> new IllegalStateException("Cannot lookup master from " + nodes));
    }

    /**
     * @return the first node in {@code nodes} whose host and port match {@code redisURI}
     * @throws IllegalStateException if no node matches
     */
    private static RedisNodeDescription getConnectedNode(RedisURI redisURI, List<RedisNodeDescription> nodes) {
        return findFirst(nodes, candidate -> isSameEndpoint(redisURI, candidate))
                .orElseThrow(() -> new IllegalStateException("Cannot lookup node descriptor for connected node at " + redisURI));
    }

    /** Returns the first element of {@code nodes} accepted by {@code predicate}, if any. */
    private static Optional<RedisNodeDescription> findFirst(List<RedisNodeDescription> nodes, Predicate<? super RedisNodeDescription> predicate) {
        return nodes.stream().filter(predicate).findFirst();
    }

    /** Compares host and port only; no other URI attribute takes part in the match. */
    private static boolean isSameEndpoint(RedisURI redisURI, RedisNodeDescription node) {
        RedisURI nodeUri = node.getUri();
        return nodeUri.getHost().equals(redisURI.getHost()) && nodeUri.getPort() == redisURI.getPort();
    }
}

