package com.wgzhao.datax.core.taskgroup.runner;

import com.wgzhao.datax.common.plugin.AbstractTaskPlugin;
import com.wgzhao.datax.common.plugin.RecordReceiver;
import com.wgzhao.datax.common.spi.Writer;
import com.wgzhao.datax.common.statistics.PerfRecord;
import com.wgzhao.datax.core.statistics.communication.CommunicationTool;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/wgzhao/datax/core/taskgroup/runner/WriterRunner.class */
/**
 * Runnable that drives a single writer task plugin through its lifecycle:
 * init, prepare, startWrite, post and finally destroy.
 *
 * <p>Each lifecycle step is timed with its own {@link PerfRecord}. The outcome is
 * reported through {@code markSuccess()} / {@code markFail(Throwable)} inherited
 * from {@link AbstractRunner}.
 */
public class WriterRunner extends AbstractRunner implements Runnable {
    private static final Logger LOG = LoggerFactory.getLogger(WriterRunner.class);

    /** Source of the records handed to the writer plugin; must be set before {@link #run()}. */
    private RecordReceiver recordReceiver;

    /**
     * Creates a runner around the given task plugin.
     *
     * @param abstractTaskPlugin the plugin to run; {@link #run()} and
     *                           {@link #supportFailOver()} use it as a {@link Writer.Task}
     */
    public WriterRunner(AbstractTaskPlugin abstractTaskPlugin) {
        super(abstractTaskPlugin);
    }

    /**
     * Sets the receiver that is passed to the plugin's {@code startWrite} call.
     *
     * @param recordReceiver the receiver to read records from
     */
    public void setRecordReceiver(RecordReceiver recordReceiver) {
        this.recordReceiver = recordReceiver;
    }

    /**
     * Executes the writer lifecycle.
     *
     * <p>Any {@link Throwable} raised by init, prepare, startWrite or post is logged and
     * reported via {@code markFail}; it is not rethrown. The destroy step and the closing
     * of the wait-read perf record live in a single {@code finally} block, so they run
     * exactly once whether the task succeeded or failed. An exception thrown by the
     * teardown itself propagates to the caller.
     *
     * @throws IllegalArgumentException if no record receiver has been set
     *                                  (raised by {@code Validate.isTrue})
     */
    @Override
    public void run() {
        Validate.isTrue(this.recordReceiver != null);
        // NOTE(review): the constructor accepts an AbstractTaskPlugin, so the explicit cast
        // assumes getPlugin() hands that same object back — confirm against AbstractRunner.
        Writer.Task taskWriter = (Writer.Task) getPlugin();
        PerfRecord channelWaitRead =
                new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WAIT_READ_TIME);
        try {
            channelWaitRead.start();

            LOG.debug("task writer starts to do init ...");
            PerfRecord initPerfRecord =
                    new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_INIT);
            initPerfRecord.start();
            taskWriter.init();
            initPerfRecord.end();

            LOG.debug("task writer starts to do prepare ...");
            PerfRecord preparePerfRecord =
                    new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_PREPARE);
            preparePerfRecord.start();
            taskWriter.prepare();
            preparePerfRecord.end();

            LOG.debug("task writer starts to write ...");
            PerfRecord dataPerfRecord =
                    new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DATA);
            dataPerfRecord.start();
            taskWriter.startWrite(this.recordReceiver);
            // Attach the record/byte totals taken from this runner's communication object.
            dataPerfRecord.addCount(CommunicationTool.getTotalReadRecords(super.getRunnerCommunication()));
            dataPerfRecord.addSize(CommunicationTool.getTotalReadBytes(super.getRunnerCommunication()));
            dataPerfRecord.end();

            LOG.debug("task writer starts to do post ...");
            PerfRecord postPerfRecord =
                    new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_POST);
            postPerfRecord.start();
            taskWriter.post();
            postPerfRecord.end();

            super.markSuccess();
        } catch (Throwable e) {
            LOG.error("Writer Runner Received Exceptions:", e);
            super.markFail(e);
        } finally {
            // Single teardown path: keeping it out of the try body means a failing destroy()
            // can neither flip an already-successful task to failed nor be invoked again.
            LOG.debug("task writer starts to do destroy ...");
            PerfRecord destroyPerfRecord =
                    new PerfRecord(getTaskGroupId(), getTaskId(), PerfRecord.PHASE.WRITE_TASK_DESTROY);
            destroyPerfRecord.start();
            super.destroy();
            destroyPerfRecord.end();
            channelWaitRead.end(
                    super.getRunnerCommunication().getLongCounter(CommunicationTool.WAIT_READER_TIME).longValue());
        }
    }

    /**
     * Reports whether the wrapped writer task supports fail-over.
     *
     * @return the value returned by the plugin's {@code supportFailOver()}
     */
    public boolean supportFailOver() {
        // NOTE(review): same cast assumption as in run() — confirm against AbstractRunner.
        Writer.Task taskWriter = (Writer.Task) getPlugin();
        return taskWriter.supportFailOver();
    }

    /**
     * Shuts down the record receiver. A receiver must have been set via
     * {@link #setRecordReceiver(RecordReceiver)} beforehand; otherwise this call
     * fails with a {@link NullPointerException}.
     */
    @Override
    public void shutdown() {
        this.recordReceiver.shutdown();
    }
}
