/*
 * Decompiled with CFR 0.152.
 */
package org.apache.hadoop.mapred;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Random;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.DataOutputBuffer;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableUtils;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.mapred.ClusterStatus;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.InvalidJobConfException;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobProfile;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.JobSubmissionProtocol;
import org.apache.hadoop.mapred.JobTracker;
import org.apache.hadoop.mapred.LocalJobRunner;
import org.apache.hadoop.mapred.MRConstants;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapred.TaskReport;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.ToolBase;

public class JobClient
extends ToolBase
implements MRConstants {
    private static final Log LOG = LogFactory.getLog((String)"org.apache.hadoop.mapred.JobClient");
    private TaskStatusFilter taskOutputFilter = TaskStatusFilter.FAILED;
    static long MAX_JOBPROFILE_AGE = 2000L;
    JobSubmissionProtocol jobSubmitClient;
    FileSystem fs = null;
    static Random r = new Random();
    private static final int CURRENT_SPLIT_FILE_VERSION = 0;
    private static final byte[] SPLIT_FILE_HEADER = "SPL".getBytes();

    public JobClient() {
    }

    public JobClient(Configuration conf) throws IOException {
        this.setConf(conf);
        this.init();
    }

    public void init() throws IOException {
        String tracker = this.conf.get("mapred.job.tracker", "local");
        this.jobSubmitClient = "local".equals(tracker) ? new LocalJobRunner(this.conf) : (JobSubmissionProtocol)RPC.getProxy(JobSubmissionProtocol.class, 3L, JobTracker.getAddress(this.conf), this.conf);
    }

    public JobClient(InetSocketAddress jobTrackAddr, Configuration conf) throws IOException {
        this.jobSubmitClient = (JobSubmissionProtocol)RPC.getProxy(JobSubmissionProtocol.class, 3L, jobTrackAddr, conf);
    }

    public synchronized void close() throws IOException {
    }

    public synchronized FileSystem getFs() throws IOException {
        if (this.fs == null) {
            String fsName = this.jobSubmitClient.getFilesystemName();
            this.fs = FileSystem.getNamed(fsName, this.conf);
        }
        return this.fs;
    }

    public RunningJob submitJob(String jobFile) throws FileNotFoundException, InvalidJobConfException, IOException {
        JobConf job = new JobConf(jobFile);
        return this.submitJob(job);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public RunningJob submitJob(JobConf job) throws FileNotFoundException, InvalidJobConfException, IOException {
        String user;
        Path submitJobDir = new Path(job.getSystemDir(), "submit_" + Integer.toString(Math.abs(r.nextInt()), 36));
        Path submitJobFile = new Path(submitJobDir, "job.xml");
        Path submitJarFile = new Path(submitJobDir, "job.jar");
        Path submitSplitFile = new Path(submitJobDir, "job.split");
        FileSystem fs = this.getFs();
        LOG.debug((Object)("default FileSystem: " + fs.getUri()));
        URI[] tarchives = DistributedCache.getCacheArchives(job);
        URI[] tfiles = DistributedCache.getCacheFiles(job);
        if (tarchives != null || tfiles != null) {
            int i;
            if (tarchives != null) {
                String md5Archives = StringUtils.byteToHexString(DistributedCache.createMD5(tarchives[0], job));
                for (i = 1; i < tarchives.length; ++i) {
                    md5Archives = md5Archives + "," + StringUtils.byteToHexString(DistributedCache.createMD5(tarchives[i], job));
                }
                DistributedCache.setArchiveMd5(job, md5Archives);
            }
            if (tfiles != null) {
                String md5Files = StringUtils.byteToHexString(DistributedCache.createMD5(tfiles[0], job));
                for (i = 1; i < tfiles.length; ++i) {
                    md5Files = md5Files + "," + StringUtils.byteToHexString(DistributedCache.createMD5(tfiles[i], job));
                }
                DistributedCache.setFileMd5(job, md5Files);
            }
        }
        String originalJarPath = job.getJar();
        short replication = (short)job.getInt("mapred.submit.replication", 10);
        if (originalJarPath != null) {
            if ("".equals(job.getJobName())) {
                job.setJobName(new Path(originalJarPath).getName());
            }
            job.setJar(submitJarFile.toString());
            fs.copyFromLocalFile(new Path(originalJarPath), submitJarFile);
            fs.setReplication(submitJarFile, replication);
        }
        job.setUser((user = System.getProperty("user.name")) != null ? user : "Dr Who");
        if (job.getWorkingDirectory() == null) {
            job.setWorkingDirectory(fs.getWorkingDirectory());
        }
        job.getInputFormat().validateInput(job);
        job.getOutputFormat().checkOutputSpecs(fs, job);
        LOG.debug((Object)("Creating splits at " + fs.makeQualified(submitSplitFile)));
        InputSplit[] splits = job.getInputFormat().getSplits(job, job.getNumMapTasks());
        Arrays.sort(splits, new Comparator<InputSplit>(){

            @Override
            public int compare(InputSplit a, InputSplit b) {
                try {
                    long left = a.getLength();
                    long right = b.getLength();
                    if (left == right) {
                        return 0;
                    }
                    if (left < right) {
                        return 1;
                    }
                    return -1;
                }
                catch (IOException ie) {
                    throw new RuntimeException("Problem getting input split size", ie);
                }
            }
        });
        FSDataOutputStream out = fs.create(submitSplitFile);
        try {
            this.writeSplitsFile(splits, out);
        }
        finally {
            out.close();
        }
        job.set("mapred.job.split.file", submitSplitFile.toString());
        job.setNumMapTasks(splits.length);
        out = fs.create(submitJobFile, replication);
        try {
            job.write(out);
        }
        finally {
            out.close();
        }
        JobStatus status = this.jobSubmitClient.submitJob(submitJobFile.toString());
        if (status != null) {
            return new NetworkedJob(status);
        }
        throw new IOException("Could not launch job");
    }

    private void writeSplitsFile(InputSplit[] splits, FSDataOutputStream out) throws IOException {
        out.write(SPLIT_FILE_HEADER);
        WritableUtils.writeVInt(out, 0);
        WritableUtils.writeVInt(out, splits.length);
        DataOutputBuffer buffer = new DataOutputBuffer();
        RawSplit rawSplit = new RawSplit();
        for (InputSplit split : splits) {
            rawSplit.setClassName(split.getClass().getName());
            buffer.reset();
            split.write(buffer);
            rawSplit.setBytes(buffer.getData(), 0, buffer.getLength());
            rawSplit.setLocations(split.getLocations());
            rawSplit.write(out);
        }
    }

    static RawSplit[] readSplitFile(DataInput in) throws IOException {
        byte[] header = new byte[SPLIT_FILE_HEADER.length];
        in.readFully(header);
        if (!Arrays.equals(SPLIT_FILE_HEADER, header)) {
            throw new IOException("Invalid header on split file");
        }
        int vers = WritableUtils.readVInt(in);
        if (vers != 0) {
            throw new IOException("Unsupported split version " + vers);
        }
        int len = WritableUtils.readVInt(in);
        RawSplit[] result = new RawSplit[len];
        for (int i = 0; i < len; ++i) {
            result[i] = new RawSplit();
            result[i].readFields(in);
        }
        return result;
    }

    public RunningJob getJob(String jobid) throws IOException {
        JobStatus status = this.jobSubmitClient.getJobStatus(jobid);
        if (status != null) {
            return new NetworkedJob(status);
        }
        return null;
    }

    public TaskReport[] getMapTaskReports(String jobId) throws IOException {
        return this.jobSubmitClient.getMapTaskReports(jobId);
    }

    public TaskReport[] getReduceTaskReports(String jobId) throws IOException {
        return this.jobSubmitClient.getReduceTaskReports(jobId);
    }

    public ClusterStatus getClusterStatus() throws IOException {
        return this.jobSubmitClient.getClusterStatus();
    }

    public JobStatus[] jobsToComplete() throws IOException {
        return this.jobSubmitClient.jobsToComplete();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public static RunningJob runJob(JobConf job) throws IOException {
        TaskStatusFilter filter;
        JobClient jc = new JobClient(job);
        boolean error = true;
        RunningJob running = null;
        String lastReport = null;
        int MAX_RETRIES = 5;
        int retries = 5;
        try {
            filter = JobClient.getTaskOutputFilter(job);
        }
        catch (IllegalArgumentException e) {
            LOG.warn((Object)("Invalid Output filter : " + e.getMessage() + " Valid values are : NONE, FAILED, SUCCEEDED, ALL"));
            throw e;
        }
        try {
            running = jc.submitJob(job);
            String jobId = running.getJobID();
            LOG.info((Object)("Running job: " + jobId));
            int eventCounter = 0;
            while (true) {
                try {
                    Thread.sleep(1000L);
                }
                catch (InterruptedException e) {
                    // empty catch block
                }
                try {
                    if (running.isComplete()) break;
                    running = jc.getJob(jobId);
                    String report = " map " + StringUtils.formatPercent(running.mapProgress(), 0) + " reduce " + StringUtils.formatPercent(running.reduceProgress(), 0);
                    if (!report.equals(lastReport)) {
                        LOG.info((Object)report);
                        lastReport = report;
                    }
                    if (filter != TaskStatusFilter.NONE) {
                        TaskCompletionEvent[] events = running.getTaskCompletionEvents(eventCounter);
                        eventCounter += events.length;
                        block15: for (TaskCompletionEvent event : events) {
                            switch (filter) {
                                case SUCCEEDED: {
                                    if (event.getTaskStatus() != TaskCompletionEvent.Status.SUCCEEDED) continue block15;
                                    LOG.info((Object)event.toString());
                                    JobClient.displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
                                    continue block15;
                                }
                                case FAILED: {
                                    if (event.getTaskStatus() != TaskCompletionEvent.Status.FAILED) continue block15;
                                    LOG.info((Object)event.toString());
                                    JobClient.displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
                                    continue block15;
                                }
                                case ALL: {
                                    LOG.info((Object)event.toString());
                                    JobClient.displayTaskLogs(event.getTaskId(), event.getTaskTrackerHttp());
                                }
                            }
                        }
                    }
                    retries = 5;
                }
                catch (IOException ie) {
                    if (--retries == 0) {
                        LOG.warn((Object)"Final attempt failed, killing job.");
                        throw ie;
                    }
                    LOG.info((Object)("Communication problem with server: " + StringUtils.stringifyException(ie)));
                }
            }
            if (!running.isSuccessful()) {
                throw new IOException("Job failed!");
            }
            LOG.info((Object)("Job complete: " + jobId));
            running.getCounters().log(LOG);
            error = false;
        }
        finally {
            if (error && running != null) {
                running.killJob();
            }
            jc.close();
        }
        return running;
    }

    private static void displayTaskLogs(String taskId, String baseUrl) throws IOException {
        if (baseUrl != null) {
            JobClient.getTaskLogs(taskId, new URL(baseUrl + "&filter=stdout"), System.out);
            JobClient.getTaskLogs(taskId, new URL(baseUrl + "&filter=stderr"), System.err);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    private static void getTaskLogs(String taskId, URL taskLogUrl, OutputStream out) {
        try {
            URLConnection connection = taskLogUrl.openConnection();
            BufferedReader input = new BufferedReader(new InputStreamReader(connection.getInputStream()));
            BufferedWriter output = new BufferedWriter(new OutputStreamWriter(out));
            try {
                String logData = null;
                while ((logData = input.readLine()) != null) {
                    if (logData.length() <= 0) continue;
                    output.write(taskId + ": " + logData + "\n");
                    output.flush();
                }
            }
            finally {
                input.close();
            }
        }
        catch (IOException ioe) {
            LOG.warn((Object)("Error reading task output" + ioe.getMessage()));
        }
    }

    static Configuration getConfiguration(String jobTrackerSpec) {
        Configuration conf = new Configuration();
        if (jobTrackerSpec != null) {
            if (jobTrackerSpec.indexOf(":") >= 0) {
                conf.set("mapred.job.tracker", jobTrackerSpec);
            } else {
                String classpathFile = "hadoop-" + jobTrackerSpec + ".xml";
                URL validate = conf.getResource(classpathFile);
                if (validate == null) {
                    throw new RuntimeException(classpathFile + " not found on CLASSPATH");
                }
                conf.addFinalResource(classpathFile);
            }
        }
        return conf;
    }

    @Deprecated
    public void setTaskOutputFilter(TaskStatusFilter newValue) {
        this.taskOutputFilter = newValue;
    }

    public static TaskStatusFilter getTaskOutputFilter(JobConf job) {
        return TaskStatusFilter.valueOf(job.get("jobclient.output.filter", "FAILED"));
    }

    public static void setTaskOutputFilter(JobConf job, TaskStatusFilter newValue) {
        job.set("jobclient.output.filter", newValue.toString());
    }

    @Deprecated
    public TaskStatusFilter getTaskOutputFilter() {
        return this.taskOutputFilter;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public int run(String[] argv) throws Exception {
        if (argv.length < 2) {
            System.out.println("JobClient -submit <job> | -status <id> | -events <id> | -kill <id> [-jt <jobtracker:port>|<config>]");
            System.exit(-1);
        }
        this.init();
        String submitJobFile = null;
        String jobid = null;
        boolean getStatus = false;
        boolean killJob = false;
        for (int i = 0; i < argv.length; ++i) {
            if ("-submit".equals(argv[i])) {
                submitJobFile = argv[i + 1];
                ++i;
                continue;
            }
            if ("-status".equals(argv[i])) {
                jobid = argv[i + 1];
                getStatus = true;
                ++i;
                continue;
            }
            if ("-kill".equals(argv[i])) {
                jobid = argv[i + 1];
                killJob = true;
                ++i;
                continue;
            }
            if (!"-events".equals(argv[i])) continue;
            this.listEvents(argv[i + 1], Integer.parseInt(argv[i + 2]), Integer.parseInt(argv[i + 3]));
            i += 3;
        }
        int exitCode = -1;
        try {
            if (submitJobFile != null) {
                RunningJob job = this.submitJob(submitJobFile);
                System.out.println("Created job " + job.getJobID());
            } else if (getStatus) {
                RunningJob job = this.getJob(jobid);
                if (job == null) {
                    System.out.println("Could not find job " + jobid);
                } else {
                    System.out.println();
                    System.out.println(job);
                    exitCode = 0;
                }
            } else if (killJob) {
                RunningJob job = this.getJob(jobid);
                if (job == null) {
                    System.out.println("Could not find job " + jobid);
                } else {
                    job.killJob();
                    System.out.println("Killed job " + jobid);
                    exitCode = 0;
                }
            }
        }
        finally {
            this.close();
        }
        return exitCode;
    }

    private void listEvents(String jobId, int fromEventId, int numEvents) throws IOException {
        TaskCompletionEvent[] events = this.jobSubmitClient.getTaskCompletionEvents(jobId, fromEventId, numEvents);
        System.out.println("Task completion events for " + jobId);
        System.out.println("Number of events (from " + fromEventId + ") are: " + events.length);
        for (TaskCompletionEvent event : events) {
            System.out.println((Object)((Object)event.getTaskStatus()) + " " + event.getTaskId() + " " + event.getTaskTrackerHttp());
        }
    }

    public static void main(String[] argv) throws Exception {
        int res = new JobClient().doMain(new Configuration(), argv);
        System.exit(res);
    }

    static class RawSplit
    implements Writable {
        private String splitClass;
        private BytesWritable bytes = new BytesWritable();
        private String[] locations;

        RawSplit() {
        }

        public void setBytes(byte[] data, int offset, int length) {
            this.bytes.set(data, offset, length);
        }

        public void setClassName(String className) {
            this.splitClass = className;
        }

        public String getClassName() {
            return this.splitClass;
        }

        public BytesWritable getBytes() {
            return this.bytes;
        }

        public void setLocations(String[] locations) {
            this.locations = locations;
        }

        public String[] getLocations() {
            return this.locations;
        }

        public void readFields(DataInput in) throws IOException {
            this.splitClass = Text.readString(in);
            this.bytes.readFields(in);
            int len = WritableUtils.readVInt(in);
            this.locations = new String[len];
            for (int i = 0; i < len; ++i) {
                this.locations[i] = Text.readString(in);
            }
        }

        public void write(DataOutput out) throws IOException {
            Text.writeString(out, this.splitClass);
            this.bytes.write(out);
            WritableUtils.writeVInt(out, this.locations.length);
            for (int i = 0; i < this.locations.length; ++i) {
                Text.writeString(out, this.locations[i]);
            }
        }
    }

    class NetworkedJob
    implements RunningJob {
        JobProfile profile;
        JobStatus status;
        long statustime;

        public NetworkedJob(JobStatus job) throws IOException {
            this.status = job;
            this.profile = JobClient.this.jobSubmitClient.getJobProfile(job.getJobId());
            this.statustime = System.currentTimeMillis();
        }

        synchronized void ensureFreshStatus() throws IOException {
            if (System.currentTimeMillis() - this.statustime > MAX_JOBPROFILE_AGE) {
                this.status = JobClient.this.jobSubmitClient.getJobStatus(this.profile.getJobId());
                this.statustime = System.currentTimeMillis();
            }
        }

        public String getJobID() {
            return this.profile.getJobId();
        }

        public String getJobFile() {
            return this.profile.getJobFile();
        }

        public String getTrackingURL() {
            return this.profile.getURL().toString();
        }

        public float mapProgress() throws IOException {
            this.ensureFreshStatus();
            return this.status.mapProgress();
        }

        public float reduceProgress() throws IOException {
            this.ensureFreshStatus();
            return this.status.reduceProgress();
        }

        public synchronized boolean isComplete() throws IOException {
            this.ensureFreshStatus();
            return this.status.getRunState() == 2 || this.status.getRunState() == 3;
        }

        public synchronized boolean isSuccessful() throws IOException {
            this.ensureFreshStatus();
            return this.status.getRunState() == 2;
        }

        public void waitForCompletion() throws IOException {
            while (!this.isComplete()) {
                try {
                    Thread.sleep(5000L);
                }
                catch (InterruptedException interruptedException) {}
            }
        }

        public synchronized void killJob() throws IOException {
            JobClient.this.jobSubmitClient.killJob(this.getJobID());
        }

        public synchronized TaskCompletionEvent[] getTaskCompletionEvents(int startFrom) throws IOException {
            return JobClient.this.jobSubmitClient.getTaskCompletionEvents(this.getJobID(), startFrom, 10);
        }

        public String toString() {
            try {
                this.ensureFreshStatus();
            }
            catch (IOException iOException) {
                // empty catch block
            }
            return "Job: " + this.profile.getJobId() + "\n" + "file: " + this.profile.getJobFile() + "\n" + "tracking URL: " + this.profile.getURL() + "\n" + "map() completion: " + this.status.mapProgress() + "\n" + "reduce() completion: " + this.status.reduceProgress();
        }

        public Counters getCounters() throws IOException {
            return JobClient.this.jobSubmitClient.getJobCounters(this.getJobID());
        }
    }

    /*
     * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
     */
    public static enum TaskStatusFilter {
        NONE,
        FAILED,
        SUCCEEDED,
        ALL;

    }
}

