/*
 * Decompiled with CFR 0.152.
 */
package com.ibm.streams.operator.window;

import com.ibm.streams.operator.state.Checkpoint;
import com.ibm.streams.operator.state.StateHandler;
import com.ibm.streams.operator.window.StreamWindow;
import com.ibm.streams.operator.window.StreamWindowEvent;
import com.ibm.streams.operator.window.StreamWindowListener;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Base class for window listeners that keep one state object of type {@code S}
 * per window partition.
 *
 * <p>The constructor registers an internal listener on the window. That listener
 * does per-partition bookkeeping around each event and forwards the event to this
 * object's own {@code handleEvent} (declared by {@link StreamWindowListener} and
 * left for subclasses to implement):
 * <ul>
 *   <li>{@code INSERTION}: the partition is recorded as having seen insertions
 *       before the event is forwarded.</li>
 *   <li>{@code EVICTION}: after forwarding, a tumbling window has the partition's
 *       state replaced by {@code getInitializedState(partition, currentState)} and
 *       its insertion flag cleared; any other window type has the partition
 *       recorded as having seen evictions.</li>
 *   <li>{@code TRIGGER}: after forwarding, the partition's insertion and eviction
 *       flags are cleared.</li>
 *   <li>{@code INITIAL_FULL}: the partition is recorded as having seen the initial
 *       full event, then the event is forwarded.</li>
 *   <li>{@code FINAL}: the event is forwarded with no bookkeeping.</li>
 *   <li>{@code PARTITION_EVICTION}: after forwarding, all bookkeeping and state
 *       for the partition is dropped.</li>
 * </ul>
 *
 * <p>The bookkeeping collections are written to and restored from checkpoints
 * through the {@link StateHandler} methods.
 *
 * @param <S> type of the per-partition state
 * @param <T> type of the objects held in the window
 */
public abstract class StatefulWindowListener<S, T>
implements StreamWindowListener<T>,
StateHandler {
    public static final String IBM_COPYRIGHT = " Licensed Materials-Property of IBM                               5724-Y95                                                         (C) Copyright IBM Corp.  2010, 2016    All Rights Reserved.      US Government Users Restricted Rights - Use, duplication or      disclosure restricted by GSA ADP Schedule Contract with          IBM Corp.                                                                                                                        ";

    // Not final: reset(Checkpoint) replaces these with the deserialized instances.
    /** State object per partition, created lazily by {@link #getPartitionState(Object)}. */
    private Map<Object, S> partionedStates = Collections.synchronizedMap(new HashMap<Object, S>());
    /** Partitions with an insertion since the flag was last cleared. */
    private Set<Object> insertsOccurred = Collections.synchronizedSet(new HashSet<Object>());
    /** Partitions with a (non-tumbling) eviction since the flag was last cleared. */
    private Set<Object> evictionsOccurred = Collections.synchronizedSet(new HashSet<Object>());
    /** Partitions for which an {@code INITIAL_FULL} event has been received. */
    private Set<Object> seenInitialFull = Collections.synchronizedSet(new HashSet<Object>());
    private final StreamWindow<T> window;

    /**
     * Creates the listener and registers its bookkeeping wrapper on {@code window}.
     *
     * @param window the window to listen to
     * @throws IllegalStateException if the window type is {@code NOT_WINDOWED}, or
     *         if {@link #supportsWindow()} returns {@code false}
     */
    protected StatefulWindowListener(final StreamWindow<T> window) {
        this.window = window;
        if (window.getType() == StreamWindow.Type.NOT_WINDOWED) {
            throw new IllegalStateException(this.getClass().getName() + " : " + window.getType());
        }
        if (!this.supportsWindow()) {
            throw new IllegalStateException(this.getClass().getName() + ".supportsWindow() == false, window: " + window);
        }
        window.registerListener(new StreamWindowListener<T>(){

            /**
             * Updates the per-partition bookkeeping for the event and forwards it
             * to the enclosing listener's {@code handleEvent}.
             */
            @Override
            public void handleEvent(StreamWindowEvent<T> event) throws Exception {
                // Always forward through the enclosing instance: an unqualified
                // handleEvent(...) call here would recurse into this wrapper.
                final StatefulWindowListener<S, T> outer = StatefulWindowListener.this;
                final Object partition = event.getPartition();
                switch (event.getType()) {
                    case INSERTION: {
                        outer.insertsOccurred.add(partition);
                        outer.handleEvent(event);
                        break;
                    }
                    case EVICTION: {
                        try {
                            outer.handleEvent(event);
                        }
                        finally {
                            // No return/break inside this finally block: leaving it
                            // abruptly would discard an exception thrown by
                            // handleEvent above.
                            if (window.getType() == StreamWindow.Type.TUMBLING) {
                                S current = outer.getPartitionState(partition);
                                outer.partionedStates.put(partition, outer.getInitializedState(partition, current));
                                outer.insertsOccurred.remove(partition);
                            } else {
                                outer.evictionsOccurred.add(partition);
                            }
                        }
                        break;
                    }
                    case TRIGGER: {
                        try {
                            outer.handleEvent(event);
                        }
                        finally {
                            outer.insertsOccurred.remove(partition);
                            outer.evictionsOccurred.remove(partition);
                        }
                        break;
                    }
                    case INITIAL_FULL: {
                        synchronized (outer) {
                            outer.seenInitialFull.add(partition);
                        }
                        outer.handleEvent(event);
                        break;
                    }
                    case FINAL: {
                        outer.handleEvent(event);
                        break;
                    }
                    case PARTITION_EVICTION: {
                        try {
                            outer.handleEvent(event);
                        }
                        finally {
                            outer.partionedStates.remove(partition);
                            outer.insertsOccurred.remove(partition);
                            outer.evictionsOccurred.remove(partition);
                            outer.seenInitialFull.remove(partition);
                        }
                        break;
                    }
                }
            }
        }, false);
    }

    /**
     * Returns the window passed to the constructor.
     *
     * @return the window this listener was created for
     */
    public final StreamWindow<T> getWindow() {
        return this.window;
    }

    /**
     * Reports whether this listener can work with the window returned by
     * {@link #getWindow()}. Called from the constructor after the window has been
     * assigned, so overrides must not rely on subclass fields being initialized.
     *
     * @return {@code true} in this base implementation
     */
    protected boolean supportsWindow() {
        return true;
    }

    /**
     * Produces the state to use for a partition.
     *
     * <p>Called with {@code null} as the second argument when a partition has no
     * state yet, and with the partition's current state after an eviction from a
     * tumbling window.
     *
     * @param var1 the partition
     * @param var2 the partition's current state, or {@code null} if it has none
     * @return the state to store for the partition
     */
    protected abstract S getInitializedState(Object var1, S var2);

    /**
     * Returns the state for a partition, creating and storing it through
     * {@code getInitializedState(partition, null)} when none is stored.
     *
     * @param partition the partition
     * @return the partition's state
     */
    protected final S getPartitionState(Object partition) {
        S state = this.partionedStates.get(partition);
        if (state == null) {
            state = this.getInitializedState(partition, null);
            this.partionedStates.put(partition, state);
        }
        return state;
    }

    /**
     * Reports whether the partition is currently flagged as having had insertions.
     *
     * @param partition the partition
     * @return {@code true} if the insertion flag is set for the partition
     */
    protected final boolean insertionsOccurred(Object partition) {
        return this.insertsOccurred.contains(partition);
    }

    /**
     * Reports whether the partition is currently flagged as having had evictions.
     *
     * @param partition the partition
     * @return {@code true} if the eviction flag is set for the partition
     */
    protected final boolean evictionsOccurred(Object partition) {
        return this.evictionsOccurred.contains(partition);
    }

    /**
     * Reports whether an {@code INITIAL_FULL} event has been recorded for the
     * partition.
     *
     * @param partition the partition
     * @return {@code true} if the initial-full flag is set for the partition
     */
    protected boolean seenInitialFull(Object partition) {
        return this.seenInitialFull.contains(partition);
    }

    /** No-op in this base class. */
    @Override
    public void close() throws IOException {
    }

    /** No-op in this base class. */
    @Override
    public void drain() throws Exception {
    }

    /**
     * Writes the bookkeeping collections to the checkpoint. The write order must
     * match the read order in {@link #reset(Checkpoint)}.
     *
     * @param checkpoint the checkpoint to write to
     */
    @Override
    public void checkpoint(Checkpoint checkpoint) throws Exception {
        ObjectOutputStream ckptOut = checkpoint.getOutputStream();
        ckptOut.writeObject(this.evictionsOccurred);
        ckptOut.writeObject(this.insertsOccurred);
        ckptOut.writeObject(this.partionedStates);
        ckptOut.writeObject(this.seenInitialFull);
    }

    /** Clears all per-partition state and bookkeeping. */
    @Override
    public void resetToInitialState() throws Exception {
        this.evictionsOccurred.clear();
        this.insertsOccurred.clear();
        this.partionedStates.clear();
        this.seenInitialFull.clear();
    }

    /**
     * Replaces the bookkeeping collections with the ones read from the checkpoint,
     * in the order {@link #checkpoint(Checkpoint)} wrote them.
     *
     * @param checkpoint the checkpoint to read from
     */
    @Override
    @SuppressWarnings("unchecked") // the stream holds exactly what checkpoint(Checkpoint) wrote
    public void reset(Checkpoint checkpoint) throws Exception {
        ObjectInputStream ckptIn = checkpoint.getInputStream();
        this.evictionsOccurred = (Set<Object>) ckptIn.readObject();
        this.insertsOccurred = (Set<Object>) ckptIn.readObject();
        this.partionedStates = (Map<Object, S>) ckptIn.readObject();
        this.seenInitialFull = (Set<Object>) ckptIn.readObject();
    }

    /** No-op in this base class. */
    @Override
    public void retireCheckpoint(long id) throws Exception {
    }

    /** No-op in this base class. */
    @Override
    public void prepareForNonBlockingCheckpoint(long id) throws Exception {
    }

    /** No-op in this base class. */
    @Override
    public void regionCheckpointed(long id) throws Exception {
    }
}

