/*
 * Decompiled with CFR 0.152.
 */
package com.eucalyptus.util;

import com.eucalyptus.configurable.ConfigurableClass;
import com.eucalyptus.configurable.ConfigurableField;
import com.eucalyptus.util.EucalyptusCloudException;
import java.io.IOException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBufferInputStream;

@ConfigurableClass(root="objectstorage", description="Streaming upload channel configuration.")
public class ChannelBufferStreamingInputStream
extends ChannelBufferInputStream {
    private ChannelBuffer b;
    private LinkedBlockingQueue<ChannelBuffer> buffers = new LinkedBlockingQueue(QUEUE_SIZE);
    private int bytesRead;
    @ConfigurableField(description="Channel buffer queue size for uploads", displayName="objectstorage.uploadqueuesize")
    public static int QUEUE_SIZE = 128;
    @ConfigurableField(description="Channel buffer queue timeout (in seconds)", displayName="objectstorage.uploadqueuetimeout")
    public static int QUEUE_TIMEOUT = 1;
    private static final Logger LOG = Logger.getLogger(ChannelBufferStreamingInputStream.class);

    public boolean markSupported() {
        return super.markSupported();
    }

    public int available() throws IOException {
        if (this.b == null) {
            return 0;
        }
        int currentlyAvailable = this.b.readableBytes();
        if (currentlyAvailable <= 0) {
            int retries = 0;
            do {
                try {
                    this.b = this.buffers.poll(QUEUE_TIMEOUT, TimeUnit.SECONDS);
                    currentlyAvailable += this.b.readableBytes();
                }
                catch (InterruptedException e) {
                    LOG.error((Object)e, (Throwable)e);
                    return currentlyAvailable;
                }
            } while (this.b == null && retries++ < 60);
        }
        return currentlyAvailable;
    }

    public void mark(int readlimit) {
        super.mark(readlimit);
    }

    public synchronized int read(byte[] bytes, int off, int len) throws IOException {
        if (len > 0) {
            if (this.b == null) {
                return -1;
            }
            if (off < 0) {
                throw new IOException("Invalid offset: " + off);
            }
            if (off + len > bytes.length) {
                throw new IOException("Byte buffer is too small. Should be at least: " + (off + len));
            }
            int readSoFar = 0;
            int readable = 0;
            int toReadFromThisBuffer = 0;
            while (len > 0) {
                readable = this.b.readableBytes();
                toReadFromThisBuffer = 0;
                if (readable > 0) {
                    toReadFromThisBuffer = len > readable ? readable : len;
                    this.b.readBytes(bytes, off, toReadFromThisBuffer);
                    len -= toReadFromThisBuffer;
                    off += (readSoFar += toReadFromThisBuffer);
                    continue;
                }
                try {
                    int retries = 0;
                    do {
                        this.b = this.buffers.poll(QUEUE_TIMEOUT, TimeUnit.SECONDS);
                    } while (this.b == null && retries++ < 60);
                    if (this.b != null) continue;
                    LOG.error((Object)"No more data in this stream");
                    this.bytesRead += readSoFar;
                    return readSoFar;
                }
                catch (InterruptedException e) {
                    LOG.error((Object)e, (Throwable)e);
                    this.bytesRead += readSoFar;
                    return readSoFar;
                }
            }
            this.bytesRead += readSoFar;
            return readSoFar;
        }
        return 0;
    }

    public int read() throws IOException {
        return super.read();
    }

    public int readBytes() {
        return this.bytesRead;
    }

    public void reset() throws IOException {
        super.reset();
    }

    public long skip(long n) throws IOException {
        return super.skip(n);
    }

    public ChannelBufferStreamingInputStream(ChannelBuffer buffer) {
        super(buffer);
        this.b = buffer;
        this.bytesRead = 0;
        try {
            boolean success = false;
            int retries = 0;
            while (!success && retries++ < QUEUE_TIMEOUT) {
                success = this.buffers.offer(buffer, QUEUE_TIMEOUT, TimeUnit.SECONDS);
            }
            if (!success) {
                LOG.error((Object)"Timed out writing data to stream.");
            }
        }
        catch (InterruptedException e) {
            LOG.error((Object)e, (Throwable)e);
        }
    }

    public void putChunk(ChannelBuffer input) throws InterruptedException, EucalyptusCloudException {
        boolean success = false;
        int retries = 0;
        while (!success && retries++ < QUEUE_TIMEOUT) {
            success = this.buffers.offer(input, QUEUE_TIMEOUT, TimeUnit.SECONDS);
        }
        if (!success) {
            LOG.error((Object)"Timed out writing data to stream.");
            throw new EucalyptusCloudException("Aborting upload, could not process data in time. Either increase the upload queue size or retry the upload later.");
        }
    }

    public void close() throws IOException {
        LOG.trace((Object)("Closing Channel Stream: " + this.buffers.remainingCapacity() + " " + this.buffers.size()));
        super.close();
    }
}

