package com.wgzhao.datax.core.taskgroup;

import com.alibaba.fastjson.JSON;
import com.wgzhao.datax.common.constant.PluginType;
import com.wgzhao.datax.common.exception.CommonErrorCode;
import com.wgzhao.datax.common.exception.DataXException;
import com.wgzhao.datax.common.plugin.TaskPluginCollector;
import com.wgzhao.datax.common.statistics.PerfRecord;
import com.wgzhao.datax.common.statistics.PerfTrace;
import com.wgzhao.datax.common.statistics.VMInfo;
import com.wgzhao.datax.common.util.Configuration;
import com.wgzhao.datax.core.AbstractContainer;
import com.wgzhao.datax.core.meta.State;
import com.wgzhao.datax.core.statistics.communication.Communication;
import com.wgzhao.datax.core.statistics.communication.CommunicationTool;
import com.wgzhao.datax.core.statistics.communication.LocalTGCommunicationManager;
import com.wgzhao.datax.core.statistics.container.communicator.taskgroup.StandaloneTGContainerCommunicator;
import com.wgzhao.datax.core.statistics.plugin.task.AbstractTaskPluginCollector;
import com.wgzhao.datax.core.taskgroup.runner.AbstractRunner;
import com.wgzhao.datax.core.taskgroup.runner.ReaderRunner;
import com.wgzhao.datax.core.taskgroup.runner.WriterRunner;
import com.wgzhao.datax.core.transport.channel.Channel;
import com.wgzhao.datax.core.transport.exchanger.BufferedRecordExchanger;
import com.wgzhao.datax.core.transport.exchanger.BufferedRecordTransformerExchanger;
import com.wgzhao.datax.core.transport.transformer.TransformerExecution;
import com.wgzhao.datax.core.util.ClassUtil;
import com.wgzhao.datax.core.util.FrameworkErrorCode;
import com.wgzhao.datax.core.util.TransformerUtil;
import com.wgzhao.datax.core.util.container.CoreConstant;
import com.wgzhao.datax.core.util.container.LoadUtil;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang3.Validate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:com/wgzhao/datax/core/taskgroup/TaskGroupContainer.class */
public class TaskGroupContainer extends AbstractContainer {
    private static final Logger LOG;
    private final long jobId;
    private final int taskGroupId;
    private final String channelClazz;
    private final String taskCollectorClass;
    private final TaskMonitor taskMonitor;
    static final /* synthetic */ boolean $assertionsDisabled;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: com.wgzhao.datax.core.taskgroup.TaskGroupContainer$1, reason: invalid class name */
    /* loaded from: input_file:com/wgzhao/datax/core/taskgroup/TaskGroupContainer$1.class */
    /**
     * Synthetic lookup table that maps {@link PluginType} ordinals to the case numbers used by
     * the {@code switch} in {@code TaskExecutor.generateRunner}: {@code READER -> 1},
     * {@code WRITER -> 2}. Ordinals of any other constant keep the array default {@code 0}.
     */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$com$wgzhao$datax$common$constant$PluginType = new int[PluginType.values().length];

        static {
            final int[] caseNumbers = $SwitchMap$com$wgzhao$datax$common$constant$PluginType;
            try {
                caseNumbers[PluginType.READER.ordinal()] = 1;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the PluginType loaded at runtime: its slot stays 0.
            }
            try {
                caseNumbers[PluginType.WRITER.ordinal()] = 2;
            } catch (NoSuchFieldError ignored) {
                // Constant missing from the PluginType loaded at runtime: its slot stays 0.
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: package-private */
    /* loaded from: input_file:com/wgzhao/datax/core/taskgroup/TaskGroupContainer$TaskExecutor.class */
    /**
     * Executes one task of the enclosing task group: a reader runner and a writer runner, each
     * on its own thread, exchanging records through a single {@link Channel}.
     *
     * <p>All runners, threads and the channel are created in the constructor; nothing runs until
     * {@link #doStart()} is called.
     */
    public class TaskExecutor {
        private final Configuration taskConfig;
        private final int taskId;
        private final int attemptCount;
        private final Channel channel;
        private final Thread readerThread;
        private final Thread writerThread;
        private final ReaderRunner readerRunner;
        private final WriterRunner writerRunner;
        private final Communication taskCommunication;

        /**
         * Builds the channel, runners and threads for one task.
         *
         * @param configuration task configuration; must contain both a {@code reader} and a
         *                      {@code writer} section as well as the task id
         * @param i             attempt number of this execution (the caller passes 1 for the first run)
         * @throws IllegalArgumentException if the reader or writer section is missing
         * @throws NullPointerException     if no communication is registered for the task id
         */
        public TaskExecutor(Configuration configuration, int i) {
            this.taskConfig = configuration;
            // Both plugin sections are mandatory. The previous check tested "reader" twice,
            // so a missing writer section slipped through and failed later in plugin loading.
            Validate.isTrue(
                    null != this.taskConfig.getConfiguration("reader")
                            && null != this.taskConfig.getConfiguration("writer"),
                    "[reader|writer]的插件参数不能为空!");
            this.taskId = this.taskConfig.getInt(CoreConstant.TASK_ID).intValue();
            this.attemptCount = i;

            this.taskCommunication = TaskGroupContainer.this.containerCommunicator.getCommunication(Integer.valueOf(this.taskId));
            Validate.notNull(this.taskCommunication, String.format("taskId[%d]的Communication没有注册过", Integer.valueOf(this.taskId)));

            this.channel = (Channel) ClassUtil.instantiate(TaskGroupContainer.this.channelClazz, Channel.class, TaskGroupContainer.this.configuration);
            this.channel.setCommunication(this.taskCommunication);

            List<TransformerExecution> transformers = TransformerUtil.buildTransformerInfo(this.taskConfig);

            // The writer side is prepared first, then the reader side.
            this.writerRunner = (WriterRunner) generateRunner(PluginType.WRITER);
            this.writerThread = new Thread(this.writerRunner, threadName("writer"));
            // Each plugin thread runs with the class loader of its own plugin jar.
            this.writerThread.setContextClassLoader(LoadUtil.getJarLoader(PluginType.WRITER, this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME), Long.valueOf(TaskGroupContainer.this.getJobId())));

            this.readerRunner = (ReaderRunner) generateRunner(PluginType.READER, transformers);
            this.readerThread = new Thread(this.readerRunner, threadName("reader"));
            this.readerThread.setContextClassLoader(LoadUtil.getJarLoader(PluginType.READER, this.taskConfig.getString(CoreConstant.JOB_READER_NAME), Long.valueOf(TaskGroupContainer.this.getJobId())));
        }

        /** Builds the thread name {@code <jobId>-<taskGroupId>-<taskId>-<role>}. */
        private String threadName(String role) {
            return String.format("%d-%d-%d-%s",
                    Long.valueOf(TaskGroupContainer.this.jobId),
                    Integer.valueOf(TaskGroupContainer.this.taskGroupId),
                    Integer.valueOf(this.taskId),
                    role);
        }

        /**
         * Starts the writer thread, then the reader thread.
         *
         * <p>The two post-start checks are intentionally asymmetric: the writer must still be
         * alive and the task must not be FAILED, whereas a reader that is no longer alive is
         * only an error when the task is also FAILED.
         *
         * @throws DataXException with {@code RUNTIME_ERROR} when either check fails
         */
        public void doStart() {
            this.writerThread.start();
            if (!this.writerThread.isAlive() || this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
            }
            this.readerThread.start();
            if (!this.readerThread.isAlive() && this.taskCommunication.getState() == State.FAILED) {
                throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, this.taskCommunication.getThrowable());
            }
        }

        /** Creates a runner for {@code pluginType} without any transformers. */
        private AbstractRunner generateRunner(PluginType pluginType) {
            return generateRunner(pluginType, null);
        }

        /**
         * Loads the plugin runner for the given side and wires it to the channel, a task plugin
         * collector and this task's communication.
         *
         * @param pluginType {@code READER} or {@code WRITER}
         * @param list       transformer executions for the reader side; {@code null} or empty
         *                   selects the plain buffered exchanger. Ignored for the writer.
         * @return the configured runner
         * @throws DataXException with {@code ARGUMENT_ERROR} for any other plugin type
         */
        private AbstractRunner generateRunner(PluginType pluginType, List<TransformerExecution> list) {
            final AbstractRunner runner;
            final TaskPluginCollector collector;
            switch (pluginType) {
                case READER:
                    runner = LoadUtil.loadPluginRunner(pluginType, this.taskConfig.getString(CoreConstant.JOB_READER_NAME), Long.valueOf(TaskGroupContainer.this.getJobId()));
                    runner.setJobConf(this.taskConfig.getConfiguration(CoreConstant.JOB_READER_PARAMETER));
                    collector = (TaskPluginCollector) ClassUtil.instantiate(TaskGroupContainer.this.taskCollectorClass, AbstractTaskPluginCollector.class, TaskGroupContainer.this.configuration, this.taskCommunication, PluginType.READER);
                    if (list == null || list.isEmpty()) {
                        ((ReaderRunner) runner).setRecordSender(new BufferedRecordExchanger(this.channel, collector));
                    } else {
                        ((ReaderRunner) runner).setRecordSender(new BufferedRecordTransformerExchanger(TaskGroupContainer.this.taskGroupId, this.taskId, this.channel, this.taskCommunication, collector, list));
                    }
                    runner.setTaskPluginCollector(collector);
                    break;
                case WRITER:
                    runner = LoadUtil.loadPluginRunner(pluginType, this.taskConfig.getString(CoreConstant.JOB_WRITER_NAME), Long.valueOf(TaskGroupContainer.this.getJobId()));
                    runner.setJobConf(this.taskConfig.getConfiguration(CoreConstant.JOB_WRITER_PARAMETER));
                    collector = (TaskPluginCollector) ClassUtil.instantiate(TaskGroupContainer.this.taskCollectorClass, AbstractTaskPluginCollector.class, TaskGroupContainer.this.configuration, this.taskCommunication, PluginType.WRITER);
                    ((WriterRunner) runner).setRecordReceiver(new BufferedRecordExchanger(this.channel, collector));
                    runner.setTaskPluginCollector(collector);
                    break;
                default:
                    throw DataXException.asDataXException(FrameworkErrorCode.ARGUMENT_ERROR, "Cant generateRunner for:" + pluginType);
            }
            runner.setTaskGroupId(TaskGroupContainer.this.taskGroupId);
            runner.setTaskId(this.taskId);
            runner.setRunnerCommunication(this.taskCommunication);
            return runner;
        }

        /**
         * @return {@code true} only when neither thread is alive and the task communication
         *         reports itself finished
         */
        public boolean isTaskFinished() {
            return !this.readerThread.isAlive()
                    && !this.writerThread.isAlive()
                    && this.taskCommunication != null
                    && this.taskCommunication.isFinished();
        }

        /** @return the id of the task this executor runs */
        public int getTaskId() {
            return this.taskId;
        }

        /** @return the timestamp currently stored in the task communication */
        public long getTimeStamp() {
            return this.taskCommunication.getTimestamp();
        }

        /** @return the attempt number this executor was created with */
        public int getAttemptCount() {
            return this.attemptCount;
        }

        /** @return whether the writer runner reports fail-over support */
        public boolean supportFailOver() {
            return this.writerRunner.supportFailOver();
        }

        /**
         * Asks both runners to shut down (writer first) and interrupts whichever of the two
         * threads is still alive.
         */
        public void shutdown() {
            this.writerRunner.shutdown();
            this.readerRunner.shutdown();
            if (this.writerThread.isAlive()) {
                this.writerThread.interrupt();
            }
            if (this.readerThread.isAlive()) {
                this.readerThread.interrupt();
            }
        }

        /** @return {@code true} when neither the reader nor the writer thread is alive */
        public boolean isShutdown() {
            return !this.readerThread.isAlive() && !this.writerThread.isAlive();
        }
    }

    public TaskGroupContainer(Configuration configuration) {
        super(configuration);
        this.taskMonitor = TaskMonitor.getInstance();
        initCommunicator(configuration);
        this.jobId = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_JOB_ID).longValue();
        this.taskGroupId = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_ID).intValue();
        this.channelClazz = this.configuration.getString(CoreConstant.DATAX_CORE_TRANSPORT_CHANNEL_CLASS);
        this.taskCollectorClass = this.configuration.getString(CoreConstant.DATAX_CORE_STATISTICS_COLLECTOR_PLUGIN_TASKCLASS);
    }

    /**
     * Installs a {@link StandaloneTGContainerCommunicator} built from {@code configuration} as
     * this container's communicator.
     */
    private void initCommunicator(Configuration configuration) {
        final StandaloneTGContainerCommunicator communicator = new StandaloneTGContainerCommunicator(configuration);
        super.setContainerCommunicator(communicator);
    }

    /** @return the job id read from the container configuration at construction time */
    public long getJobId() {
        return jobId;
    }

    /** @return the task-group id read from the container configuration at construction time */
    public int getTaskGroupId() {
        return taskGroupId;
    }

    /**
     * Runs every task of this task group to completion.
     *
     * <p>Each pass of the scheduling loop:
     * <ol>
     *   <li>scans the task communications: a FAILED task is re-queued when its executor supports
     *       fail-over and has attempts left, otherwise the group fails; a KILLED task fails the
     *       group; a SUCCEEDED task gets its total-time perf record written;</li>
     *   <li>starts queued tasks while fewer than the configured number of channels are running,
     *       honouring the fail-over retry interval and maximum wait for re-queued tasks;</li>
     *   <li>returns once the queue is empty, all executors are done and the collected state is
     *       SUCCEEDED;</li>
     *   <li>otherwise reports progress when the report interval has elapsed and sleeps.</li>
     * </ol>
     *
     * <p>Any throwable marks the collected communication FAILED, reports it and is rethrown as a
     * {@link DataXException}. Unless {@code PerfTrace} is in job mode, VM/perf summaries are
     * logged and {@link #removeTaskGroup()} is called on every exit path.
     *
     * @throws DataXException with {@code RUNTIME_ERROR} wrapping whatever went wrong
     */
    @Override // com.wgzhao.datax.core.AbstractContainer
    public void start() {
        try {
            final int sleepIntervalInMillis = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_SLEEPINTERVAL, 100).intValue();
            final long reportIntervalInMillis = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_REPORTINTERVAL, 10000L).longValue();
            final int channelNumber = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASKGROUP_CHANNEL).intValue();
            final int taskMaxRetryTimes = this.configuration.getInt(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXRETRYTIMES, 1).intValue();
            final long taskRetryIntervalInMillis = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_RETRYINTERVALINMSEC, 10000L).longValue();
            final long taskMaxWaitInMillis = this.configuration.getLong(CoreConstant.DATAX_CORE_CONTAINER_TASK_FAILOVER_MAXWAITINMSEC, 60000L).longValue();

            List<Configuration> taskConfigs = this.configuration.getListConfiguration(CoreConstant.DATAX_JOB_CONTENT);
            if (LOG.isDebugEnabled()) {
                LOG.debug("taskGroup[{}]'s task configs[{}]", Integer.valueOf(this.taskGroupId), JSON.toJSONString(taskConfigs));
            }
            final int taskCount = taskConfigs.size();
            LOG.info("taskGroupId=[{}] start [{}] channels for [{}] tasks.", new Object[]{Integer.valueOf(this.taskGroupId), Integer.valueOf(channelNumber), Integer.valueOf(taskCount)});

            this.containerCommunicator.registerCommunication(taskConfigs);

            Map<Integer, Configuration> taskConfigMap = buildTaskConfigMap(taskConfigs);
            List<Configuration> taskQueue = buildRemainTasks(taskConfigs);
            // Last failed executor per task id, kept until the retry has been started.
            Map<Integer, TaskExecutor> failedExecutors = new HashMap<Integer, TaskExecutor>();
            List<TaskExecutor> runningTasks = new ArrayList<TaskExecutor>(channelNumber);
            Map<Integer, Long> taskStartTimes = new HashMap<Integer, Long>();

            long lastReportTimestamp = 0;
            Communication lastReport = new Communication();

            while (true) {
                // 1. Inspect the state of every finished task.
                boolean failedOrKilled = false;
                for (Map.Entry<Integer, Communication> entry : this.containerCommunicator.getCommunicationMap().entrySet()) {
                    Integer taskId = entry.getKey();
                    Communication taskCommunication = entry.getValue();
                    if (!taskCommunication.isFinished()) {
                        continue;
                    }
                    TaskExecutor finishedExecutor = removeTask(runningTasks, taskId.intValue());
                    this.taskMonitor.removeTask(taskId);

                    State state = taskCommunication.getState();
                    if (state == State.FAILED) {
                        // Without an executor there is nothing to retry; treat it like an
                        // exhausted fail-over instead of dereferencing null.
                        boolean retryable = finishedExecutor != null
                                && finishedExecutor.supportFailOver()
                                && finishedExecutor.getAttemptCount() < taskMaxRetryTimes;
                        if (!retryable) {
                            failedOrKilled = true;
                            break;
                        }
                        failedExecutors.put(taskId, finishedExecutor);
                        finishedExecutor.shutdown();
                        this.containerCommunicator.resetCommunication(taskId);
                        taskQueue.add(taskConfigMap.get(taskId));
                    } else if (state == State.KILLED) {
                        failedOrKilled = true;
                        break;
                    } else if (state == State.SUCCEEDED) {
                        Long startedAt = taskStartTimes.get(taskId);
                        if (startedAt != null) {
                            long usedTimeInMillis = System.currentTimeMillis() - startedAt.longValue();
                            LOG.debug("taskGroup[{}] taskId[{}] is successful, used[{}]ms", new Object[]{Integer.valueOf(this.taskGroupId), taskId, Long.valueOf(usedTimeInMillis)});
                            PerfRecord.addPerfRecord(this.taskGroupId, taskId.intValue(), PerfRecord.PHASE.TASK_TOTAL, startedAt.longValue(), usedTimeInMillis * 1000 * 1000);
                            taskStartTimes.remove(taskId);
                            taskConfigMap.remove(taskId);
                        }
                    }
                }

                // 2. Abort only when a task really failed for good or was killed. The flag must
                //    not be set on a normal scan, otherwise the group fails on its first pass.
                if (failedOrKilled) {
                    lastReport = reportTaskGroupCommunication(lastReport, taskCount);
                    throw DataXException.asDataXException(FrameworkErrorCode.PLUGIN_RUNTIME_ERROR, lastReport.getThrowable());
                }

                // 3. Start queued tasks while there are free channels.
                Iterator<Configuration> queued = taskQueue.iterator();
                while (queued.hasNext() && runningTasks.size() < channelNumber) {
                    Configuration taskConfig = queued.next();
                    Integer taskId = taskConfig.getInt(CoreConstant.TASK_ID);
                    int attemptCount = 1;
                    TaskExecutor lastExecutor = failedExecutors.get(taskId);
                    if (lastExecutor != null) {
                        attemptCount = lastExecutor.getAttemptCount() + 1;
                        long now = System.currentTimeMillis();
                        long failedTime = lastExecutor.getTimeStamp();
                        if (now - failedTime < taskRetryIntervalInMillis) {
                            // Retry interval not reached yet: leave the task in the queue.
                            continue;
                        }
                        if (!lastExecutor.isShutdown()) {
                            // The previous attempt is still running.
                            if (now - failedTime > taskMaxWaitInMillis) {
                                markCommunicationFailed(taskId);
                                reportTaskGroupCommunication(lastReport, taskCount);
                                throw DataXException.asDataXException(CommonErrorCode.WAIT_TIME_EXCEED, "task failover等待超时");
                            }
                            // Ask it to stop again and retry on a later pass; never run two
                            // attempts of the same task concurrently.
                            lastExecutor.shutdown();
                            continue;
                        }
                        LOG.debug("taskGroup[{}] taskId[{}] attemptCount[{}] has already shutdown", new Object[]{Integer.valueOf(this.taskGroupId), taskId, Integer.valueOf(lastExecutor.getAttemptCount())});
                    }

                    // With retries enabled each attempt runs on its own clone of the configuration.
                    Configuration configForRun = taskMaxRetryTimes > 1 ? taskConfig.clone() : taskConfig;
                    TaskExecutor taskExecutor = new TaskExecutor(configForRun, attemptCount);
                    taskStartTimes.put(taskId, Long.valueOf(System.currentTimeMillis()));
                    taskExecutor.doStart();

                    queued.remove();
                    runningTasks.add(taskExecutor);
                    this.taskMonitor.registerTask(taskId, this.containerCommunicator.getCommunication(taskId));
                    failedExecutors.remove(taskId);
                    LOG.debug("taskGroup[{}] taskId[{}] attemptCount[{}] is started", new Object[]{Integer.valueOf(this.taskGroupId), taskId, Integer.valueOf(attemptCount)});
                }

                // 4. Everything queued, started and finished successfully: report and leave.
                if (taskQueue.isEmpty() && isAllTaskDone(runningTasks) && this.containerCommunicator.collectState() == State.SUCCEEDED) {
                    lastReport = reportTaskGroupCommunication(lastReport, taskCount);
                    LOG.debug("taskGroup[{}] completed it's tasks.", Integer.valueOf(this.taskGroupId));
                    break;
                }

                // 5. Periodic progress report.
                long now = System.currentTimeMillis();
                if (now - lastReportTimestamp > reportIntervalInMillis) {
                    lastReport = reportTaskGroupCommunication(lastReport, taskCount);
                    lastReportTimestamp = now;
                    for (TaskExecutor taskExecutor : runningTasks) {
                        this.taskMonitor.report(Integer.valueOf(taskExecutor.getTaskId()), this.containerCommunicator.getCommunication(Integer.valueOf(taskExecutor.getTaskId())));
                    }
                }

                Thread.sleep(sleepIntervalInMillis);
            }

            // 6. One final report after the loop.
            reportTaskGroupCommunication(lastReport, taskCount);
        } catch (Throwable e) {
            Communication failedCommunication = this.containerCommunicator.collect();
            if (failedCommunication.getThrowable() == null) {
                failedCommunication.setThrowable(e);
            }
            failedCommunication.setState(State.FAILED);
            this.containerCommunicator.report(failedCommunication);
            throw DataXException.asDataXException(FrameworkErrorCode.RUNTIME_ERROR, e);
        } finally {
            if (!PerfTrace.getInstance().isJob()) {
                VMInfo vmInfo = VMInfo.getVmInfo();
                if (vmInfo != null) {
                    vmInfo.getDelta(false);
                    LOG.debug(vmInfo.totalString());
                }
                LOG.debug(PerfTrace.getInstance().summarizeNoException());
                removeTaskGroup();
            }
        }
    }

    /**
     * Indexes the task configurations by task id.
     *
     * @param list task configurations, each carrying a task id
     * @return a new mutable map from task id to its configuration; on duplicate ids the later
     *         configuration wins
     * @throws NullPointerException if a configuration has no task id
     */
    private Map<Integer, Configuration> buildTaskConfigMap(List<Configuration> list) {
        Map<Integer, Configuration> configsByTaskId = new HashMap<Integer, Configuration>();
        for (Configuration taskConfig : list) {
            // Unboxing keeps the fail-fast behaviour for a missing task id.
            int taskId = taskConfig.getInt(CoreConstant.TASK_ID).intValue();
            configsByTaskId.put(Integer.valueOf(taskId), taskConfig);
        }
        return configsByTaskId;
    }

    /**
     * Copies the task configurations into the mutable queue of tasks still waiting to start.
     *
     * @param list all task configurations of this task group
     * @return a new {@link LinkedList} holding the same elements in the same order
     */
    private List<Configuration> buildRemainTasks(List<Configuration> list) {
        // Typed explicitly instead of using the raw LinkedList.
        return new LinkedList<Configuration>(list);
    }

    /**
     * Removes the first executor whose task id equals {@code i} from {@code list}.
     *
     * @param list executors to search; modified in place
     * @param i    task id to look for
     * @return the removed executor, or {@code null} when no executor has that task id
     */
    private TaskExecutor removeTask(List<TaskExecutor> list, int i) {
        for (Iterator<TaskExecutor> cursor = list.iterator(); cursor.hasNext(); ) {
            TaskExecutor candidate = cursor.next();
            if (candidate.getTaskId() != i) {
                continue;
            }
            cursor.remove();
            return candidate;
        }
        return null;
    }

    /**
     * @param list executors to check
     * @return {@code true} when every executor reports its task finished (also for an empty
     *         list); {@code false} as soon as one does not
     */
    private boolean isAllTaskDone(List<TaskExecutor> list) {
        for (TaskExecutor executor : list) {
            if (!executor.isTaskFinished()) {
                return false;
            }
        }
        return true;
    }

    /**
     * Collects the current communication, stamps it with the current time, derives the report
     * communication from it and the previous one, and reports the result.
     *
     * @param communication the previously reported communication
     * @param i             number of tasks in this task group
     * @return the communication that was just reported
     */
    private Communication reportTaskGroupCommunication(Communication communication, int i) {
        final Communication current = this.containerCommunicator.collect();
        current.setTimestamp(System.currentTimeMillis());

        final Communication report = CommunicationTool.getReportCommunication(current, communication, i);
        this.containerCommunicator.report(report);
        return report;
    }

    /**
     * Sets the state of the communication registered for task {@code num} to FAILED.
     *
     * @param num task id
     */
    private void markCommunicationFailed(Integer num) {
        final Communication taskCommunication = this.containerCommunicator.getCommunication(num);
        taskCommunication.setState(State.FAILED);
    }

    /**
     * Best-effort cleanup of the state cached for this job: removes the job id from
     * {@code LoadUtil.getConfigurationSet()} and drops matching entries from the task-group
     * communication map of {@link LocalTGCommunicationManager}.
     *
     * <p>Failures are logged and never propagated, so cleanup cannot mask the outcome of the
     * task group itself.
     */
    public void removeTaskGroup() {
        try {
            LoadUtil.getConfigurationSet().remove(Long.valueOf(this.jobId));

            // Loop-invariant, so computed once instead of per entry.
            final String jobIdPrefix = String.valueOf(this.jobId);
            Iterator<Map.Entry<Integer, Communication>> entries = LocalTGCommunicationManager.getTaskGroupCommunicationMap().entrySet().iterator();
            while (entries.hasNext()) {
                Map.Entry<Integer, Communication> entry = entries.next();
                // NOTE(review): matching is by decimal-string prefix, so job id 1 also matches
                // keys 10, 11, ... — confirm this is the intended key scheme.
                if (String.valueOf(entry.getKey()).startsWith(jobIdPrefix)) {
                    entries.remove();
                }
            }
        } catch (Exception e) {
            // Route the failure through the class logger instead of printing to stderr.
            LOG.warn("Failed to remove cached state of job " + this.jobId, e);
        }
    }

    static {
        $assertionsDisabled = !TaskGroupContainer.class.desiredAssertionStatus();
        LOG = LoggerFactory.getLogger(TaskGroupContainer.class);
    }
}
