/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.connector.file.sink.writer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.flink.annotation.Internal;
import org.apache.flink.connector.file.sink.writer.FileWriterBucketState;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.core.io.SimpleVersionedSerialization;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.streaming.api.functions.sink.filesystem.InProgressFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.function.BiFunctionWithException;
import org.apache.flink.util.function.FunctionWithException;

@Internal
public class FileWriterBucketStateSerializer
implements SimpleVersionedSerializer<FileWriterBucketState> {
    private static final int MAGIC_NUMBER = 511069049;
    private final SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer;
    private final SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer;

    public FileWriterBucketStateSerializer(SimpleVersionedSerializer<InProgressFileWriter.InProgressFileRecoverable> inProgressFileRecoverableSerializer, SimpleVersionedSerializer<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverableSerializer) {
        this.inProgressFileRecoverableSerializer = Preconditions.checkNotNull(inProgressFileRecoverableSerializer);
        this.pendingFileRecoverableSerializer = Preconditions.checkNotNull(pendingFileRecoverableSerializer);
    }

    @Override
    public int getVersion() {
        return 3;
    }

    @Override
    public byte[] serialize(FileWriterBucketState state) throws IOException {
        DataOutputSerializer out = new DataOutputSerializer(256);
        out.writeInt(511069049);
        this.serializeV3(state, out);
        return out.getCopyOfBuffer();
    }

    @Override
    public FileWriterBucketState deserialize(int version, byte[] serialized) throws IOException {
        DataInputDeserializer in = new DataInputDeserializer(serialized);
        switch (version) {
            case 1: {
                this.validateMagicNumber(in);
                return this.deserializeV1(in);
            }
            case 2: {
                this.validateMagicNumber(in);
                return this.deserializeV2(in);
            }
            case 3: {
                this.validateMagicNumber(in);
                return this.deserializeV3(in);
            }
        }
        throw new IOException("Unrecognized version or corrupt state: " + version);
    }

    private void serializeV3(FileWriterBucketState state, DataOutputView dataOutputView) throws IOException {
        SimpleVersionedSerialization.writeVersionAndSerialize(SimpleVersionedStringSerializer.INSTANCE, state.getBucketId(), dataOutputView);
        dataOutputView.writeUTF(state.getBucketPath().toString());
        dataOutputView.writeLong(state.getInProgressFileCreationTime());
        if (state.hasInProgressFileRecoverable()) {
            InProgressFileWriter.InProgressFileRecoverable inProgressFileRecoverable = state.getInProgressFileRecoverable();
            dataOutputView.writeBoolean(true);
            SimpleVersionedSerialization.writeVersionAndSerialize(this.inProgressFileRecoverableSerializer, inProgressFileRecoverable, dataOutputView);
        } else {
            dataOutputView.writeBoolean(false);
        }
    }

    private FileWriterBucketState deserializeV1(DataInputView in) throws IOException {
        SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> commitableSerializer = this.getCommitableSerializer();
        SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> resumableSerializer = this.getResumableSerializer();
        return this.internalDeserialize(in, dataInputView -> new OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable((RecoverableWriter.ResumeRecoverable)SimpleVersionedSerialization.readVersionAndDeSerialize(resumableSerializer, dataInputView)), (version, bytes) -> new OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable((RecoverableWriter.CommitRecoverable)commitableSerializer.deserialize((int)version, (byte[])bytes)));
    }

    private FileWriterBucketState deserializeV2(DataInputView in) throws IOException {
        return this.internalDeserialize(in, dataInputView -> SimpleVersionedSerialization.readVersionAndDeSerialize(this.inProgressFileRecoverableSerializer, dataInputView), this.pendingFileRecoverableSerializer::deserialize);
    }

    private FileWriterBucketState deserializeV3(DataInputView in) throws IOException {
        return this.internalDeserialize(in, dataInputView -> SimpleVersionedSerialization.readVersionAndDeSerialize(this.inProgressFileRecoverableSerializer, dataInputView), null);
    }

    private FileWriterBucketState internalDeserialize(DataInputView dataInputView, FunctionWithException<DataInputView, InProgressFileWriter.InProgressFileRecoverable, IOException> inProgressFileParser, @Nullable BiFunctionWithException<Integer, byte[], InProgressFileWriter.PendingFileRecoverable, IOException> pendingFileParser) throws IOException {
        String bucketId = (String)SimpleVersionedSerialization.readVersionAndDeSerialize(SimpleVersionedStringSerializer.INSTANCE, dataInputView);
        String bucketPathStr = dataInputView.readUTF();
        long creationTime = dataInputView.readLong();
        InProgressFileWriter.InProgressFileRecoverable current = null;
        if (dataInputView.readBoolean()) {
            current = inProgressFileParser.apply(dataInputView);
        }
        HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>> pendingFileRecoverablesPerCheckpoint = new HashMap<Long, List<InProgressFileWriter.PendingFileRecoverable>>();
        if (pendingFileParser != null) {
            int pendingFileRecoverableSerializerVersion = dataInputView.readInt();
            int numCheckpoints = dataInputView.readInt();
            for (int i = 0; i < numCheckpoints; ++i) {
                long checkpointId = dataInputView.readLong();
                int numOfPendingFileRecoverables = dataInputView.readInt();
                ArrayList<InProgressFileWriter.PendingFileRecoverable> pendingFileRecoverables = new ArrayList<InProgressFileWriter.PendingFileRecoverable>(numOfPendingFileRecoverables);
                for (int j = 0; j < numOfPendingFileRecoverables; ++j) {
                    byte[] bytes = new byte[dataInputView.readInt()];
                    dataInputView.readFully(bytes);
                    pendingFileRecoverables.add(pendingFileParser.apply(pendingFileRecoverableSerializerVersion, bytes));
                }
                pendingFileRecoverablesPerCheckpoint.put(checkpointId, pendingFileRecoverables);
            }
        }
        return new FileWriterBucketState(bucketId, new Path(bucketPathStr), creationTime, current, pendingFileRecoverablesPerCheckpoint);
    }

    private void validateMagicNumber(DataInputView in) throws IOException {
        int magicNumber = in.readInt();
        if (magicNumber != 511069049) {
            throw new IOException(String.format("Corrupt data: Unexpected magic number %08X", magicNumber));
        }
    }

    private SimpleVersionedSerializer<RecoverableWriter.ResumeRecoverable> getResumableSerializer() {
        OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer outputStreamBasedInProgressFileRecoverableSerializer = (OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverableSerializer)this.inProgressFileRecoverableSerializer;
        return outputStreamBasedInProgressFileRecoverableSerializer.getResumeSerializer();
    }

    private SimpleVersionedSerializer<RecoverableWriter.CommitRecoverable> getCommitableSerializer() {
        OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer outputStreamBasedPendingFileRecoverableSerializer = (OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverableSerializer)this.pendingFileRecoverableSerializer;
        return outputStreamBasedPendingFileRecoverableSerializer.getCommitSerializer();
    }
}

