/*
 * Decompiled with CFR 0.152.
 */
package com.eucalyptus.util.async;

import com.eucalyptus.component.ServiceConfiguration;
import com.eucalyptus.component.Topology;
import com.eucalyptus.http.MappingHttpRequest;
import com.eucalyptus.http.MappingHttpResponse;
import com.eucalyptus.records.EventClass;
import com.eucalyptus.records.EventRecord;
import com.eucalyptus.records.EventType;
import com.eucalyptus.records.Logs;
import com.eucalyptus.util.Exceptions;
import com.eucalyptus.util.async.AsyncRequest;
import com.eucalyptus.util.async.CheckedListenableFuture;
import com.eucalyptus.util.async.ConnectionException;
import com.eucalyptus.util.async.FailedRequestException;
import com.eucalyptus.util.async.NoResponseException;
import com.eucalyptus.util.async.RequestHandler;
import com.eucalyptus.util.async.RetryableConnectionException;
import com.eucalyptus.util.async.UnknownMessageTypeException;
import com.eucalyptus.ws.EucalyptusRemoteFault;
import com.eucalyptus.ws.WebServices;
import edu.ucsb.eucalyptus.msgs.BaseMessage;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.handler.codec.http.HttpMethod;
import org.jboss.netty.handler.codec.http.HttpRequest;
import org.jboss.netty.handler.codec.http.HttpVersion;

public class AsyncRequestHandler<Q extends BaseMessage, R extends BaseMessage>
implements RequestHandler<Q, R> {
    private static Logger LOG = Logger.getLogger(AsyncRequestHandler.class);
    private final AsyncRequest<Q, R> parent;
    private ClientBootstrap clientBootstrap;
    private ChannelFuture connectFuture;
    private final AtomicBoolean writeComplete = new AtomicBoolean(false);
    private final CheckedListenableFuture<R> response;
    private transient AtomicReference<Q> request = new AtomicReference<Object>(null);

    AsyncRequestHandler(AsyncRequest<Q, R> parent, CheckedListenableFuture<R> response) {
        this.parent = parent;
        this.response = response;
    }

    @Override
    public boolean fire(final ServiceConfiguration config, Q request) {
        if (!this.request.compareAndSet(null, request)) {
            LOG.warn((Object)("Duplicate write attempt for request: " + ((BaseMessage)this.request.get()).getClass().getSimpleName()));
            return false;
        }
        InetSocketAddress serviceSocketAddress = config.getSocketAddress();
        final ChannelPipelineFactory factory = config.getComponentId().getClientPipeline();
        try {
            this.clientBootstrap = WebServices.clientBootstrap(new ChannelPipelineFactory(){

                public ChannelPipeline getPipeline() throws Exception {
                    ChannelPipeline pipeline = factory.getPipeline();
                    pipeline.addLast("request-handler", (ChannelHandler)AsyncRequestHandler.this);
                    return pipeline;
                }
            });
            Logs.extreme().debug((Object)EventRecord.here(request.getClass(), EventClass.SYSTEM_REQUEST, EventType.CHANNEL_OPENING, request.getClass().getSimpleName(), ((BaseMessage)request).getCorrelationId(), ((Object)serviceSocketAddress).toString()));
            this.connectFuture = this.clientBootstrap.connect((SocketAddress)serviceSocketAddress);
            MappingHttpRequest httpRequest = new MappingHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.POST, config, this.request.get());
            this.connectFuture.addListener(new ChannelFutureListener((BaseMessage)request, serviceSocketAddress, httpRequest){
                final /* synthetic */ BaseMessage val$request;
                final /* synthetic */ SocketAddress val$serviceSocketAddress;
                final /* synthetic */ HttpRequest val$httpRequest;
                {
                    this.val$request = baseMessage;
                    this.val$serviceSocketAddress = socketAddress;
                    this.val$httpRequest = httpRequest;
                }

                public void operationComplete(ChannelFuture future) throws Exception {
                    try {
                        if (future.isSuccess()) {
                            Logs.extreme().debug((Object)("Connected as: " + future.getChannel().getLocalAddress()));
                            InetAddress localAddr = ((InetSocketAddress)future.getChannel().getLocalAddress()).getAddress();
                            if (!factory.getClass().getSimpleName().startsWith("GatherLog")) {
                                Topology.populateServices(config, (BaseMessage)AsyncRequestHandler.this.request.get());
                            }
                            Logs.extreme().debug((Object)EventRecord.here(this.val$request.getClass(), EventClass.SYSTEM_REQUEST, EventType.CHANNEL_OPEN, this.val$request.getClass().getSimpleName(), this.val$request.getCorrelationId(), this.val$serviceSocketAddress.toString(), "" + future.getChannel().getLocalAddress(), "" + future.getChannel().getRemoteAddress()));
                            Logs.extreme().debug((Object)this.val$httpRequest);
                            future.getChannel().write((Object)this.val$httpRequest).addListener(new ChannelFutureListener(){

                                public void operationComplete(ChannelFuture future) throws Exception {
                                    AsyncRequestHandler.this.writeComplete.set(true);
                                    Logs.extreme().debug((Object)EventRecord.here(val$request.getClass(), EventClass.SYSTEM_REQUEST, EventType.CHANNEL_WRITE, val$request.getClass().getSimpleName(), val$request.getCorrelationId(), val$serviceSocketAddress.toString(), "" + future.getChannel().getLocalAddress(), "" + future.getChannel().getRemoteAddress()));
                                }
                            });
                        } else {
                            AsyncRequestHandler.this.teardown(future.getCause());
                        }
                    }
                    catch (Exception ex) {
                        LOG.error((Object)ex, (Throwable)ex);
                        AsyncRequestHandler.this.teardown(ex);
                    }
                }
            });
            return true;
        }
        catch (Exception t) {
            LOG.error((Object)t, (Throwable)t);
            this.teardown(t);
            return false;
        }
    }

    private void teardown(Throwable t) {
        if (t == null) {
            t = new NullPointerException("teardown() called with null argument.");
        }
        this.logRequestFailure(t);
        this.response.setException(t);
        if (this.connectFuture != null) {
            this.maybeCloseChannel();
        }
    }

    private void maybeCloseChannel() {
        Channel channel;
        if (this.connectFuture.isDone() && this.connectFuture.isSuccess()) {
            Channel channel2 = this.connectFuture.getChannel();
            if (channel2 != null && channel2.isOpen()) {
                channel2.close().addListener(new ChannelFutureListener(){

                    public void operationComplete(ChannelFuture future) throws Exception {
                        EventRecord.here(((BaseMessage)AsyncRequestHandler.this.request.get()).getClass(), EventClass.SYSTEM_REQUEST, EventType.CHANNEL_CLOSED, new String[0]).trace();
                    }
                });
            } else {
                EventRecord.here(((BaseMessage)this.request.get()).getClass(), EventClass.SYSTEM_REQUEST, EventType.CHANNEL_CLOSED, "ALREADY_CLOSED").trace();
            }
        } else if (!this.connectFuture.isDone() && !this.connectFuture.cancel()) {
            LOG.error((Object)("Failed to cancel in-flight connection request: " + this.connectFuture.toString()));
            Channel channel3 = this.connectFuture.getChannel();
            if (channel3 != null) {
                channel3.close();
            }
        } else if (!this.connectFuture.isSuccess() && (channel = this.connectFuture.getChannel()) != null) {
            channel.close();
        }
    }

    private void logRequestFailure(Throwable t) {
        try {
            Logs.extreme().debug((Object)("RESULT:" + t.getMessage() + ":REQUEST:" + (this.request.get() != null ? ((BaseMessage)this.request.get()).getClass() : "REQUEST IS NULL")));
            if (Exceptions.isCausedBy(t, RetryableConnectionException.class) || Exceptions.isCausedBy(t, ConnectionException.class) || Exceptions.isCausedBy(t, IOException.class)) {
                Logs.extreme().debug((Object)("Failed request: " + ((BaseMessage)this.request.get()).toSimpleString() + " because of: " + t.getMessage()), t);
            }
        }
        catch (Exception ex) {
            Logs.extreme().error((Object)ex, (Throwable)ex);
        }
    }

    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
        if (e instanceof MessageEvent) {
            this.messageReceived(ctx, (MessageEvent)e);
        } else if (e instanceof ChannelStateEvent) {
            ChannelStateEvent evt = (ChannelStateEvent)e;
            switch (evt.getState()) {
                case OPEN: {
                    if (!Boolean.FALSE.equals(evt.getValue())) break;
                    this.checkFinished(ctx, evt);
                    break;
                }
                case CONNECTED: {
                    if (evt.getValue() != null) break;
                    this.checkFinished(ctx, evt);
                }
            }
        } else if (e instanceof ExceptionEvent) {
            this.exceptionCaught(ctx, (ExceptionEvent)e);
        }
        ctx.sendUpstream(e);
    }

    private void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
        try {
            if (e.getMessage() instanceof MappingHttpResponse) {
                MappingHttpResponse response = (MappingHttpResponse)e.getMessage();
                try {
                    BaseMessage msg = (BaseMessage)response.getMessage();
                    if (!msg.get_return().booleanValue()) {
                        this.teardown(new FailedRequestException("Cluster response includes _return=false", msg));
                    } else {
                        this.response.set(msg);
                    }
                    e.getFuture().addListener(ChannelFutureListener.CLOSE);
                }
                catch (Exception e1) {
                    LOG.error((Object)e1, (Throwable)e1);
                    this.teardown(e1);
                }
            } else if (e.getMessage() == null) {
                NoResponseException ex = new NoResponseException("Channel received a null response.", (BaseMessage)this.request.get());
                LOG.error((Object)ex, (Throwable)ex);
                this.teardown(ex);
            } else {
                UnknownMessageTypeException ex = new UnknownMessageTypeException("Channel received a unknown response type: " + e.getMessage().getClass().getCanonicalName(), (BaseMessage)this.request.get(), e.getMessage());
                LOG.error((Object)ex, (Throwable)ex);
                this.teardown(ex);
            }
        }
        catch (Exception t) {
            LOG.error((Object)t, (Throwable)t);
            this.teardown(t);
        }
    }

    private void checkFinished(ChannelHandlerContext ctx, ChannelStateEvent evt) {
        if (this.connectFuture != null && !this.connectFuture.isSuccess() && this.connectFuture.getCause() instanceof IOException) {
            Throwable ioError = this.connectFuture.getCause();
            if (!this.writeComplete.get()) {
                this.teardown(new RetryableConnectionException("Channel was closed before the write operation could be completed: " + ioError.getMessage(), ioError, (BaseMessage)this.request.get()));
            } else {
                this.teardown(new ConnectionException("Channel was closed before the response was received: " + ioError.getMessage(), ioError, (BaseMessage)this.request.get()));
            }
        } else if (!this.writeComplete.get()) {
            this.teardown(new RetryableConnectionException("Channel was closed before the write operation could be completed", (BaseMessage)this.request.get()));
        } else if (!this.response.isDone()) {
            this.teardown(new ConnectionException("Channel was closed before the response was received.", (BaseMessage)this.request.get()));
        } else {
            this.teardown((Throwable)new ChannelException("Channel was closed before connecting."));
        }
    }

    private void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
        Throwable cause = e.getCause();
        Logs.extreme().error((Object)e, cause);
        if (cause instanceof EucalyptusRemoteFault) {
            this.response.setException(cause);
            e.getFuture().addListener(ChannelFutureListener.CLOSE);
        } else {
            this.teardown(cause);
        }
    }

    public AtomicReference<Q> getRequest() {
        return this.request;
    }

    public CheckedListenableFuture<R> getResponse() {
        return this.response;
    }

    public AtomicBoolean getWriteComplete() {
        return this.writeComplete;
    }

    public ChannelFuture getConnectFuture() {
        return this.connectFuture;
    }

    public ClientBootstrap getClientBootstrap() {
        return this.clientBootstrap;
    }
}

