001 /* 002 Copyright (c) 2003, The Regents of the University of California, through 003 Lawrence Berkeley National Laboratory (subject to receipt of any required 004 approvals from the U.S. Dept. of Energy). All rights reserved. 005 */ 006 package gov.lbl.dsd.sea; 007 008 import java.util.Iterator; 009 import java.util.LinkedHashMap; 010 import java.util.Map; 011 012 import EDU.oswego.cs.dl.util.concurrent.DirectExecutor; 013 import EDU.oswego.cs.dl.util.concurrent.Executor; 014 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue; 015 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor; 016 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor; 017 import EDU.oswego.cs.dl.util.concurrent.ThreadedExecutor; 018 019 020 /** 021 * Creates and manages a set of stages; 022 * Contains several default factories for commonly used threading 023 * policies. By far the most commonly used are <code>QUEUED</code>, 024 * <code>POOLED</code> and <code>DIRECT</code>. 025 * 026 * @author whoschek@lbl.gov 027 * @author $Author: gegles $ 028 * @version $Revision: 1.17 $, $Date: 2004/09/16 16:57:15 $ 029 */ 030 public class StageManager { 031 032 //private static StageManager queuedManager; 033 protected final Map stages; // the stages created by this class 034 protected final ExecutorFactory executorFactory; 035 036 /** 037 * Creates an instance with the default threading policy. 038 */ 039 public StageManager() { 040 this(QUEUED); 041 } 042 043 /** 044 * Creates an instance with the given threading policy. 045 */ 046 public StageManager(ExecutorFactory executorFactory) { 047 if (executorFactory == null) throw new IllegalArgumentException("executor factory must not be null"); 048 this.executorFactory = executorFactory; 049 this.stages = new LinkedHashMap(); 050 } 051 052 /** 053 * Adds the given stage to the set of managed stages. 054 * @return the same stage (for convenience) 055 */ 056 public synchronized Stage addStage(Stage stage) { 057 String stageName = stage.getName(); 058 if (this.getStage(stageName) != null) { 059 throw new IllegalArgumentException("stage=" + stageName + " already added previously."); 060 } 061 062 this.stages.put(stageName, stage); 063 return stage; 064 } 065 066 /** 067 * Creates and returns a managed stage with the given event handler. 068 */ 069 public synchronized Stage createStage(EventHandler handler) { 070 return this.createStage(handler.getClass().getName() + "#" + this.stages.size(), handler); 071 } 072 073 /** 074 * Creates and returns a managed stage with the given name and event handler. 075 */ 076 public Stage createStage(String stageName, EventHandler handler) { 077 return this.createStage(stageName, handler, createExceptionHandler()); 078 } 079 080 /** 081 * Creates and returns a managed stage with the given name, event handler and exceptionHandler. 082 */ 083 public Stage createStage(String stageName, EventHandler handler, ExceptionHandler exceptionHandler) { 084 return this.createStage(stageName, handler, exceptionHandler, this.executorFactory); 085 } 086 087 /** 088 * Creates and returns a managed stage with the given name, event handler, exceptionHandler and executor. 089 */ 090 protected Stage createStage(String stageName, EventHandler handler, ExceptionHandler exceptionHandler, ExecutorFactory executorFactory) { 091 Stage stage = new Stage(stageName, handler, executorFactory, exceptionHandler); 092 this.addStage(stage); 093 return stage; 094 } 095 096 /** 097 * Override this method for custom exception handlers. 098 */ 099 protected ExceptionHandler createExceptionHandler() { 100 return null; 101 } 102 103 /** 104 * Starts all managed stages. 105 */ 106 public synchronized void startAll() { 107 Iterator iter = this.stages.values().iterator(); 108 while (iter.hasNext()) { 109 Stage stage = (Stage) iter.next(); 110 stage.start(); 111 } 112 } 113 114 /** 115 * Stops all managed stages. 116 */ 117 public synchronized void stopAll() { 118 Iterator iter = this.stages.values().iterator(); 119 while (iter.hasNext()) { 120 Stage stage = (Stage) iter.next(); 121 stage.stop(); 122 } 123 } 124 125 // /** 126 // * Returns the global queued default instance of this class. 127 // */ 128 // public static synchronized StageManager getManager() { 129 // if (queuedManager == null) queuedManager = new StageManager(QUEUED); 130 // return queuedManager; 131 // } 132 133 /** 134 * Returns the managed stage with the given name, 135 * or <code>null</code> if no such stage is currently managed. 136 */ 137 public synchronized Stage getStage(String stageName) { 138 return (Stage) this.stages.get(stageName); 139 } 140 141 /** 142 * Returns a string representation of the receiver. 143 */ 144 public synchronized String toString() { 145 return super.toString() + ": execFactory=" + this.executorFactory + 146 ", stages=" + this.stages.values().toString(); 147 } 148 149 150 //////////////////////////////////////////////////////////////// 151 // nested classes 152 //////////////////////////////////////////////////////////////// 153 154 /** 155 * Creates and returns {@link DirectExecutor} instances. 156 */ 157 public static final ExecutorFactory DIRECT = new Direct(); 158 159 160 private static class Direct implements ExecutorFactory { 161 public Executor createExecutor() { 162 return new DirectExecutor(); 163 } 164 } 165 166 /** 167 * Creates and returns {@link QueuedExecutor} instances. 168 */ 169 public static final ExecutorFactory QUEUED = new Queued(); 170 171 private static class Queued implements ExecutorFactory { 172 public Executor createExecutor() { 173 return new QueuedExecutor(new LinkedQueue()); 174 // Channel ch; 175 // ch = new LinkedQueue(); 176 // //ch = new BoundedLinkedQueue(100); 177 // //ch = new BoundedPriorityQueue(100); 178 // //ch = new Slot(); 179 // 180 // Executor exec; 181 // exec = new QueuedExecutor(ch); 182 // //exec = new DirectExecutor(); 183 // //exec = new ThreadedExecutor(); 184 // //exec = new PooledExecutor(ch); 185 // //exec = new LockedExecutor(new Mutex()); 186 // return exec; 187 } 188 } 189 190 /** 191 * Creates and returns {@link ThreadedExecutor} instances. 192 */ 193 public static final ExecutorFactory THREADED = new Threaded(); 194 195 private static class Threaded implements ExecutorFactory { 196 public Executor createExecutor() { 197 return new ThreadedExecutor(); 198 } 199 } 200 201 /** 202 * Creates and returns {@link PooledExecutor} instances. 203 */ 204 public static final ExecutorFactory POOLED = new Pooled(); 205 206 private static class Pooled implements ExecutorFactory { 207 public Executor createExecutor() { 208 PooledExecutor pexec = new PooledExecutor(new LinkedQueue()); 209 pexec.setKeepAliveTime(1000); 210 return pexec; 211 } 212 } 213 214 }