001    /* 
002    Copyright (c) 2003, The Regents of the University of California, through 
003    Lawrence Berkeley National Laboratory (subject to receipt of any required 
004    approvals from the U.S. Dept. of Energy).  All rights reserved.
005    */
006    package gov.lbl.dsd.sea;
007    
008    import gov.lbl.dsd.sea.event.ExceptionEvent;
009    
010    import java.util.Date;
011    import java.util.Timer;
012    import java.util.TimerTask;
013    
014    import EDU.oswego.cs.dl.util.concurrent.Executor;
015    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
016    import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
017    
018    /**
019     * A stage has an input queue into which another stage puts events; A stage has
020     * one or more threads, which take events from the input queue and hand them to
021     * the stage's event handler which processes the events in a non-blocking
022     * asynchronous manner.
023     * <p>
024     * Note on exception handling: If a {@link RuntimeException}is raised within
025     * {@link EventHandler#handle(Object)}of this stage's event handler, then this
026     * stage's ExceptionHandler is called, which should handle the exception in an
027     * appropriate application specific manner. If no ExceptionHandler is defined
028     * (because it is <code>null</code>) the exception is simply rethrown.
029     * 
030     * @author whoschek@lbl.gov
031     * @author $Author: gegles $
032     * @version $Revision: 1.18 $, $Date: 2004/09/16 16:57:15 $
033     */
034    public class Stage {
035    
036            private final String name; // the name of this stage
037            private final EventHandler handler; // user provided object handling events
038            private final ExceptionHandler exceptionHandler; // called when EventHandler.handle throw an exception
039            
040            private Timer timer; // queues and delivers future events for enqueue(Event, Date)  
041            private Executor executor; // thread handling policy
042            private boolean isStarted = false;
043            private ExecutorFactory executorFactory; // used to create a new executor when start is called
044            
045            private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(Stage.class);
046            
047            /** 
048             * Creates a new stage with the given name, event handler, executor factory and exceptionHandler.
049             */
050            public Stage(String stageName, EventHandler handler, ExecutorFactory executorFactory, ExceptionHandler exceptionHandler) {
051                    this.name = stageName;
052                    this.executorFactory = executorFactory;
053                    this.exceptionHandler = exceptionHandler;
054                    this.executor = null; // do not create an executor until start is being called
055                    this.timer = null; // do not start timer thread unless it is needed via enqueue(Event, Date)
056                    
057                    this.handler = handler;
058                    handler.setStage(this);
059            }
060    
061            /** 
062             * Returns the name of this stage. 
063             */
064            public String getName() {
065                    return this.name;
066            }
067            
068            /**
069             * Returns a string representation of the receiver.
070             */
071            public String toString() {
072                    return this.getName();
073            }
074            
075            /** 
076             * Initializes the stage and its event handler; Must be called before enqueuing events (otherwise enqueues are ignored); 
077             * This method can be called at any time but only has an effect only if the stage is stopped.
078             * 
079             * @return <code>this</code> (for convenient call chaining only)
080             */
081            public Stage start() {
082                    synchronized (this) {
083                            if (this.isStarted) {
084                                    log.warn("start: improper but mostly harmless state");
085                                    return this;
086                            }
087                            this.executor = this.executorFactory.createExecutor();
088                            this.isStarted = true;
089                    }
090    
091                    if (log.isDebugEnabled()) 
092                            log.debug(getName() + ": now initializing handler...");
093                    
094                    handler.onStart();
095                    
096                    if (log.isDebugEnabled()) 
097                            log.debug(getName() + ": sucessfullly initialized handler.");
098                    
099                    return this;
100            }
101            
102            /** 
103             * Cleans up this stage and its event handler, as well as related threads;
104             * Once a stage has been destroyed it can be initialized again by using start;
105             * This method can be called at any time but only has an effect if the stage is started. 
106             */
107            public void stop() {
108                    synchronized (this) {
109                            if (!this.isStarted) {
110                                    log.warn("stop: improper but mostly harmless state");
111                                    return;
112                            }
113                            this.isStarted = false;
114                    
115                            if (log.isDebugEnabled())
116                                    log.debug(getName() + ": now shutting down...");
117                    
118                            if (this.timer != null) {
119                                    this.timer.cancel(); // throws away scheduled future events
120                            }
121    
122                            if (this.executor instanceof QueuedExecutor) {
123                                    ((QueuedExecutor) this.executor).shutdownAfterProcessingCurrentlyQueuedTasks();
124                                    //((QueuedExecutor) this.executor).shutdownNow();
125                            }
126                    
127                            else if (this.executor instanceof PooledExecutor) {
128                                    boolean interruptAll = false;
129                                    PooledExecutor pooledExec = (PooledExecutor) this.executor;
130                                    //pooledExec.shutdownNow();
131                                    pooledExec.shutdownAfterProcessingCurrentlyQueuedTasks();
132                            
133                                    if (interruptAll) pooledExec.interruptAll();
134                                    try {
135                                            log.debug(getName() + ": awaiting termination...");
136                                            ((PooledExecutor) this.executor).awaitTerminationAfterShutdown();
137                                            log.debug(getName() + ": terminated with success.");
138                                    } catch (InterruptedException e) {}
139                            }
140                            this.executor = null;   
141                    }
142                    
143                    this.handler.onStop();
144                    
145                    if (log.isDebugEnabled())
146                            log.debug(getName() + ": successfully shut down.");
147            }
148            
149            /**
150             * Enqueues the given event onto this stage. 
151             */
152            public void enqueue(Object event) {
153                    try {
154                            this.enqueueInterruptable(event);
155                    } catch (InterruptedException e) {
156                            throw new RuntimeException(e);
157                    }
158            }
159            
160            /** 
161             * Schedules the given event to be enqueued onto this stage at the given (future) time. 
162             */
163            public void enqueue(final Object event, Date date) {
164                    if (log.isTraceEnabled())
165                            log.trace(getName() + ": scheduling event to be queued at future time=" + date + ", now=" + new Date() + ", event=" + event);
166    
167                    this.getTimer().schedule(
168                                    new TimerTask() { // anonymous inner class
169                                            public void run() {
170                                                    if (log.isTraceEnabled())
171                                                            log.trace(getName() + ": timer triggered; now queueing event=" + event);
172                                                    
173                                                    enqueue(event);
174                                            }
175                                    }
176                                    , date);
177            }
178    
179            /**
180             * Enqueues the given event onto this stage.
181             * <p>
182             * On an exception, we also log the stack trace of the thread enqueueing
183             * the event, which is VERY helpful for debugging purposes, because one can
184             * actually see the stack trace of the methods and thread that enqueued the
185             * event causing problems.
186             */
187            protected void enqueueInterruptable(final Object event) throws InterruptedException {
188                    synchronized (this) {
189                            if (!this.isStarted) {
190                                    log.warn(getName() + ": enqueue: improper but mostly harmless state");
191                                    return;
192                            }
193                    }
194                    
195                    // save stack trace of current thread for potential later use (TRICK)
196                    final RuntimeException stackTraceOfEnqueueingThread = new RuntimeException("Failure in EventHandler.handle(event)");
197    
198                    this.executor.execute(
199                            new Runnable() {
200                                    public void run() {
201                                            if (log.isTraceEnabled())
202                                                    log.trace("ENTER event handling [" + getName() + "], event=" + event);
203                                            
204                                            try {
205                                                    
206                                                    handler.handle(event);
207                                                    
208                                            } catch (RuntimeException e) {
209                                                    log.error("Oopsla: stackTraceOfEnqueueingThread: ", stackTraceOfEnqueueingThread);
210                                                    log.error("Oopsla: ", e instanceof ExceptionEvent ? e : new ExceptionEvent(e, event, Stage.this));
211                                                    if (exceptionHandler == null) {
212                                                            throw e;
213                                                    }
214                                                    else {
215                                                            exceptionHandler.onException(e, event, Stage.this);
216                                                    }
217                                            } catch (Error e) {
218                                                    log.error("Oopsla: stackTraceOfEnqueueingThread: ", stackTraceOfEnqueueingThread);
219                                                    log.error("Oopsla: ", new ExceptionEvent(e, event, Stage.this));
220                                                    throw e;
221                                            }
222                                            finally {
223                                                    if (log.isTraceEnabled())
224                                                            log.trace("EXIT event handling [" + getName() + "], event=" + event);
225                                            }
226                                    }
227                            }
228                    );
229            }
230    
231            protected synchronized Timer getTimer() {
232                    if (this.timer == null) this.timer = new Timer();
233                    return this.timer;
234            }
235            
236    }