From 64293abe039e40e8e0f110e014d5fedd2466c66d Mon Sep 17 00:00:00 2001 From: fros4943 Date: Tue, 27 Oct 2009 10:06:59 +0000 Subject: [PATCH] major update to cooja's event scheduler: * all events must now be scheduled from the simulation thread * added new methods for requesting simulation thread execution this patch solves some race conditions introduced by the new mspsim execution scheduling, but affects many existing plugins and interfaces --- .../cooja/java/se/sics/cooja/EventQueue.java | 61 +--- .../cooja/java/se/sics/cooja/Simulation.java | 267 +++++++++--------- 2 files changed, 144 insertions(+), 184 deletions(-) diff --git a/tools/cooja/java/se/sics/cooja/EventQueue.java b/tools/cooja/java/se/sics/cooja/EventQueue.java index 24219ce08..a7a725d05 100644 --- a/tools/cooja/java/se/sics/cooja/EventQueue.java +++ b/tools/cooja/java/se/sics/cooja/EventQueue.java @@ -26,13 +26,11 @@ * OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. * - * $Id: EventQueue.java,v 1.8 2009/10/19 17:32:38 fros4943 Exp $ + * $Id: EventQueue.java,v 1.9 2009/10/27 10:06:59 fros4943 Exp $ */ package se.sics.cooja; -import java.util.ArrayList; - /** * @author Joakim Eriksson (ported to COOJA by Fredrik Osterlind) */ @@ -41,41 +39,6 @@ public class EventQueue { private TimeEvent first; private int eventCount = 0; - /* For scheduling events from outside simulation thread effectively */ - private boolean hasUnsortedEvents = false; - private ArrayList unsortedEvents = new ArrayList(); - - public EventQueue() { - } - - private synchronized void sortEvents() { - hasUnsortedEvents = false; - - for (TimeEvent e: unsortedEvents) { - if (!e.removed) { - addEvent(e); - } - } - unsortedEvents.clear(); - } - - /** - * May be called from outside simulation thread. - * - * @param event Event - * @param time Time - */ - public synchronized void addEventUnsorted(TimeEvent event, long time) { - /* Make sure this event is not executed before being resorted (readded) */ - if (event.queue != null) { - event.remove(); - } - event.time = time; - event.removed = false; - unsortedEvents.add(event); - hasUnsortedEvents = true; - } - /** * Should only be called from simulation thread! * @@ -87,15 +50,13 @@ public class EventQueue { addEvent(event); } - /** - * Should only be called from simulation thread! - * - * @param event Event - */ - public void addEvent(TimeEvent event) { - if (event.queue != null) { + private void addEvent(TimeEvent event) { + if (event.removed && event.queue != null) { removeFromQueue(event); } + if (event.queue != null) { + throw new IllegalStateException("Event was already scheduled in the past: " + event); + } if (first == null) { first = event; @@ -168,10 +129,6 @@ public class EventQueue { * @return Event */ public TimeEvent popFirst() { - if (hasUnsortedEvents) { - sortEvents(); - } - TimeEvent tmp = first; if (tmp == null) { return null; @@ -193,14 +150,10 @@ public class EventQueue { } public TimeEvent peekFirst() { - if (hasUnsortedEvents) { - sortEvents(); - } - return first; } public String toString() { - return "EventQueue with " + (eventCount+unsortedEvents.size()) + " events"; + return "EventQueue with " + eventCount + " events"; } } diff --git a/tools/cooja/java/se/sics/cooja/Simulation.java b/tools/cooja/java/se/sics/cooja/Simulation.java index f845f24f9..c318a744d 100644 --- a/tools/cooja/java/se/sics/cooja/Simulation.java +++ b/tools/cooja/java/se/sics/cooja/Simulation.java @@ -24,11 +24,12 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * - * $Id: Simulation.java,v 1.52 2009/08/20 13:10:35 fros4943 Exp $ + * $Id: Simulation.java,v 1.53 2009/10/27 10:06:59 fros4943 Exp $ */ package se.sics.cooja; +import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.Observable; @@ -51,7 +52,8 @@ import se.sics.cooja.dialogs.CreateSimDialog; * @author Fredrik Osterlind */ public class Simulation extends Observable implements Runnable { - public static final long MILLISECOND = 1000L; + public static final long MICROSECOND = 1L; + public static final long MILLISECOND = 1000*MICROSECOND; /*private static long EVENT_COUNTER = 0;*/ @@ -95,8 +97,37 @@ public class Simulation extends Observable implements Runnable { } } + /* Event queue */ private EventQueue eventQueue = new EventQueue(); + /* Poll requests */ + private boolean hasPollRequests = false; + private ArrayDeque pollRequests = new ArrayDeque(); + + + /** + * Request poll from simulation thread. + * Poll requests are prioritized over simulation events, and are + * executed between each simulation event. + * + * @param r Simulation thread action + */ + public void invokeSimulationThread(Runnable r) { + synchronized (pollRequests) { + pollRequests.addLast(r); + hasPollRequests = true; + } + } + + private Runnable popSimulationInvokes() { + Runnable r; + synchronized (pollRequests) { + r = pollRequests.pop(); + hasPollRequests = !pollRequests.isEmpty(); + } + return r; + } + /** * Add millisecond observer. * This observer is notified once every simulated millisecond. @@ -107,7 +138,16 @@ public class Simulation extends Observable implements Runnable { public void addMillisecondObserver(Observer newObserver) { millisecondObservable.addObserver(newObserver); hasMillisecondObservers = true; - rescheduleEvents = true; + + invokeSimulationThread(new Runnable() { + public void run() { + if (!millisecondEvent.isScheduled()) { + scheduleEvent( + millisecondEvent, + currentSimulationTime - (currentSimulationTime % MILLISECOND) + MILLISECOND); + } + } + }); } /** @@ -119,64 +159,59 @@ public class Simulation extends Observable implements Runnable { public void deleteMillisecondObserver(Observer observer) { millisecondObservable.deleteObserver(observer); hasMillisecondObservers = millisecondObservable.countObservers() > 0; - rescheduleEvents = true; } - /** - * Schedule events. - * This method is not thread-safe, and should only be invoked when the - * simulation is paused, or from inside the simulation loop. - * - * @see #scheduleEvent(TimeEvent, long) - * @param e Event - * @param time Execution time + * @return True iff current thread is the simulation thread */ - public void scheduleEventUnsafe(TimeEvent e, long time) { - eventQueue.addEvent(e, time); + public boolean isSimulationThread() { + return simulationThread == Thread.currentThread(); } /** - * Schedule event to be handled by event loop. + * Schedule simulation event for given time. + * Already scheduled events must be removed before they are rescheduled. + * + * If the simulation is running, this method may only be called from the simulation thread. + * + * @see #invokeSimulationThread(Runnable) * * @param e Event * @param time Execution time */ - public void scheduleEvent(TimeEvent e, long time) { - if (Thread.currentThread() == simulationThread) { - eventQueue.addEvent(e, time); - } else { - eventQueue.addEventUnsorted(e, time); - } - } - - private Mote[] emulatedMoteArray; - private TimeEvent tickEmulatedMotesEvent = new TimeEvent(0) { - public void execute(long t) { - /*logger.info("MSP motes tick at: " + t);*/ - if (emulatedMoteArray.length == 0) { - return; + public void scheduleEvent(final TimeEvent e, final long time) { + if (!isRunning() || isSimulationThread()) { + /* Schedule immediately */ + if (e.isScheduled()) { + e.remove(); } + eventQueue.addEvent(e, time); + return; + } - /* Tick MSP motes */ - boolean wantMoreTicks = true; - while (wantMoreTicks) { - /* Tick all MSP motes until none need more ticks */ - wantMoreTicks = false; - for (Mote element : emulatedMoteArray) { - if (element.tick(currentSimulationTime)) { - wantMoreTicks = true; - } + /* Schedule soon */ + invokeSimulationThread(new Runnable() { + public void run() { + if (e.isScheduled()) { + e.remove(); + } + if (time < getSimulationTime()) { + eventQueue.addEvent(e, getSimulationTime()); + } else { + eventQueue.addEvent(e, time); } } - - /* XXX Reschedule MSP motes (millisecond resolution) */ - scheduleEventUnsafe(this, t+1000); + }); + + /* TODO Strict scheduling from simulation thread */ + /*if (e.isScheduled()) { + throw new IllegalStateException("Event already scheduled: " + e); } - public String toString() { - return "MSPSIM ALL"; + if (isRunning && !isSimulationThread()) { + throw new IllegalStateException("Scheduling event from non-simulation thread: " + e); } - }; + eventQueue.addEvent(e, time);*/ + } private TimeEvent delayEvent = new TimeEvent(0) { public void execute(long t) { @@ -197,7 +232,7 @@ public class Simulation extends Observable implements Runnable { } /* Reschedule us next millisecond */ - scheduleEventUnsafe(this, t+MILLISECOND); + scheduleEvent(this, t+MILLISECOND); return; } @@ -208,7 +243,7 @@ public class Simulation extends Observable implements Runnable { } /* Reschedule us next period */ - scheduleEventUnsafe(this, t+delayPeriod*MILLISECOND); + scheduleEvent(this, t+delayPeriod*MILLISECOND); } public String toString() { return "DELAY"; @@ -222,61 +257,40 @@ public class Simulation extends Observable implements Runnable { } millisecondObservable.newMillisecond(getSimulationTime()); - scheduleEventUnsafe(this, t+MILLISECOND); + scheduleEvent(this, t+MILLISECOND); } public String toString() { return "MILLISECOND: " + millisecondObservable.countObservers(); } }; - private void recreateMoteLists() { - /* Tick MSP motes separately */ - ArrayList emulatedMotes = new ArrayList(); - for (Mote mote: motes) { - /* TODO: fixe an emulatedMote generic class */ - if (mote.getType().getClass().toString().contains(".mspmote.")) { - emulatedMotes.add(mote); - } else if (mote.getType().getClass().toString().contains(".avrmote.")) { - emulatedMotes.add(mote); - } - } - emulatedMoteArray = emulatedMotes.toArray(new Mote[emulatedMotes.size()]); - } - - private boolean rescheduleEvents = false; public void run() { long lastStartTime = System.currentTimeMillis(); logger.info("Simulation main loop started, system time: " + lastStartTime); isRunning = true; - - /* Schedule tick events */ delayLastSim = System.currentTimeMillis(); - scheduleEventUnsafe(tickEmulatedMotesEvent, currentSimulationTime); - scheduleEventUnsafe(delayEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000); - scheduleEventUnsafe(millisecondEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000); /* Simulation starting */ this.setChanged(); this.notifyObservers(this); - recreateMoteLists(); - try { TimeEvent nextEvent; while (isRunning) { - if (rescheduleEvents) { - rescheduleEvents = false; - scheduleEventUnsafe(tickEmulatedMotesEvent, currentSimulationTime); - scheduleEventUnsafe(delayEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000); - scheduleEventUnsafe(millisecondEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000); + /* Handle all poll requests */ + while (hasPollRequests) { + popSimulationInvokes().run(); } - + + /* Handle one simulation event, and update simulation time */ nextEvent = eventQueue.popFirst(); if (nextEvent == null) { throw new RuntimeException("No more events"); } - + if (nextEvent.time < currentSimulationTime) { + throw new RuntimeException("Next event is in the past: " + nextEvent.time + " < " + currentSimulationTime + ": " + nextEvent); + } currentSimulationTime = nextEvent.time; /*logger.info("Executing event #" + EVENT_COUNTER++ + " @ " + currentSimulationTime + ": " + nextEvent);*/ nextEvent.execute(currentSimulationTime); @@ -286,7 +300,7 @@ public class Simulation extends Observable implements Runnable { } } } catch (RuntimeException e) { - logger.fatal("Simulation stopped due to error", e); + logger.fatal("Simulation stopped due to error: " + e.getMessage(), e); if (!GUI.isVisualized()) { /* Quit simulator if in test mode */ @@ -297,13 +311,10 @@ public class Simulation extends Observable implements Runnable { simulationThread = null; stopSimulation = false; - // Notify observers simulation has stopped this.setChanged(); this.notifyObservers(this); - - logger.info("Simulation main loop stopped, system time: " - + System.currentTimeMillis() + "\tDuration: " - + (System.currentTimeMillis() - lastStartTime) + " ms"); + logger.info("Simulation main loop stopped, system time: " + System.currentTimeMillis() + + "\tDuration: " + (System.currentTimeMillis() - lastStartTime) + " ms"); } /** @@ -345,10 +356,13 @@ public class Simulation extends Observable implements Runnable { } /** - * Starts simulation if stopped, ticks all motes once, and finally stops + * Starts simulation if stopped, executes one millisecond, and finally stops * simulation again. */ public void stepMillisecondSimulation() { + if (isRunning()) { + return; + } TimeEvent stopEvent = new TimeEvent(0) { public void execute(long t) { /* Stop simulation */ @@ -356,11 +370,7 @@ public class Simulation extends Observable implements Runnable { } }; scheduleEvent(stopEvent, getSimulationTime()+Simulation.MILLISECOND); - - /* Start simulation if not running */ - if (!isRunning()) { - startSimulation(); - } + startSimulation(); } /** @@ -648,8 +658,8 @@ public class Simulation extends Observable implements Runnable { public void removeMote(final Mote mote) { /* Simulation is running, remove mote in simulation loop */ - TimeEvent removeMoteEvent = new TimeEvent(0) { - public void execute(long t) { + Runnable removeMote = new Runnable() { + public void run() { motes.remove(mote); currentRadioMedium.unregisterMote(mote, Simulation.this); setChanged(); @@ -664,20 +674,17 @@ public class Simulation extends Observable implements Runnable { ev.remove(); } } - ev = ev.nextEvent; } - - recreateMoteLists(); } }; if (!isRunning()) { /* Simulation is stopped, remove mote immediately */ - removeMoteEvent.execute(0); + removeMote.run(); } else { - /* Schedule event */ - scheduleEvent(removeMoteEvent, Simulation.this.getSimulationTime()); + /* Remove mote from simulation thread */ + invokeSimulationThread(removeMote); } } @@ -688,41 +695,33 @@ public class Simulation extends Observable implements Runnable { * Mote to add */ public void addMote(final Mote mote) { - if (mote.getInterfaces().getClock() != null) { - if (maxMoteStartupDelay > 0) { - mote.getInterfaces().getClock().setDrift( - - getSimulationTime() - - randomGenerator.nextInt((int)maxMoteStartupDelay) - ); - } else { - mote.getInterfaces().getClock().setDrift(-getSimulationTime()); - } - } - - if (!isRunning()) { - /* Simulation is stopped, add mote immediately */ - motes.add(mote); - currentRadioMedium.registerMote(mote, this); - this.setChanged(); - this.notifyObservers(mote); - return; - } - - /* Simulation is running, add mote in simulation loop */ - TimeEvent addNewMoteEvent = new TimeEvent(0) { - public void execute(long t) { + Runnable addMote = new Runnable() { + public void run() { + if (mote.getInterfaces().getClock() != null) { + if (maxMoteStartupDelay > 0) { + mote.getInterfaces().getClock().setDrift( + - getSimulationTime() + - randomGenerator.nextInt((int)maxMoteStartupDelay) + ); + } else { + mote.getInterfaces().getClock().setDrift(-getSimulationTime()); + } + } + motes.add(mote); currentRadioMedium.registerMote(mote, Simulation.this); - recreateMoteLists(); - Simulation.this.setChanged(); - Simulation.this.notifyObservers(mote); - } - public String toString() { - return "ADD MOTE"; + setChanged(); + notifyObservers(mote); } }; - - scheduleEvent(addNewMoteEvent, Simulation.this.getSimulationTime()); + + if (!isRunning()) { + /* Simulation is stopped, add mote immediately */ + addMote.run(); + } else { + /* Add mote from simulation thread */ + invokeSimulationThread(addMote); + } } /** @@ -825,9 +824,17 @@ public class Simulation extends Observable implements Runnable { delayPeriod = 1; /* minimum */ } - rescheduleEvents = true; - this.setChanged(); - this.notifyObservers(this); + invokeSimulationThread(new Runnable() { + public void run() { + if (!delayEvent.isScheduled()) { + scheduleEvent( + delayEvent, + currentSimulationTime - (currentSimulationTime % MILLISECOND) + MILLISECOND); + } + Simulation.this.setChanged(); + Simulation.this.notifyObservers(this); + } + }); } /**