package com.wgzhao.datax.core.transport.exchanger;

import com.wgzhao.datax.common.element.Record;
import com.wgzhao.datax.common.exception.CommonErrorCode;
import com.wgzhao.datax.common.exception.DataXException;
import com.wgzhao.datax.common.plugin.RecordReceiver;
import com.wgzhao.datax.common.plugin.RecordSender;
import com.wgzhao.datax.common.plugin.TaskPluginCollector;
import com.wgzhao.datax.core.statistics.communication.Communication;
import com.wgzhao.datax.core.transport.channel.Channel;
import com.wgzhao.datax.core.transport.record.TerminateRecord;
import com.wgzhao.datax.core.transport.transformer.TransformerExecution;
import com.wgzhao.datax.core.util.FrameworkErrorCode;
import com.wgzhao.datax.core.util.container.CoreConstant;
import java.util.List;

/* loaded from: input_file:com/wgzhao/datax/core/transport/exchanger/RecordExchanger.class */
/**
 * Exchanger that moves records through a {@link Channel}, one record at a time.
 *
 * <p>It acts as both ends of the pipe: as a {@link RecordSender} it runs each record through
 * {@code doTransformer} and pushes the result into the channel; as a {@link RecordReceiver} it
 * pulls records back out of the channel. Once {@link #shutdown()} has been called, the
 * send/receive/terminate operations fail with {@code CommonErrorCode.SHUT_DOWN_TASK}.
 */
public class RecordExchanger extends TransformerExchanger implements RecordSender, RecordReceiver {
    /** Record implementation used when the channel configuration does not name one. */
    private static final String DEFAULT_RECORD_CLASS_NAME = "com.wgzhao.datax.core.transport.record.DefaultRecord";

    /**
     * Concrete record type instantiated by {@link #createRecord()}.
     *
     * <p>NOTE(review): this is a static field that every constructor call overwrites, so the most
     * recently constructed exchanger decides the record type for all instances. Kept as-is for
     * compatibility.
     */
    private static Class<? extends Record> RECORD_CLASS;

    /** Channel that records are pushed to and pulled from. */
    private final Channel channel;

    /** Set by {@link #shutdown()}; volatile so a write from another thread is visible here. */
    private volatile boolean shutdown;

    /**
     * Creates an exchanger bound to {@code channel} and resolves the record class named by the
     * channel configuration key {@code CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS}.
     *
     * @param i forwarded unchanged as the first argument of the {@code TransformerExchanger} constructor
     * @param i2 forwarded unchanged as the second argument of the {@code TransformerExchanger} constructor
     * @param channel channel used for all push/pull operations; must not be {@code null}
     * @param communication forwarded to the superclass constructor
     * @param list transformer executions, forwarded to the superclass constructor
     * @param taskPluginCollector forwarded to the superclass constructor
     * @throws DataXException with {@code FrameworkErrorCode.CONFIG_ERROR} if the configured record
     *     class cannot be loaded or is not a {@link Record} implementation
     */
    public RecordExchanger(int i, int i2, Channel channel, Communication communication, List<TransformerExecution> list, TaskPluginCollector taskPluginCollector) {
        super(i, i2, communication, list, taskPluginCollector);
        assert channel != null;
        this.channel = channel;
        String recordClassName = channel.getConfiguration().getString(CoreConstant.DATAX_CORE_TRANSPORT_RECORD_CLASS, DEFAULT_RECORD_CLASS_NAME);
        try {
            // asSubclass gives a checked narrowing to Class<? extends Record>, so a misconfigured
            // class is rejected here instead of surfacing later in createRecord().
            RECORD_CLASS = Class.forName(recordClassName).asSubclass(Record.class);
        } catch (ClassNotFoundException | ClassCastException e) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    /**
     * Pulls the next record from the channel.
     *
     * @return the pulled record, or {@code null} when the pulled record is a {@link TerminateRecord}
     * @throws DataXException with {@code CommonErrorCode.SHUT_DOWN_TASK} if this exchanger has been shut down
     */
    public Record getFromReader() {
        ensureNotShutdown();
        Record pulled = this.channel.pull();
        // A TerminateRecord is the end-of-stream marker; callers see it as null.
        return (pulled instanceof TerminateRecord) ? null : pulled;
    }

    /**
     * Instantiates a new record of the configured record class via its no-argument constructor.
     *
     * @return a fresh record instance
     * @throws DataXException with {@code FrameworkErrorCode.CONFIG_ERROR} if instantiation fails for any reason
     */
    public Record createRecord() {
        try {
            // Class.newInstance() is deprecated and rethrows checked constructor exceptions
            // undeclared; going through the Constructor wraps them in InvocationTargetException.
            return RECORD_CLASS.getDeclaredConstructor().newInstance();
        } catch (Exception e) {
            throw DataXException.asDataXException(FrameworkErrorCode.CONFIG_ERROR, e);
        }
    }

    /**
     * Runs {@code record} through {@code doTransformer} and, if the result is non-null, pushes it
     * into the channel and calls {@code doStat()}.
     *
     * @param record the record to transform and send
     * @throws DataXException with {@code CommonErrorCode.SHUT_DOWN_TASK} if this exchanger has been shut down
     */
    public void sendToWriter(Record record) {
        ensureNotShutdown();
        Record transformed = doTransformer(record);
        if (transformed == null) {
            // Nothing to push when the transformer yields null; statistics are not touched either.
            return;
        }
        this.channel.push(transformed);
        doStat();
    }

    /** No-op: records are pushed to the channel immediately, so there is nothing buffered here. */
    public void flush() {
    }

    /**
     * Pushes the terminate marker ({@link TerminateRecord#get()}) into the channel and calls
     * {@code doStat()}.
     *
     * @throws DataXException with {@code CommonErrorCode.SHUT_DOWN_TASK} if this exchanger has been shut down
     */
    public void terminate() {
        ensureNotShutdown();
        this.channel.pushTerminate(TerminateRecord.get());
        doStat();
    }

    /** Marks this exchanger as shut down; subsequent send/receive/terminate calls will throw. */
    public void shutdown() {
        this.shutdown = true;
    }

    /** Throws the shut-down exception if {@link #shutdown()} has already been called. */
    private void ensureNotShutdown() {
        if (this.shutdown) {
            throw DataXException.asDataXException(CommonErrorCode.SHUT_DOWN_TASK, "");
        }
    }
}
