001    /* 
002    Copyright (c) 2003, The Regents of the University of California, through 
003    Lawrence Berkeley National Laboratory (subject to receipt of any required 
004    approvals from the U.S. Dept. of Energy).  All rights reserved.
005    */
006    package gov.lbl.dsd.sea;
007    
008    import java.util.Iterator;
009    import java.util.LinkedHashMap;
010    import java.util.Map;
011    
012    import EDU.oswego.cs.dl.util.concurrent.DirectExecutor;
013    import EDU.oswego.cs.dl.util.concurrent.Executor;
014    import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
015    import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
016    import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
017    import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor;
018    
019    
020    /**
021     * Creates and manages a set of stages;
022     * Contains several default factories for commonly used threading
023     * policies. By far the most commonly used are <code>QUEUED</code>,
024     * <code>POOLED</code> and <code>DIRECT</code>.
025     * 
026     * @author whoschek@lbl.gov
027     * @author $Author: gegles $
028     * @version $Revision: 1.17 $, $Date: 2004/09/16 16:57:15 $
029     */
030    public class StageManager {
031    
032            //private static StageManager queuedManager; 
033            protected final Map stages; // the stages created by this class
034            protected final ExecutorFactory executorFactory;
035    
036            /** 
037             * Creates an instance with the default threading policy. 
038             */
039            public StageManager() {
040                    this(QUEUED);
041            }
042            
043            /** 
044             * Creates an instance with the given threading policy. 
045             */
046            public StageManager(ExecutorFactory executorFactory) {
047                    if (executorFactory == null) throw new IllegalArgumentException("executor factory must not be null");
048                    this.executorFactory = executorFactory;
049                    this.stages = new LinkedHashMap();
050            }
051            
052            /** 
053             * Adds the given stage to the set of managed stages.
054             * @return the same stage (for convenience)
055             */
056            public synchronized Stage addStage(Stage stage) {
057                    String stageName = stage.getName();
058                    if (this.getStage(stageName) != null) {
059                            throw new IllegalArgumentException("stage=" + stageName + " already added previously.");
060                    }
061                    
062                    this.stages.put(stageName, stage);
063                    return stage;
064            }
065            
066            /** 
067             * Creates and returns a managed stage with the given event handler. 
068             */
069            public synchronized Stage createStage(EventHandler handler) {
070                    return this.createStage(handler.getClass().getName() + "#" + this.stages.size(), handler);
071            }
072    
073            /** 
074             * Creates and returns a managed stage with the given name and event handler. 
075             */
076            public Stage createStage(String stageName, EventHandler handler) {
077                    return this.createStage(stageName, handler, createExceptionHandler());
078            }
079    
080            /** 
081             * Creates and returns a managed stage with the given name, event handler and exceptionHandler. 
082             */
083            public Stage createStage(String stageName, EventHandler handler, ExceptionHandler exceptionHandler) {
084                    return this.createStage(stageName, handler, exceptionHandler, this.executorFactory);
085            }
086    
087            /** 
088             * Creates and returns a managed stage with the given name, event handler, exceptionHandler and executor. 
089             */
090            protected Stage createStage(String stageName, EventHandler handler, ExceptionHandler exceptionHandler, ExecutorFactory executorFactory) {
091                    Stage stage = new Stage(stageName, handler, executorFactory, exceptionHandler);
092                    this.addStage(stage);
093                    return stage;
094            }
095            
096            /** 
097             * Override this method for custom exception handlers. 
098             */
099            protected ExceptionHandler createExceptionHandler() {
100                    return null;
101            }
102            
103            /** 
104             * Starts all managed stages.
105             */
106            public synchronized void startAll() {
107                    Iterator iter = this.stages.values().iterator();
108                    while (iter.hasNext()) {
109                            Stage stage = (Stage) iter.next();
110                            stage.start();
111                    }
112            }
113    
114            /** 
115             * Stops all managed stages.
116             */
117            public synchronized void stopAll() {
118                    Iterator iter = this.stages.values().iterator();
119                    while (iter.hasNext()) {
120                            Stage stage = (Stage) iter.next();
121                            stage.stop();
122                    }
123            }
124            
125    //      /**
126    //       * Returns the global queued default instance of this class. 
127    //       */
128    //      public static synchronized StageManager getManager() {
129    //              if (queuedManager == null) queuedManager = new StageManager(QUEUED);
130    //              return queuedManager;
131    //      }
132            
133            /** 
134             * Returns the managed stage with the given name, 
135             * or <code>null</code> if no such stage is currently managed.
136             */
137            public synchronized Stage getStage(String stageName) {
138                    return (Stage) this.stages.get(stageName);
139            }
140            
141            /**
142             * Returns a string representation of the receiver.
143             */
144            public synchronized String toString() {
145                    return super.toString() + ": execFactory=" + this.executorFactory +
146                                    ", stages=" + this.stages.values().toString();
147            }
148    
149    
150    ////////////////////////////////////////////////////////////////
151    // nested classes
152    ////////////////////////////////////////////////////////////////
153    
154            /**
155             * Creates and returns {@link DirectExecutor} instances.
156             */
157            public static final ExecutorFactory DIRECT = new Direct();
158    
159            
160            private static class Direct implements ExecutorFactory {                
161                    public Executor createExecutor() {
162                            return new DirectExecutor();
163                    }
164            }
165    
166            /**
167             * Creates and returns {@link QueuedExecutor} instances.
168             */
169            public static final ExecutorFactory QUEUED = new Queued();
170    
171            private static class Queued implements ExecutorFactory {                
172                    public Executor createExecutor() {
173                            return new QueuedExecutor(new LinkedQueue());
174    //                      Channel ch; 
175    //                      ch = new LinkedQueue();
176    //                      //ch = new BoundedLinkedQueue(100);
177    //                      //ch = new BoundedPriorityQueue(100);
178    //                      //ch = new Slot();
179    //                      
180    //                      Executor exec;
181    //                      exec = new QueuedExecutor(ch); 
182    //                      //exec = new DirectExecutor();
183    //                      //exec = new ThreadedExecutor();
184    //                      //exec = new PooledExecutor(ch);
185    //                      //exec = new LockedExecutor(new Mutex());
186    //                      return exec;
187                    }
188            }
189    
190            /**
191             * Creates and returns {@link ThreadedExecutor} instances.
192             */
193            public static final ExecutorFactory THREADED = new Threaded();
194    
195            private static class Threaded implements ExecutorFactory {              
196                    public Executor createExecutor() {
197                            return new ThreadedExecutor();
198                    }
199            }
200    
201            /**
202             * Creates and returns {@link PooledExecutor} instances.
203             */
204            public static final ExecutorFactory POOLED = new Pooled();
205    
206            private static class Pooled implements ExecutorFactory {                
207                    public Executor createExecutor() {
208                            PooledExecutor pexec = new PooledExecutor(new LinkedQueue());
209                            pexec.setKeepAliveTime(1000);
210                            return pexec;
211                    }
212            }
213    
214    }