/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.dispatcher.runner;

import java.io.IOException;
import java.util.Collection;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Executor;
import java.util.stream.Collectors;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.dispatcher.JobDispatcherFactory;
import org.apache.flink.runtime.dispatcher.PartialDispatcherServices;
import org.apache.flink.runtime.dispatcher.runner.DefaultDispatcherGatewayServiceFactory;
import org.apache.flink.runtime.dispatcher.runner.DispatcherLeaderProcessFactoryFactory;
import org.apache.flink.runtime.dispatcher.runner.JobDispatcherLeaderProcessFactory;
import org.apache.flink.runtime.entrypoint.component.JobGraphRetriever;
import org.apache.flink.runtime.highavailability.JobResultStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobPersistenceComponentFactory;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Factory for {@link JobDispatcherLeaderProcessFactory} instances.
 *
 * <p>The {@link JobGraph} is obtained from the configured {@link JobGraphRetriever}. The dirty
 * entries of the {@link JobResultStore} are then consulted to decide what is handed to the
 * created factory: the job graph when no dirty result exists for its job ID, or the dirty
 * {@link JobResult} of that job otherwise.
 */
public class JobDispatcherLeaderProcessFactoryFactory
implements DispatcherLeaderProcessFactoryFactory {
    private static final Logger LOG = LoggerFactory.getLogger(JobDispatcherLeaderProcessFactoryFactory.class);

    /** Source of the {@link JobGraph} of the job this factory is created for. */
    private final JobGraphRetriever jobGraphRetriever;

    /**
     * Creates the factory. Production code goes through {@link #create(JobGraphRetriever)}.
     *
     * @param jobGraphRetriever retriever queried for the job graph on every
     *     {@code createFactory} call
     */
    @VisibleForTesting
    JobDispatcherLeaderProcessFactoryFactory(JobGraphRetriever jobGraphRetriever) {
        this.jobGraphRetriever = jobGraphRetriever;
    }

    /**
     * Retrieves the job graph, inspects the dirty job results and assembles the
     * {@link JobDispatcherLeaderProcessFactory}.
     *
     * <p>Exactly one of the job graph and the recovered dirty job result is passed on as a
     * non-null value; the other one is passed as {@code null}.
     *
     * @param jobPersistenceComponentFactory used to create the {@link JobResultStore}
     * @param ioExecutor not used by this implementation
     * @param rpcService forwarded to the {@link DefaultDispatcherGatewayServiceFactory}
     * @param partialDispatcherServices supplies the configuration for the job graph retrieval
     *     and is forwarded to the {@link DefaultDispatcherGatewayServiceFactory}
     * @param fatalErrorHandler forwarded to the created factory
     * @return the assembled factory
     * @throws FlinkRuntimeException if the job graph cannot be retrieved or the dirty job
     *     results cannot be read from the {@link JobResultStore}
     */
    @Override
    public JobDispatcherLeaderProcessFactory createFactory(JobPersistenceComponentFactory jobPersistenceComponentFactory, Executor ioExecutor, RpcService rpcService, PartialDispatcherServices partialDispatcherServices, FatalErrorHandler fatalErrorHandler) {
        JobGraph jobGraph;
        try {
            // A null job graph is rejected right away by Preconditions.checkNotNull.
            jobGraph = Preconditions.checkNotNull(this.jobGraphRetriever.retrieveJobGraph(partialDispatcherServices.getConfiguration()));
        }
        catch (FlinkException e) {
            throw new FlinkRuntimeException("Could not retrieve the JobGraph.", e);
        }

        JobResultStore jobResultStore = jobPersistenceComponentFactory.createJobResultStore();
        Collection<JobResult> recoveredDirtyJobResults = getDirtyJobResults(jobResultStore);
        Optional<JobResult> maybeRecoveredDirtyJobResult = extractDirtyJobResult(recoveredDirtyJobResults, jobGraph);
        Optional<JobGraph> maybeJobGraph = getJobGraphBasedOnDirtyJobResults(jobGraph, recoveredDirtyJobResults);

        DefaultDispatcherGatewayServiceFactory defaultDispatcherServiceFactory = new DefaultDispatcherGatewayServiceFactory(JobDispatcherFactory.INSTANCE, rpcService, partialDispatcherServices);
        return new JobDispatcherLeaderProcessFactory(defaultDispatcherServiceFactory, maybeJobGraph.orElse(null), maybeRecoveredDirtyJobResult.orElse(null), jobResultStore, fatalErrorHandler);
    }

    /**
     * Creates a {@code JobDispatcherLeaderProcessFactoryFactory} backed by the given retriever.
     *
     * @param jobGraphRetriever retriever used to obtain the job graph
     * @return the new factory instance
     */
    public static JobDispatcherLeaderProcessFactoryFactory create(JobGraphRetriever jobGraphRetriever) {
        return new JobDispatcherLeaderProcessFactoryFactory(jobGraphRetriever);
    }

    /**
     * Reads the dirty job results from the store, converting the checked {@link IOException}
     * into an unchecked {@link FlinkRuntimeException}.
     *
     * @param jobResultStore store to query
     * @return the dirty job results as returned by the store
     * @throws FlinkRuntimeException if the store fails with an {@link IOException}
     */
    private static Collection<JobResult> getDirtyJobResults(JobResultStore jobResultStore) {
        try {
            return jobResultStore.getDirtyResults();
        }
        catch (IOException e) {
            throw new FlinkRuntimeException("Could not retrieve the JobResults of dirty jobs from the underlying JobResultStore.", e);
        }
    }

    /**
     * Looks up the dirty job result whose job ID equals the ID of the given job graph.
     *
     * <p>Every dirty result belonging to a different job is reported with a warning. All
     * entries are visited, so if several entries match, the last one in iteration order is
     * returned.
     *
     * @param dirtyJobResults dirty results recovered from the {@link JobResultStore}
     * @param jobGraph job graph whose job ID is matched against the results
     * @return the matching dirty job result, or an empty {@link Optional} if there is none
     */
    private static Optional<JobResult> extractDirtyJobResult(Collection<JobResult> dirtyJobResults, JobGraph jobGraph) {
        Optional<JobResult> actualDirtyJobResult = Optional.empty();
        for (JobResult dirtyJobResult : dirtyJobResults) {
            if (dirtyJobResult.getJobId().equals(jobGraph.getJobID())) {
                actualDirtyJobResult = Optional.of(dirtyJobResult);
            } else {
                LOG.warn("Unexpected dirty JobResultStore entry: Job '{}' is listed as dirty, isn't part of this single-job cluster, though.", dirtyJobResult.getJobId());
            }
        }
        return actualDirtyJobResult;
    }

    /**
     * Returns the job graph unless a dirty job result with the same job ID exists.
     *
     * @param jobGraph the retrieved job graph
     * @param dirtyJobResults dirty results recovered from the {@link JobResultStore}
     * @return an empty {@link Optional} if any dirty result carries the job graph's ID,
     *     otherwise an {@link Optional} containing {@code jobGraph}
     */
    private static Optional<JobGraph> getJobGraphBasedOnDirtyJobResults(JobGraph jobGraph, Collection<JobResult> dirtyJobResults) {
        // Same comparison as in extractDirtyJobResult; a typed stream check avoids the raw Set
        // that was previously materialized only for a single contains() lookup.
        boolean hasDirtyResultForJob = dirtyJobResults.stream()
                .anyMatch(dirtyJobResult -> dirtyJobResult.getJobId().equals(jobGraph.getJobID()));
        return hasDirtyResultForJob ? Optional.empty() : Optional.of(jobGraph);
    }
}

