/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.table.store.connector.sink;

import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DynamicTableFactory;
import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.connector.FlinkConnectorOptions;
import org.apache.flink.table.store.connector.TableStoreDataStreamSinkProvider;
import org.apache.flink.table.store.connector.sink.FlinkSinkBuilder;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.table.store.file.operation.Lock;
import org.apache.flink.table.store.log.LogSinkProvider;
import org.apache.flink.table.store.log.LogStoreTableFactory;
import org.apache.flink.table.store.table.AppendOnlyFileStoreTable;
import org.apache.flink.table.store.table.ChangelogValueCountFileStoreTable;
import org.apache.flink.table.store.table.ChangelogWithKeyFileStoreTable;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.table.sink.LogSinkFunction;
import org.apache.flink.types.RowKind;

public class TableStoreSink
implements DynamicTableSink,
SupportsOverwrite,
SupportsPartitioning {
    private final ObjectIdentifier tableIdentifier;
    private final FileStoreTable table;
    private final DynamicTableFactory.Context context;
    @Nullable
    private final LogStoreTableFactory logStoreTableFactory;
    private Map<String, String> staticPartitions = new HashMap<String, String>();
    private boolean overwrite = false;
    @Nullable
    private CatalogLock.Factory lockFactory;

    public TableStoreSink(ObjectIdentifier tableIdentifier, FileStoreTable table, DynamicTableFactory.Context context, @Nullable LogStoreTableFactory logStoreTableFactory) {
        this.tableIdentifier = tableIdentifier;
        this.table = table;
        this.context = context;
        this.logStoreTableFactory = logStoreTableFactory;
    }

    public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
        if (this.table instanceof AppendOnlyFileStoreTable) {
            return requestedMode;
        }
        if (this.table instanceof ChangelogValueCountFileStoreTable) {
            return requestedMode;
        }
        if (this.table instanceof ChangelogWithKeyFileStoreTable) {
            Configuration options = Configuration.fromMap(this.table.schema().options());
            if (options.get(CoreOptions.CHANGELOG_PRODUCER) == CoreOptions.ChangelogProducer.INPUT) {
                return requestedMode;
            }
            if (options.get(CoreOptions.LOG_CHANGELOG_MODE) == CoreOptions.LogChangelogMode.ALL) {
                return requestedMode;
            }
            ChangelogMode.Builder builder = ChangelogMode.newBuilder();
            for (RowKind kind : requestedMode.getContainedKinds()) {
                if (kind == RowKind.UPDATE_BEFORE) continue;
                builder.addContainedKind(kind);
            }
            return builder.build();
        }
        throw new UnsupportedOperationException("Unknown FileStoreTable subclass " + this.table.getClass().getName());
    }

    public DynamicTableSink.SinkRuntimeProvider getSinkRuntimeProvider(DynamicTableSink.Context context) {
        if (this.overwrite && !context.isBounded()) {
            throw new UnsupportedOperationException("Table store doesn't support streaming INSERT OVERWRITE.");
        }
        LogSinkProvider logSinkProvider = null;
        if (this.logStoreTableFactory != null) {
            logSinkProvider = this.logStoreTableFactory.createSinkProvider(this.context, context);
        }
        Configuration conf = Configuration.fromMap(this.table.schema().options());
        LogSinkFunction logSinkFunction = this.overwrite ? null : (logSinkProvider == null ? null : logSinkProvider.createSink());
        return new TableStoreDataStreamSinkProvider(dataStream -> new FlinkSinkBuilder(this.table).withInput((DataStream<RowData>)new DataStream(dataStream.getExecutionEnvironment(), dataStream.getTransformation())).withLockFactory(Lock.factory(this.lockFactory, this.tableIdentifier.toObjectPath())).withLogSinkFunction(logSinkFunction).withOverwritePartition(this.overwrite ? this.staticPartitions : null).withParallelism((Integer)conf.get(FlinkConnectorOptions.SINK_PARALLELISM)).build());
    }

    public DynamicTableSink copy() {
        TableStoreSink copied = new TableStoreSink(this.tableIdentifier, this.table, this.context, this.logStoreTableFactory);
        copied.staticPartitions = new HashMap<String, String>(this.staticPartitions);
        copied.overwrite = this.overwrite;
        copied.lockFactory = this.lockFactory;
        return copied;
    }

    public String asSummaryString() {
        return "TableStoreSink";
    }

    public void applyStaticPartition(Map<String, String> partition) {
        this.table.schema().partitionKeys().forEach(partitionKey -> {
            if (partition.containsKey(partitionKey)) {
                this.staticPartitions.put((String)partitionKey, (String)partition.get(partitionKey));
            }
        });
    }

    public void applyOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setLockFactory(@Nullable CatalogLock.Factory lockFactory) {
        this.lockFactory = lockFactory;
    }
}

