001    /* 
002    Copyright (c) 2003, The Regents of the University of California, through 
003    Lawrence Berkeley National Laboratory (subject to receipt of any required 
004    approvals from the U.S. Dept. of Energy).  All rights reserved.
005    */
006    package gov.lbl.dsd.sea.demo;
007    
008    import gov.lbl.dsd.sea.EventHandler;
009    import gov.lbl.dsd.sea.Stage;
010    import gov.lbl.dsd.sea.StageManager;
011    import gov.lbl.dsd.sea.event.IllegalEventException;
012    
013    import java.util.Date;
014    
015    import EDU.oswego.cs.dl.util.concurrent.CountDown;
016    
017    /**
018     * Demonstrates how to use the framework with a very basic example. In this
019     * example there are two stages. The first stage receives number events that are
020     * incrementing over time. It prints the number, adds 1000 to it, and hands the
021     * new number to the second stage, which also prints it.
022     * 
023     * @author whoschek@lbl.gov
024     * @author $Author: gegles $
025     * @version $Revision: 1.4 $, $Date: 2004/09/16 16:57:15 $
026     */
027    public class StageDemo {
028    
029            public static void main(String[] args) {
030                    int runs = 3;
031                    if (args.length > 0) runs = Integer.parseInt(args[0]);
032                    
033                    final CountDown barrier = new CountDown(1); // a barrier to later wait until all responses have arrived
034    
035                    // the event handler for the second stage
036                    EventHandler h2 = new EventHandler() {
037                            public void handle(Object event) {
038                                    System.out.println("handler2: " + event);
039                                    int val = ((Integer) event).intValue();
040                                    if (val == -1+1000) {
041                                            barrier.release();
042                                    }
043                            }
044                    };
045                    StageManager manager = new StageManager();
046                    final Stage s2 = manager.createStage(h2).start();
047                    
048                    // the event handler for the first stage
049                    EventHandler h1 = new EventHandler() {
050                            public void handle(Object event) {
051                                    System.out.println("handler1: " + event);
052                                    if (! (event instanceof Integer)) throw new IllegalEventException(event, this.getStage());
053                                    
054                                    int val = ((Integer) event).intValue();
055                                    s2.enqueue(new Integer(val + 1000));
056                            }
057                    };
058                    Stage s1 = manager.createStage(h1).start();
059                    
060                    // enqueue some events to be handled immediately, and also later in three secs
061                    s1.enqueue(new Integer(100), new Date(System.currentTimeMillis() + 1));
062                    s1.enqueue(new Integer(200), new Date(System.currentTimeMillis() + 3000));
063                    
064                    //s1.enqueue("illegal dummy");  // uncomment this to experiment with getting IllegalEventExceptions
065    
066                    // enqueue some more events to be handled ASAP
067                    for (int i=0; i < runs; i++) {
068                            s1.enqueue(new Integer(i));
069                    }
070                    
071                    /*
072                    // uncomment to see the "three second" event being handled before shutting down
073                    try {
074                            Thread.sleep(5000);
075                    } catch (InterruptedException e) { e.printStackTrace(); }
076                    */
077    
078                    s1.enqueue(new Integer(-1)); // enqueue crude "termination signal"
079    
080                    // cleanly shut down all stages and threads
081                    System.out.println("waiting for all events to run through pipeline...");
082                    try {
083                            barrier.acquire();
084                    } catch (InterruptedException e) {
085                            throw new RuntimeException(e);
086                    }
087                    System.out.println("now shutting down...");
088                    manager.stopAll();
089            }
090            
091    }