/*
 * Decompiled with CFR 0.152.
 */
package com.eucalyptus.simpleworkflow.stateful;

import com.eucalyptus.bootstrap.Bootstrap;
import com.eucalyptus.component.Topology;
import com.eucalyptus.component.annotation.ComponentNamed;
import com.eucalyptus.context.Context;
import com.eucalyptus.context.Contexts;
import com.eucalyptus.context.NoSuchContextException;
import com.eucalyptus.event.ClockTick;
import com.eucalyptus.event.EventListener;
import com.eucalyptus.event.Listeners;
import com.eucalyptus.simpleworkflow.stateful.NotifyResponseType;
import com.eucalyptus.simpleworkflow.stateful.NotifyType;
import com.eucalyptus.simpleworkflow.stateful.PollForNotificationResponseType;
import com.eucalyptus.simpleworkflow.stateful.PollForNotificationType;
import com.eucalyptus.simpleworkflow.stateful.PolledNotificationChecker;
import com.eucalyptus.simpleworkflow.stateful.PolledNotificationCheckerDiscovery;
import com.eucalyptus.simpleworkflow.stateful.PolledNotifications;
import com.eucalyptus.util.EucalyptusCloudException;
import com.eucalyptus.util.async.CheckedListenableFuture;
import com.eucalyptus.util.async.Futures;
import com.google.common.base.Objects;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
import com.google.common.collect.Lists;
import edu.ucsb.eucalyptus.msgs.BaseMessage;
import java.util.ArrayList;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
import org.jboss.netty.channel.local.LocalChannel;

@ComponentNamed
public class PolledNotificationService {
    private static final Logger logger = Logger.getLogger(PolledNotificationService.class);
    private static final ConcurrentMap<String, Pollers> pollersByChannel = new ConcurrentHashMap<String, Pollers>();
    private static final ConcurrentMap<String, PendingNotification> pendingNotificationsByChannel = new ConcurrentHashMap<String, PendingNotification>();
    private static final PolledNotificationChecker checker = new PolledNotificationChecker(){

        public boolean apply(String channel) {
            return Predicates.or((Iterable)((Iterable)PolledNotificationCheckerDiscovery.supplier().get())).apply((Object)channel);
        }
    };

    public NotifyResponseType submitNotify(NotifyType notify) throws EucalyptusCloudException {
        NotifyResponseType response = (NotifyResponseType)notify.getReply();
        Context context = Contexts.lookup();
        if (context.hasAdministrativePrivileges() && !PolledNotificationService.notifyPollers(notify.getChannel(), notify.getDetails())) {
            pendingNotificationsByChannel.put(notify.getChannel(), new PendingNotification(notify.getChannel(), notify.getDetails()));
        }
        return response;
    }

    public PollForNotificationResponseType pollForNotification(PollForNotificationType poll) throws EucalyptusCloudException {
        Context context = Contexts.lookup();
        if (context.hasAdministrativePrivileges()) {
            Future<PollForNotificationResponseType> response = PolledNotificationService.addPoller(poll.getChannel(), new Poller(poll.getChannel(), poll.getCorrelationId()));
            PolledNotificationService.checkNotify(poll.getChannel());
            try {
                return response.get();
            }
            catch (Exception e) {
                return (PollForNotificationResponseType)poll.getReply();
            }
        }
        return (PollForNotificationResponseType)poll.getReply();
    }

    private static void checkNotify(String channel) {
        PendingNotification pendingNotification = (PendingNotification)pendingNotificationsByChannel.remove(channel);
        if (pendingNotification != null && !pendingNotification.isExpired(System.currentTimeMillis())) {
            PolledNotificationService.notifyPollers(channel, pendingNotification.getDetails());
        } else if (checker.apply(channel)) {
            PolledNotificationService.notifyPollers(channel, null);
        }
    }

    private static boolean notifyPollers(String channel, String details) {
        return PolledNotificationService.getPollers(channel).notifyPollers(details);
    }

    private static Future<PollForNotificationResponseType> addPoller(String channel, Poller poller) {
        PolledNotificationService.getPollers(channel).addPoller(poller);
        return poller.getFuture();
    }

    private static Pollers getPollers(String channel) {
        Pollers pollers = (Pollers)pollersByChannel.get(channel);
        if (pollers != null) {
            pollers.touch();
            pollers = (Pollers)pollersByChannel.get(channel);
        }
        if (pollers == null) {
            pollersByChannel.putIfAbsent(channel, new Pollers(channel));
            pollers = (Pollers)pollersByChannel.get(channel);
        }
        return pollers;
    }

    static void evacuate() {
        PolledNotificationService.timeoutPollers(Long.MAX_VALUE);
    }

    private static void periodicWork() {
        long time = System.currentTimeMillis();
        PolledNotificationService.timeoutPollers(time);
        PolledNotificationService.timeoutPollerMetadata(time);
        PolledNotificationService.timeoutPendingNotifications(time);
    }

    private static void timeoutPollers(long time) {
        for (Pollers pollers : pollersByChannel.values()) {
            pollers.notifyExpiredPollers(time);
        }
    }

    private static void timeoutPollerMetadata(long time) {
        for (Pollers pollers : pollersByChannel.values()) {
            if (!pollers.isEmpty() || !pollers.isExpired(time)) continue;
            pollersByChannel.remove(pollers.getChannel(), pollers);
        }
    }

    private static void timeoutPendingNotifications(long time) {
        for (PendingNotification pendingNotification : pendingNotificationsByChannel.values()) {
            if (!pendingNotification.isExpired(time)) continue;
            pendingNotificationsByChannel.remove(pendingNotification.getChannel(), pendingNotification);
        }
    }

    public static class PollerClockTickEventListener
    implements EventListener<ClockTick> {
        public static void register() {
            Listeners.register(ClockTick.class, (EventListener)new PollerClockTickEventListener());
        }

        public void fireEvent(ClockTick event) {
            if (Bootstrap.isOperational().booleanValue()) {
                if (!Topology.isEnabledLocally(PolledNotifications.class)) {
                    PolledNotificationService.evacuate();
                }
                PolledNotificationService.periodicWork();
            }
        }
    }

    private static final class Poller {
        private static final long EXPIRY_MILLIS = TimeUnit.SECONDS.toMillis(30L);
        private final long timestamp = System.currentTimeMillis();
        private final String channel;
        private final String correlationId;
        private final CheckedListenableFuture<PollForNotificationResponseType> future;

        private Poller(String channel, String correlationId) {
            this.channel = channel;
            this.correlationId = correlationId;
            this.future = Futures.newGenericeFuture();
            try {
                if (!(Contexts.lookup((String)correlationId).getChannel() instanceof LocalChannel)) {
                    this.future.set(null);
                }
            }
            catch (NoSuchContextException noSuchContextException) {
                // empty catch block
            }
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String getChannel() {
            return this.channel;
        }

        public String getCorrelationId() {
            return this.correlationId;
        }

        public Future<PollForNotificationResponseType> getFuture() {
            return this.future;
        }

        public void response(PollForNotificationResponseType response) {
            if (!this.future.isDone()) {
                this.future.set((Object)response);
            } else {
                Contexts.response((BaseMessage)response);
            }
        }

        public boolean isExpired(long time) {
            return this.timestamp + EXPIRY_MILLIS < time;
        }

        public String toString() {
            return Objects.toStringHelper((Object)this).add("channel", (Object)this.getChannel()).add("correlationId", (Object)this.getCorrelationId()).toString();
        }
    }

    private static final class PendingNotification {
        private static final long EXPIRY_MILLIS = TimeUnit.MINUTES.toMillis(1L);
        private final long timestamp = System.currentTimeMillis();
        private final String channel;
        private final String details;

        private PendingNotification(String channel, String details) {
            this.channel = channel;
            this.details = details;
        }

        public long getTimestamp() {
            return this.timestamp;
        }

        public String getChannel() {
            return this.channel;
        }

        public String getDetails() {
            return this.details;
        }

        public boolean isExpired(long time) {
            return this.timestamp + EXPIRY_MILLIS < time;
        }

        public String toString() {
            return Objects.toStringHelper((Object)this).add("channel", (Object)this.getChannel()).add("details", (Object)this.getDetails()).add("timestamp", this.getTimestamp()).toString();
        }
    }

    private static final class Pollers {
        private static final long EXPIRY_MILLIS = TimeUnit.MINUTES.toMillis(2L);
        private final AtomicLong timestamp = new AtomicLong(System.currentTimeMillis());
        private final BlockingQueue<Poller> pollersQueue = new LinkedBlockingDeque<Poller>();
        private final String channel;

        private Pollers(String channel) {
            this.channel = channel;
        }

        public String getChannel() {
            return this.channel;
        }

        public void addPoller(Poller poller) {
            this.touch();
            this.pollersQueue.add(poller);
        }

        public boolean notifyPollers(final String details) {
            return this.notifyPollers(this.queuedPollers(), new Predicate<PollForNotificationResponseType>(){

                public boolean apply(PollForNotificationResponseType response) {
                    response.setNotified(Boolean.valueOf(true));
                    response.setDetails(details);
                    return true;
                }
            });
        }

        public void notifyExpiredPollers(long time) {
            this.notifyPollers(this.expiredPollers(time), new Predicate<PollForNotificationResponseType>(){

                public boolean apply(PollForNotificationResponseType response) {
                    response.setNotified(Boolean.valueOf(false));
                    return true;
                }
            });
        }

        public boolean isEmpty() {
            return this.pollersQueue.isEmpty();
        }

        public boolean isExpired(long time) {
            return this.timestamp.get() + EXPIRY_MILLIS < time;
        }

        private boolean notifyPollers(Iterable<Poller> pollers, Predicate<PollForNotificationResponseType> responsePredicate) {
            this.touch();
            boolean notified = false;
            for (Poller poller : pollers) {
                try {
                    PollForNotificationResponseType response = new PollForNotificationResponseType();
                    response.setCorrelationId(poller.getCorrelationId());
                    if (!responsePredicate.apply((Object)response)) continue;
                    poller.response(response);
                    notified = true;
                }
                catch (Exception e) {
                    logger.error((Object)("Error notifying poller " + poller), (Throwable)e);
                }
            }
            return notified;
        }

        private Iterable<Poller> queuedPollers() {
            ArrayList pollers = Lists.newArrayList();
            this.pollersQueue.drainTo(pollers);
            return pollers;
        }

        private Iterable<Poller> expiredPollers(long time) {
            ArrayList pollers = Lists.newArrayList();
            for (Poller poller : this.pollersQueue) {
                if (!poller.isExpired(time) || !this.pollersQueue.remove(poller)) continue;
                pollers.add(poller);
            }
            return pollers;
        }

        private void touch() {
            this.timestamp.set(System.currentTimeMillis());
        }

        public String toString() {
            return Objects.toStringHelper((Object)this).add("channel", (Object)this.getChannel()).add("pollers", this.pollersQueue).add("timestamp", this.timestamp.get()).toString();
        }
    }
}

