/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.connector.sink2;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.connector.sink.GlobalCommitter;
import org.apache.flink.api.connector.sink2.Committer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.streaming.api.connector.sink2.CommittableMessage;
import org.apache.flink.streaming.api.connector.sink2.GlobalCommittableWrapper;
import org.apache.flink.streaming.api.connector.sink2.GlobalCommitterSerializer;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.util.SimpleVersionedListState;
import org.apache.flink.streaming.api.transformations.SinkV1Adapter;
import org.apache.flink.streaming.runtime.operators.sink.committables.CheckpointCommittableManager;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollector;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableCollectorSerializer;
import org.apache.flink.streaming.runtime.operators.sink.committables.CommittableManager;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.SerializableSupplier;

/**
 * Operator that buffers incoming {@link CommittableMessage}s in a {@link CommittableCollector} and
 * commits them with a {@link Committer} when a checkpoint completes, right after state has been
 * restored, or when the input ends.
 *
 * <p>If the supplied committer is a {@link SinkV1Adapter.GlobalCommitterAdapter}, the SinkV1
 * {@link GlobalCommitter} it exposes is additionally used to filter and commit the global
 * committables that were recovered from state.
 *
 * @param <CommT> type of the committables received from upstream
 * @param <GlobalCommT> type of the SinkV1 global committables kept in state
 */
class GlobalCommitterOperator<CommT, GlobalCommT> extends AbstractStreamOperator<Void>
        implements OneInputStreamOperator<CommittableMessage<CommT>, Void>, BoundedOneInput {

    /** Descriptor of the raw (byte array) list state this operator stores its wrapper in. */
    private static final ListStateDescriptor<byte[]> GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC =
            new ListStateDescriptor<byte[]>(
                    "streaming_committer_raw_states", BytePrimitiveArraySerializer.INSTANCE);

    /** Creates the committer in {@link #setup}. */
    private final SerializableSupplier<Committer<CommT>> committerFactory;

    /** Creates the committable serializer in {@link #setup}. */
    private final SerializableSupplier<SimpleVersionedSerializer<CommT>>
            committableSerializerFactory;

    /** Versioned view over the raw list state; assigned in {@link #initializeState}. */
    private ListState<GlobalCommittableWrapper<CommT, GlobalCommT>> globalCommitterState;

    private Committer<CommT> committer;

    /** Collects every committable message passed to {@link #processElement}. */
    private CommittableCollector<CommT> committableCollector;

    /** Highest checkpoint id seen as completed or restored; {@code -1} until then. */
    private long lastCompletedCheckpointId = -1L;

    private SimpleVersionedSerializer<CommT> committableSerializer;

    /** Only set when the committer is a {@link SinkV1Adapter.GlobalCommitterAdapter}. */
    @Nullable
    private GlobalCommitter<CommT, GlobalCommT> globalCommitter;

    /** Only set when the committer is a {@link SinkV1Adapter.GlobalCommitterAdapter}. */
    @Nullable
    private SimpleVersionedSerializer<GlobalCommT> globalCommittableSerializer;

    /**
     * SinkV1 global committables. Filled from restored state and afterwards replaced by whatever
     * {@link GlobalCommitter#commit} returns (presumably the ones that still need a retry).
     */
    private List<GlobalCommT> sinkV1State = new ArrayList<GlobalCommT>();

    /**
     * @param committerFactory supplier of the committer; must not be {@code null}
     * @param committableSerializerFactory supplier of the committable serializer; must not be
     *     {@code null}
     */
    GlobalCommitterOperator(
            SerializableSupplier<Committer<CommT>> committerFactory,
            SerializableSupplier<SimpleVersionedSerializer<CommT>> committableSerializerFactory) {
        this.committerFactory = Preconditions.checkNotNull(committerFactory);
        this.committableSerializerFactory =
                Preconditions.checkNotNull(committableSerializerFactory);
    }

    /**
     * Instantiates the committer, the committable collector and the committable serializer, and
     * extracts the SinkV1 global committer and its serializer when the committer is a
     * {@link SinkV1Adapter.GlobalCommitterAdapter}.
     */
    @Override
    public void setup(
            StreamTask<?, ?> containingTask,
            StreamConfig config,
            Output<StreamRecord<Void>> output) {
        super.setup(containingTask, config, output);
        // Both suppliers are fully typed, so their results are assigned without raw casts.
        committer = committerFactory.get();
        committableCollector = CommittableCollector.of(getRuntimeContext());
        committableSerializer = committableSerializerFactory.get();
        if (committer instanceof SinkV1Adapter.GlobalCommitterAdapter) {
            // The adapter is referenced as a raw type, so the two assignments below are unchecked.
            SinkV1Adapter.GlobalCommitterAdapter gc =
                    (SinkV1Adapter.GlobalCommitterAdapter) committer;
            globalCommitter = gc.getGlobalCommitter();
            globalCommittableSerializer = gc.getGlobalCommittableSerializer();
        }
    }

    /**
     * Replaces the operator state with a single wrapper holding the current collector content and
     * the pending SinkV1 global committables.
     */
    @Override
    public void snapshotState(StateSnapshotContext context) throws Exception {
        super.snapshotState(context);
        // Copies are stored instead of the live collector and list.
        globalCommitterState.update(
                Collections.singletonList(
                        new GlobalCommittableWrapper<CommT, GlobalCommT>(
                                committableCollector.copy(),
                                new ArrayList<GlobalCommT>(sinkV1State))));
    }

    /**
     * Registers the operator state and, when restoring, merges every restored wrapper into this
     * operator and immediately commits up to the restored checkpoint.
     */
    @Override
    public void initializeState(StateInitializationContext context) throws Exception {
        super.initializeState(context);
        int subtaskIndex = getRuntimeContext().getIndexOfThisSubtask();
        int maxParallelSubtasks = getRuntimeContext().getMaxNumberOfParallelSubtasks();
        CommittableCollectorSerializer<CommT> committableCollectorSerializer =
                new CommittableCollectorSerializer<CommT>(
                        committableSerializer, subtaskIndex, maxParallelSubtasks);
        // globalCommittableSerializer is null unless setup() found a SinkV1 adapter.
        GlobalCommitterSerializer<CommT, GlobalCommT> serializer =
                new GlobalCommitterSerializer<CommT, GlobalCommT>(
                        committableCollectorSerializer,
                        globalCommittableSerializer,
                        subtaskIndex,
                        maxParallelSubtasks);
        globalCommitterState =
                new SimpleVersionedListState<GlobalCommittableWrapper<CommT, GlobalCommT>>(
                        context.getOperatorStateStore()
                                .getListState(GLOBAL_COMMITTER_OPERATOR_RAW_STATES_DESC),
                        serializer);
        if (context.isRestored()) {
            // Iterate the typed state directly; a raw Iterable would lose the element type.
            for (GlobalCommittableWrapper<CommT, GlobalCommT> restored :
                    globalCommitterState.get()) {
                sinkV1State.addAll(restored.getGlobalCommittables());
                committableCollector.merge(restored.getCommittableCollector());
            }
            lastCompletedCheckpointId = context.getRestoredCheckpointId().getAsLong();
            if (globalCommitter != null) {
                sinkV1State = globalCommitter.filterRecoveredCommittables(sinkV1State);
            }
            // Commit recovered committables right away instead of waiting for a checkpoint.
            commit(lastCompletedCheckpointId);
        }
    }

    /**
     * Advances {@link #lastCompletedCheckpointId} (it never moves backwards) and commits
     * everything up to it.
     *
     * @throws IllegalStateException if SinkV1 state is pending but no global committer is set
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        super.notifyCheckpointComplete(checkpointId);
        Preconditions.checkState(
                globalCommitter != null || sinkV1State.isEmpty(),
                "GlobalCommitter is required to commit SinkV1 state.");
        lastCompletedCheckpointId = Math.max(lastCompletedCheckpointId, checkpointId);
        commit(lastCompletedCheckpointId);
    }

    /**
     * Returns the collector's committable managers up to {@code checkpointId}, mapping a
     * {@code null} result to an empty collection.
     */
    private Collection<? extends CheckpointCommittableManager<CommT>> getCommittables(
            long checkpointId) {
        // Declared with the same wildcard type as this method's return type.
        Collection<? extends CheckpointCommittableManager<CommT>> committables =
                committableCollector.getCheckpointCommittablesUpTo(checkpointId);
        if (committables == null) {
            return Collections.emptyList();
        }
        return committables;
    }

    /**
     * Commits pending SinkV1 global committables (if a global committer is present) and then every
     * committable manager up to {@code checkpointId}.
     */
    private void commit(long checkpointId) throws IOException, InterruptedException {
        if (globalCommitter != null && !sinkV1State.isEmpty()) {
            sinkV1State = globalCommitter.commit(sinkV1State);
        }
        for (CheckpointCommittableManager<CommT> committable : getCommittables(checkpointId)) {
            // The flag is true only for the manager of the last completed checkpoint.
            boolean fullyReceived = committable.getCheckpointId() == lastCompletedCheckpointId;
            committable.commit(fullyReceived, committer);
        }
    }

    /**
     * Commits the end-of-input committable, if there is one, repeating until the collector
     * reports that it is finished.
     */
    @Override
    public void endInput() throws Exception {
        CommittableManager<CommT> endOfInputCommittable =
                committableCollector.getEndOfInputCommittable();
        if (endOfInputCommittable != null) {
            do {
                endOfInputCommittable.commit(false, committer);
            } while (!committableCollector.isFinished());
        }
    }

    /** Hands the received committable message to the collector; nothing is emitted. */
    @Override
    public void processElement(StreamRecord<CommittableMessage<CommT>> element) throws Exception {
        committableCollector.addMessage(element.getValue());
    }
}

