001    /* 
002    Copyright (c) 2003, The Regents of the University of California, through 
003    Lawrence Berkeley National Laboratory (subject to receipt of any required 
004    approvals from the U.S. Dept. of Energy).  All rights reserved.
005    */
006    package gov.lbl.dsd.sea;
008    import java.util.Iterator;
009    import java.util.LinkedHashMap;
010    import java.util.Map;
012    import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
013    import EDU.oswego.cs.dl.util.concurrent.Executor;
014    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
015    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
016    import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
017    import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
020    /**
021     * Creates and manages a set of stages;
022     * Contains several default factories for commonly used threading
023     * policies. By far the most commonly used are <code>QUEUED</code>,
024     * <code>POOLED</code> and <code>DIRECT</code>.
025     * 
026     * @author whoschek@lbl.gov
027     * @author $Author: gegles $
028     * @version $Revision: 1.17 $, $Date: 2004/09/16 16:57:15 $
029     */
030    public class StageManager {
032            //private static StageManager queuedManager; 
033            protected final Map stages; // the stages created by this class
034            protected final ExecutorFactory executorFactory;
036            /** 
037             * Creates an instance with the default threading policy. 
038             */
039            public StageManager() {
040                    this(QUEUED);
041            }
043            /** 
044             * Creates an instance with the given threading policy. 
045             */
046            public StageManager(ExecutorFactory executorFactory) {
047                    if (executorFactory == null) throw new IllegalArgumentException("executor factory must not be null");
048                    this.executorFactory = executorFactory;
049                    this.stages = new LinkedHashMap();
050            }
052            /** 
053             * Adds the given stage to the set of managed stages.
054             * @return the same stage (for convenience)
055             */
056            public synchronized Stage addStage(Stage stage) {
057                    String stageName = stage.getName();
058                    if (this.getStage(stageName) != null) {
059                            throw new IllegalArgumentException("stage=" + stageName + " already added previously.");
060                    }
062                    this.stages.put(stageName, stage);
063                    return stage;
064            }
066            /** 
067             * Creates and returns a managed stage with the given event handler. 
068             */
069            public synchronized Stage createStage(EventHandler handler) {
070                    return this.createStage(handler.getClass().getName() + "#" + this.stages.size(), handler);
071            }
073            /** 
074             * Creates and returns a managed stage with the given name and event handler. 
075             */
076            public Stage createStage(String stageName, EventHandler handler) {
077                    return this.createStage(stageName, handler, createExceptionHandler());
078            }
080            /** 
081             * Creates and returns a managed stage with the given name, event handler and exceptionHandler. 
082             */
083            public Stage createStage(String stageName, EventHandler handler, ExceptionHandler exceptionHandler) {
084                    return this.createStage(stageName, handler, exceptionHandler, this.executorFactory);
085            }
087            /** 
088             * Creates and returns a managed stage with the given name, event handler, exceptionHandler and executor. 
089             */
090            protected Stage createStage(String stageName, EventHandler handler, ExceptionHandler exceptionHandler, ExecutorFactory executorFactory) {
091                    Stage stage = new Stage(stageName, handler, executorFactory, exceptionHandler);
092                    this.addStage(stage);
093                    return stage;
094            }
096            /** 
097             * Override this method for custom exception handlers. 
098             */
099            protected ExceptionHandler createExceptionHandler() {
100                    return null;
101            }
103            /** 
104             * Starts all managed stages.
105             */
106            public synchronized void startAll() {
107                    Iterator iter = this.stages.values().iterator();
108                    while (iter.hasNext()) {
109                            Stage stage = (Stage) iter.next();
110                            stage.start();
111                    }
112            }
114            /** 
115             * Stops all managed stages.
116             */
117            public synchronized void stopAll() {
118                    Iterator iter = this.stages.values().iterator();
119                    while (iter.hasNext()) {
120                            Stage stage = (Stage) iter.next();
121                            stage.stop();
122                    }
123            }
125    //      /**
126    //       * Returns the global queued default instance of this class. 
127    //       */
128    //      public static synchronized StageManager getManager() {
129    //              if (queuedManager == null) queuedManager = new StageManager(QUEUED);
130    //              return queuedManager;
131    //      }
133            /** 
134             * Returns the managed stage with the given name, 
135             * or <code>null</code> if no such stage is currently managed.
136             */
137            public synchronized Stage getStage(String stageName) {
138                    return (Stage) this.stages.get(stageName);
139            }
141            /**
142             * Returns a string representation of the receiver.
143             */
144            public synchronized String toString() {
145                    return super.toString() + ": execFactory=" + this.executorFactory +
146                                    ", stages=" + this.stages.values().toString();
147            }
150    ////////////////////////////////////////////////////////////////
151    // nested classes
152    ////////////////////////////////////////////////////////////////
154            /**
155             * Creates and returns {@link DirectExecutor} instances.
156             */
157            public static final ExecutorFactory DIRECT = new Direct();
160            private static class Direct implements ExecutorFactory {                
161                    public Executor createExecutor() {
162                            return new DirectExecutor();
163                    }
164            }
166            /**
167             * Creates and returns {@link QueuedExecutor} instances.
168             */
169            public static final ExecutorFactory QUEUED = new Queued();
171            private static class Queued implements ExecutorFactory {                
172                    public Executor createExecutor() {
173                            return new QueuedExecutor(new LinkedQueue());
174    //                      Channel ch; 
175    //                      ch = new LinkedQueue();
176    //                      //ch = new BoundedLinkedQueue(100);
177    //                      //ch = new BoundedPriorityQueue(100);
178    //                      //ch = new Slot();
179    //                      
180    //                      Executor exec;
181    //                      exec = new QueuedExecutor(ch); 
182    //                      //exec = new DirectExecutor();
183    //                      //exec = new ThreadedExecutor();
184    //                      //exec = new PooledExecutor(ch);
185    //                      //exec = new LockedExecutor(new Mutex());
186    //                      return exec;
187                    }
188            }
190            /**
191             * Creates and returns {@link ThreadedExecutor} instances.
192             */
193            public static final ExecutorFactory THREADED = new Threaded();
195            private static class Threaded implements ExecutorFactory {              
196                    public Executor createExecutor() {
197                            return new ThreadedExecutor();
198                    }
199            }
201            /**
202             * Creates and returns {@link PooledExecutor} instances.
203             */
204            public static final ExecutorFactory POOLED = new Pooled();
206            private static class Pooled implements ExecutorFactory {                
207                    public Executor createExecutor() {
208                            PooledExecutor pexec = new PooledExecutor(new LinkedQueue());
209                            pexec.setKeepAliveTime(1000);
210                            return pexec;
211                    }
212            }
214    }