/*
 Copyright (c) 2003, The Regents of the University of California, through
 Lawrence Berkeley National Laboratory (subject to receipt of any required
 approvals from the U.S. Dept. of Energy).  All rights reserved.
 */
package gov.lbl.dsd.sea.demo;

import gov.lbl.dsd.sea.EventHandler;
import gov.lbl.dsd.sea.Stage;
import gov.lbl.dsd.sea.StageManager;
import gov.lbl.dsd.sea.event.IllegalEventException;

import java.util.Date;

import EDU.oswego.cs.dl.util.concurrent.CountDown;

/**
 * Demonstrates how to use the framework with a very basic example. In this
 * example there are two stages. The first stage receives number events that are
 * incrementing over time. It prints the number, adds 1000 to it, and hands the
 * new number to the second stage, which also prints it.
 *
 * @author whoschek@lbl.gov
 * @author $Author: gegles $
 * @version $Revision: 1.4 $, $Date: 2004/09/16 16:57:15 $
 */
public class StageDemo {

	/** Amount the first stage adds to every number before forwarding it to the second stage. */
	private static final int OFFSET = 1000;

	/** Crude "termination signal": the value enqueued last to mark the end of the demo. */
	private static final int TERMINATION_SIGNAL = -1;

	/**
	 * Runs the two-stage demo pipeline and stops all stages afterwards.
	 *
	 * @param args
	 *            optional; <code>args[0]</code> is the number of events to
	 *            enqueue for handling ASAP (defaults to 3)
	 * @throws NumberFormatException
	 *             if <code>args[0]</code> is present but is not a parsable integer
	 * @throws RuntimeException
	 *             wrapping the InterruptedException if the calling thread is
	 *             interrupted while waiting for the termination signal
	 */
	public static void main(String[] args) {
		int runs = 3;
		if (args.length > 0) {
			runs = Integer.parseInt(args[0]);
		}

		// Released by the second stage's handler once the termination signal has
		// travelled through both stages. Note that events scheduled for a later
		// date (e.g. the "three second" event below) may still be pending then.
		final CountDown barrier = new CountDown(1);

		// the event handler for the second stage
		EventHandler h2 = new EventHandler() {
			public void handle(Object event) {
				System.out.println("handler2: " + event);
				// reject foreign events the same way the first stage does,
				// rather than failing with a ClassCastException
				if (!(event instanceof Integer)) {
					throw new IllegalEventException(event, this.getStage());
				}

				int val = ((Integer) event).intValue();
				// the termination signal arrives here already shifted by the first stage
				if (val == TERMINATION_SIGNAL + OFFSET) {
					barrier.release();
				}
			}
		};

		StageManager manager = new StageManager();
		try {
			final Stage s2 = manager.createStage(h2).start();

			// the event handler for the first stage
			EventHandler h1 = new EventHandler() {
				public void handle(Object event) {
					System.out.println("handler1: " + event);
					if (!(event instanceof Integer)) {
						throw new IllegalEventException(event, this.getStage());
					}

					int val = ((Integer) event).intValue();
					s2.enqueue(new Integer(val + OFFSET));
				}
			};
			Stage s1 = manager.createStage(h1).start();

			// enqueue some events to be handled immediately, and also later in three secs
			s1.enqueue(new Integer(100), new Date(System.currentTimeMillis() + 1));
			s1.enqueue(new Integer(200), new Date(System.currentTimeMillis() + 3000));

			//s1.enqueue("illegal dummy"); // uncomment this to experiment with getting IllegalEventExceptions

			// enqueue some more events to be handled ASAP
			for (int i = 0; i < runs; i++) {
				s1.enqueue(new Integer(i));
			}

			/*
			// uncomment to see the "three second" event being handled before shutting down
			try {
				Thread.sleep(5000);
			} catch (InterruptedException e) { e.printStackTrace(); }
			*/

			s1.enqueue(new Integer(TERMINATION_SIGNAL)); // enqueue crude "termination signal"

			System.out.println("waiting for all events to run through pipeline...");
			try {
				barrier.acquire();
			} catch (InterruptedException e) {
				// restore the interrupt status so code further up the stack can still observe it
				Thread.currentThread().interrupt();
				throw new RuntimeException(e);
			}
			System.out.println("now shutting down...");
		} finally {
			// cleanly shut down all stages and threads, even if something above failed
			manager.stopAll();
		}
	}

}