001 /* 002 Copyright (c) 2003, The Regents of the University of California, through 003 Lawrence Berkeley National Laboratory (subject to receipt of any required 004 approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea; 007 008 import gov.lbl.dsd.sea.event.ExceptionEvent; 009 010 import java.util.Date; 011 import java.util.Timer; 012 import java.util.TimerTask; 013 014 import EDU.oswego.cs.dl.util.concurrent.Executor; 015 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 016 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor; 017 018 /** 019 * A stage has an input queue into which another stage puts events; A stage has 020 * one or more threads, which take events from the input queue and hand them to 021 * the stage's event handler which processes the events in a non-blocking 022 * asynchronous manner. 023 * <p> 024 * Note on exception handling: If a {@link RuntimeException}is raised within 025 * {@link EventHandler#handle(Object)}of this stage's event handler, then this 026 * stage's ExceptionHandler is called, which should handle the exception in an 027 * appropriate application specific manner. If no ExceptionHandler is defined 028 * (because it is <code>null</code>) the exception is simply rethrown. 029 * 030 * @author whoschek@lbl.gov 031 * @author $Author: gegles $ 032 * @version $Revision: 1.18 $, $Date: 2004/09/16 16:57:15 $ 033 */ 034 public class Stage { 035 036 private final String name; // the name of this stage 037 private final EventHandler handler; // user provided object handling events 038 private final ExceptionHandler exceptionHandler; // called when EventHandler.handle throw an exception 039 040 private Timer timer; // queues and delivers future events for enqueue(Event, Date) 041 private Executor executor; // thread handling policy 042 private boolean isStarted = false; 043 private ExecutorFactory executorFactory; // used to create a new executor when start is called 044 045 private static final org.apache.commons.logging.Log log = org.apache.commons.logging.LogFactory.getLog(Stage.class); 046 047 /** 048 * Creates a new stage with the given name, event handler, executor factory and exceptionHandler. 049 */ 050 public Stage(String stageName, EventHandler handler, ExecutorFactory executorFactory, ExceptionHandler exceptionHandler) { 051 this.name = stageName; 052 this.executorFactory = executorFactory; 053 this.exceptionHandler = exceptionHandler; 054 this.executor = null; // do not create an executor until start is being called 055 this.timer = null; // do not start timer thread unless it is needed via enqueue(Event, Date) 056 057 this.handler = handler; 058 handler.setStage(this); 059 } 060 061 /** 062 * Returns the name of this stage. 063 */ 064 public String getName() { 065 return this.name; 066 } 067 068 /** 069 * Returns a string representation of the receiver. 070 */ 071 public String toString() { 072 return this.getName(); 073 } 074 075 /** 076 * Initializes the stage and its event handler; Must be called before enqueuing events (otherwise enqueues are ignored); 077 * This method can be called at any time but only has an effect only if the stage is stopped. 078 * 079 * @return <code>this</code> (for convenient call chaining only) 080 */ 081 public Stage start() { 082 synchronized (this) { 083 if (this.isStarted) { 084 log.warn("start: improper but mostly harmless state"); 085 return this; 086 } 087 this.executor = this.executorFactory.createExecutor(); 088 this.isStarted = true; 089 } 090 091 if (log.isDebugEnabled()) 092 log.debug(getName() + ": now initializing handler..."); 093 094 handler.onStart(); 095 096 if (log.isDebugEnabled()) 097 log.debug(getName() + ": sucessfullly initialized handler."); 098 099 return this; 100 } 101 102 /** 103 * Cleans up this stage and its event handler, as well as related threads; 104 * Once a stage has been destroyed it can be initialized again by using start; 105 * This method can be called at any time but only has an effect if the stage is started. 106 */ 107 public void stop() { 108 synchronized (this) { 109 if (!this.isStarted) { 110 log.warn("stop: improper but mostly harmless state"); 111 return; 112 } 113 this.isStarted = false; 114 115 if (log.isDebugEnabled()) 116 log.debug(getName() + ": now shutting down..."); 117 118 if (this.timer != null) { 119 this.timer.cancel(); // throws away scheduled future events 120 } 121 122 if (this.executor instanceof QueuedExecutor) { 123 ((QueuedExecutor) this.executor).shutdownAfterProcessingCurrentlyQueuedTasks(); 124 //((QueuedExecutor) this.executor).shutdownNow(); 125 } 126 127 else if (this.executor instanceof PooledExecutor) { 128 boolean interruptAll = false; 129 PooledExecutor pooledExec = (PooledExecutor) this.executor; 130 //pooledExec.shutdownNow(); 131 pooledExec.shutdownAfterProcessingCurrentlyQueuedTasks(); 132 133 if (interruptAll) pooledExec.interruptAll(); 134 try { 135 log.debug(getName() + ": awaiting termination..."); 136 ((PooledExecutor) this.executor).awaitTerminationAfterShutdown(); 137 log.debug(getName() + ": terminated with success."); 138 } catch (InterruptedException e) {} 139 } 140 this.executor = null; 141 } 142 143 this.handler.onStop(); 144 145 if (log.isDebugEnabled()) 146 log.debug(getName() + ": successfully shut down."); 147 } 148 149 /** 150 * Enqueues the given event onto this stage. 151 */ 152 public void enqueue(Object event) { 153 try { 154 this.enqueueInterruptable(event); 155 } catch (InterruptedException e) { 156 throw new RuntimeException(e); 157 } 158 } 159 160 /** 161 * Schedules the given event to be enqueued onto this stage at the given (future) time. 162 */ 163 public void enqueue(final Object event, Date date) { 164 if (log.isTraceEnabled()) 165 log.trace(getName() + ": scheduling event to be queued at future time=" + date + ", now=" + new Date() + ", event=" + event); 166 167 this.getTimer().schedule( 168 new TimerTask() { // anonymous inner class 169 public void run() { 170 if (log.isTraceEnabled()) 171 log.trace(getName() + ": timer triggered; now queueing event=" + event); 172 173 enqueue(event); 174 } 175 } 176 , date); 177 } 178 179 /** 180 * Enqueues the given event onto this stage. 181 * <p> 182 * On an exception, we also log the stack trace of the thread enqueueing 183 * the event, which is VERY helpful for debugging purposes, because one can 184 * actually see the stack trace of the methods and thread that enqueued the 185 * event causing problems. 186 */ 187 protected void enqueueInterruptable(final Object event) throws InterruptedException { 188 synchronized (this) { 189 if (!this.isStarted) { 190 log.warn(getName() + ": enqueue: improper but mostly harmless state"); 191 return; 192 } 193 } 194 195 // save stack trace of current thread for potential later use (TRICK) 196 final RuntimeException stackTraceOfEnqueueingThread = new RuntimeException("Failure in EventHandler.handle(event)"); 197 198 this.executor.execute( 199 new Runnable() { 200 public void run() { 201 if (log.isTraceEnabled()) 202 log.trace("ENTER event handling [" + getName() + "], event=" + event); 203 204 try { 205 206 handler.handle(event); 207 208 } catch (RuntimeException e) { 209 log.error("Oopsla: stackTraceOfEnqueueingThread: ", stackTraceOfEnqueueingThread); 210 log.error("Oopsla: ", e instanceof ExceptionEvent ? e : new ExceptionEvent(e, event, Stage.this)); 211 if (exceptionHandler == null) { 212 throw e; 213 } 214 else { 215 exceptionHandler.onException(e, event, Stage.this); 216 } 217 } catch (Error e) { 218 log.error("Oopsla: stackTraceOfEnqueueingThread: ", stackTraceOfEnqueueingThread); 219 log.error("Oopsla: ", new ExceptionEvent(e, event, Stage.this)); 220 throw e; 221 } 222 finally { 223 if (log.isTraceEnabled()) 224 log.trace("EXIT event handling [" + getName() + "], event=" + event); 225 } 226 } 227 } 228 ); 229 } 230 231 protected synchronized Timer getTimer() { 232 if (this.timer == null) this.timer = new Timer(); 233 return this.timer; 234 } 235 236 }