/*
 * Decompiled with CFR 0.152.
 */
package de.themoep.connectorplugin.lib.lettuce.core.internal;

import de.themoep.connectorplugin.lib.lettuce.core.internal.AsyncCloseable;
import de.themoep.connectorplugin.lib.lettuce.core.internal.Futures;
import de.themoep.connectorplugin.lib.lettuce.core.internal.LettuceAssert;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import java.util.function.BiConsumer;
import java.util.function.Consumer;
import java.util.function.Function;

public class AsyncConnectionProvider<K, T extends AsyncCloseable, F extends CompletionStage<T>> {
    private final Function<K, F> connectionFactory;
    private final Map<K, Sync<K, T, F>> connections = new ConcurrentHashMap<K, Sync<K, T, F>>();
    private volatile boolean closed;

    public AsyncConnectionProvider(Function<? extends K, ? extends F> connectionFactory) {
        LettuceAssert.notNull(connectionFactory, "AsyncConnectionProvider must not be null");
        this.connectionFactory = connectionFactory;
    }

    public F getConnection(K key) {
        return this.getSynchronizer(key).getConnection();
    }

    private Sync<K, T, F> getSynchronizer(K key) {
        if (this.closed) {
            throw new IllegalStateException("ConnectionProvider is already closed");
        }
        Sync sync = this.connections.get(key);
        if (sync != null) {
            return sync;
        }
        AtomicBoolean atomicBoolean = new AtomicBoolean();
        sync = this.connections.computeIfAbsent(key, connectionKey -> {
            Sync createdSync = new Sync(key, (CompletionStage)this.connectionFactory.apply(key));
            if (this.closed) {
                createdSync.cancel();
            }
            return createdSync;
        });
        if (atomicBoolean.compareAndSet(false, true)) {
            sync.getConnection().whenComplete((c, t) -> {
                if (t != null) {
                    this.connections.remove(key);
                }
            });
        }
        return sync;
    }

    public void register(K key, T connection) {
        this.connections.put(key, new Sync(key, connection));
    }

    public int getConnectionCount() {
        Sync[] syncs = this.connections.values().toArray(new Sync[0]);
        int count = 0;
        for (Sync sync : syncs) {
            if (!sync.isComplete()) continue;
            ++count;
        }
        return count;
    }

    public CompletableFuture<Void> close() {
        this.closed = true;
        ArrayList futures = new ArrayList();
        this.forEach((? super K connectionKey, ? super T closeable) -> {
            futures.add(closeable.closeAsync());
            this.connections.remove(connectionKey);
        });
        return Futures.allOf(futures);
    }

    public void close(K key) {
        LettuceAssert.notNull(key, "ConnectionKey must not be null!");
        Sync<K, AsyncCloseable, F> sync = this.connections.get(key);
        if (sync != null) {
            this.connections.remove(key);
            sync.doWithConnection(AsyncCloseable::closeAsync);
        }
    }

    public void forEach(Consumer<? super T> action) {
        LettuceAssert.notNull(action, "Action must not be null!");
        this.connections.values().forEach((? super T sync) -> {
            if (sync != null) {
                sync.doWithConnection(action);
            }
        });
    }

    public void forEach(BiConsumer<? super K, ? super T> action) {
        this.connections.forEach((? super K key, ? super V sync) -> sync.doWithConnection(action));
    }

    static class Sync<K, T extends AsyncCloseable, F extends CompletionStage<T>> {
        private static final int PHASE_IN_PROGRESS = 0;
        private static final int PHASE_COMPLETE = 1;
        private static final int PHASE_FAILED = 2;
        private static final int PHASE_CANCELED = 3;
        private static final AtomicIntegerFieldUpdater<Sync> PHASE = AtomicIntegerFieldUpdater.newUpdater(Sync.class, "phase");
        private volatile int phase = 0;
        private volatile T connection;
        private final K key;
        private final F future;

        public Sync(K key, F future) {
            this.key = key;
            this.future = future.whenComplete((connection, throwable) -> {
                if (throwable != null) {
                    if (throwable instanceof CancellationException) {
                        PHASE.compareAndSet(this, 0, 3);
                    }
                    PHASE.compareAndSet(this, 0, 2);
                }
                if (PHASE.compareAndSet(this, 0, 1) && connection != null) {
                    this.connection = connection;
                }
            });
        }

        public Sync(K key, T value) {
            this.key = key;
            this.connection = value;
            this.future = CompletableFuture.completedFuture(value);
            PHASE.set(this, 1);
        }

        public void cancel() {
            this.future.toCompletableFuture().cancel(false);
            this.doWithConnection(AsyncCloseable::closeAsync);
        }

        public F getConnection() {
            return this.future;
        }

        void doWithConnection(Consumer<? super T> action) {
            if (this.isComplete()) {
                action.accept(this.connection);
            } else {
                this.future.thenAccept(action);
            }
        }

        void doWithConnection(BiConsumer<? super K, ? super T> action) {
            if (this.isComplete()) {
                action.accept(this.key, this.connection);
            } else {
                this.future.thenAccept(c -> action.accept(this.key, c));
            }
        }

        private boolean isComplete() {
            return PHASE.get(this) == 1;
        }
    }
}

