major update to cooja's event scheduler:

* all events must now be scheduled from the simulation thread
* added new methods for requesting simulation thread execution

this patch solves some race conditions introduced by the new mspsim execution scheduling, but affects many existing plugins and interfaces
This commit is contained in:
fros4943 2009-10-27 10:06:59 +00:00
parent 4715637b38
commit 64293abe03
2 changed files with 144 additions and 184 deletions

View File

@ -26,13 +26,11 @@
* OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
* SUCH DAMAGE.
*
* $Id: EventQueue.java,v 1.8 2009/10/19 17:32:38 fros4943 Exp $
* $Id: EventQueue.java,v 1.9 2009/10/27 10:06:59 fros4943 Exp $
*/
package se.sics.cooja;
import java.util.ArrayList;
/**
* @author Joakim Eriksson (ported to COOJA by Fredrik Osterlind)
*/
@ -41,41 +39,6 @@ public class EventQueue {
private TimeEvent first;
private int eventCount = 0;
/* For scheduling events from outside simulation thread effectively */
private boolean hasUnsortedEvents = false;
private ArrayList<TimeEvent> unsortedEvents = new ArrayList<TimeEvent>();
public EventQueue() {
}
private synchronized void sortEvents() {
hasUnsortedEvents = false;
for (TimeEvent e: unsortedEvents) {
if (!e.removed) {
addEvent(e);
}
}
unsortedEvents.clear();
}
/**
* May be called from outside simulation thread.
*
* @param event Event
* @param time Time
*/
public synchronized void addEventUnsorted(TimeEvent event, long time) {
/* Make sure this event is not executed before being resorted (readded) */
if (event.queue != null) {
event.remove();
}
event.time = time;
event.removed = false;
unsortedEvents.add(event);
hasUnsortedEvents = true;
}
/**
* Should only be called from simulation thread!
*
@ -87,15 +50,13 @@ public class EventQueue {
addEvent(event);
}
/**
* Should only be called from simulation thread!
*
* @param event Event
*/
public void addEvent(TimeEvent event) {
if (event.queue != null) {
private void addEvent(TimeEvent event) {
if (event.removed && event.queue != null) {
removeFromQueue(event);
}
if (event.queue != null) {
throw new IllegalStateException("Event was already scheduled in the past: " + event);
}
if (first == null) {
first = event;
@ -168,10 +129,6 @@ public class EventQueue {
* @return Event
*/
public TimeEvent popFirst() {
if (hasUnsortedEvents) {
sortEvents();
}
TimeEvent tmp = first;
if (tmp == null) {
return null;
@ -193,14 +150,10 @@ public class EventQueue {
}
public TimeEvent peekFirst() {
if (hasUnsortedEvents) {
sortEvents();
}
return first;
}
public String toString() {
return "EventQueue with " + (eventCount+unsortedEvents.size()) + " events";
return "EventQueue with " + eventCount + " events";
}
}

View File

@ -24,11 +24,12 @@
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
* SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
* $Id: Simulation.java,v 1.52 2009/08/20 13:10:35 fros4943 Exp $
* $Id: Simulation.java,v 1.53 2009/10/27 10:06:59 fros4943 Exp $
*/
package se.sics.cooja;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Observable;
@ -51,7 +52,8 @@ import se.sics.cooja.dialogs.CreateSimDialog;
* @author Fredrik Osterlind
*/
public class Simulation extends Observable implements Runnable {
public static final long MILLISECOND = 1000L;
public static final long MICROSECOND = 1L;
public static final long MILLISECOND = 1000*MICROSECOND;
/*private static long EVENT_COUNTER = 0;*/
@ -95,8 +97,37 @@ public class Simulation extends Observable implements Runnable {
}
}
/* Event queue */
private EventQueue eventQueue = new EventQueue();
/* Poll requests */
private boolean hasPollRequests = false;
private ArrayDeque<Runnable> pollRequests = new ArrayDeque<Runnable>();
/**
* Request poll from simulation thread.
* Poll requests are prioritized over simulation events, and are
* executed between each simulation event.
*
* @param r Simulation thread action
*/
public void invokeSimulationThread(Runnable r) {
synchronized (pollRequests) {
pollRequests.addLast(r);
hasPollRequests = true;
}
}
private Runnable popSimulationInvokes() {
Runnable r;
synchronized (pollRequests) {
r = pollRequests.pop();
hasPollRequests = !pollRequests.isEmpty();
}
return r;
}
/**
* Add millisecond observer.
* This observer is notified once every simulated millisecond.
@ -107,7 +138,16 @@ public class Simulation extends Observable implements Runnable {
public void addMillisecondObserver(Observer newObserver) {
millisecondObservable.addObserver(newObserver);
hasMillisecondObservers = true;
rescheduleEvents = true;
invokeSimulationThread(new Runnable() {
public void run() {
if (!millisecondEvent.isScheduled()) {
scheduleEvent(
millisecondEvent,
currentSimulationTime - (currentSimulationTime % MILLISECOND) + MILLISECOND);
}
}
});
}
/**
@ -119,64 +159,59 @@ public class Simulation extends Observable implements Runnable {
public void deleteMillisecondObserver(Observer observer) {
millisecondObservable.deleteObserver(observer);
hasMillisecondObservers = millisecondObservable.countObservers() > 0;
rescheduleEvents = true;
}
/**
* Schedule events.
* This method is not thread-safe, and should only be invoked when the
* simulation is paused, or from inside the simulation loop.
*
* @see #scheduleEvent(TimeEvent, long)
* @param e Event
* @param time Execution time
* @return True iff current thread is the simulation thread
*/
public void scheduleEventUnsafe(TimeEvent e, long time) {
eventQueue.addEvent(e, time);
public boolean isSimulationThread() {
return simulationThread == Thread.currentThread();
}
/**
* Schedule event to be handled by event loop.
* Schedule simulation event for given time.
* Already scheduled events must be removed before they are rescheduled.
*
* If the simulation is running, this method may only be called from the simulation thread.
*
* @see #invokeSimulationThread(Runnable)
*
* @param e Event
* @param time Execution time
*/
public void scheduleEvent(TimeEvent e, long time) {
if (Thread.currentThread() == simulationThread) {
eventQueue.addEvent(e, time);
} else {
eventQueue.addEventUnsorted(e, time);
}
}
private Mote[] emulatedMoteArray;
private TimeEvent tickEmulatedMotesEvent = new TimeEvent(0) {
public void execute(long t) {
/*logger.info("MSP motes tick at: " + t);*/
if (emulatedMoteArray.length == 0) {
return;
public void scheduleEvent(final TimeEvent e, final long time) {
if (!isRunning() || isSimulationThread()) {
/* Schedule immediately */
if (e.isScheduled()) {
e.remove();
}
eventQueue.addEvent(e, time);
return;
}
/* Tick MSP motes */
boolean wantMoreTicks = true;
while (wantMoreTicks) {
/* Tick all MSP motes until none need more ticks */
wantMoreTicks = false;
for (Mote element : emulatedMoteArray) {
if (element.tick(currentSimulationTime)) {
wantMoreTicks = true;
}
/* Schedule soon */
invokeSimulationThread(new Runnable() {
public void run() {
if (e.isScheduled()) {
e.remove();
}
if (time < getSimulationTime()) {
eventQueue.addEvent(e, getSimulationTime());
} else {
eventQueue.addEvent(e, time);
}
}
/* XXX Reschedule MSP motes (millisecond resolution) */
scheduleEventUnsafe(this, t+1000);
});
/* TODO Strict scheduling from simulation thread */
/*if (e.isScheduled()) {
throw new IllegalStateException("Event already scheduled: " + e);
}
public String toString() {
return "MSPSIM ALL";
if (isRunning && !isSimulationThread()) {
throw new IllegalStateException("Scheduling event from non-simulation thread: " + e);
}
};
eventQueue.addEvent(e, time);*/
}
private TimeEvent delayEvent = new TimeEvent(0) {
public void execute(long t) {
@ -197,7 +232,7 @@ public class Simulation extends Observable implements Runnable {
}
/* Reschedule us next millisecond */
scheduleEventUnsafe(this, t+MILLISECOND);
scheduleEvent(this, t+MILLISECOND);
return;
}
@ -208,7 +243,7 @@ public class Simulation extends Observable implements Runnable {
}
/* Reschedule us next period */
scheduleEventUnsafe(this, t+delayPeriod*MILLISECOND);
scheduleEvent(this, t+delayPeriod*MILLISECOND);
}
public String toString() {
return "DELAY";
@ -222,61 +257,40 @@ public class Simulation extends Observable implements Runnable {
}
millisecondObservable.newMillisecond(getSimulationTime());
scheduleEventUnsafe(this, t+MILLISECOND);
scheduleEvent(this, t+MILLISECOND);
}
public String toString() {
return "MILLISECOND: " + millisecondObservable.countObservers();
}
};
private void recreateMoteLists() {
/* Tick MSP motes separately */
ArrayList<Mote> emulatedMotes = new ArrayList<Mote>();
for (Mote mote: motes) {
/* TODO: fixe an emulatedMote generic class */
if (mote.getType().getClass().toString().contains(".mspmote.")) {
emulatedMotes.add(mote);
} else if (mote.getType().getClass().toString().contains(".avrmote.")) {
emulatedMotes.add(mote);
}
}
emulatedMoteArray = emulatedMotes.toArray(new Mote[emulatedMotes.size()]);
}
private boolean rescheduleEvents = false;
public void run() {
long lastStartTime = System.currentTimeMillis();
logger.info("Simulation main loop started, system time: " + lastStartTime);
isRunning = true;
/* Schedule tick events */
delayLastSim = System.currentTimeMillis();
scheduleEventUnsafe(tickEmulatedMotesEvent, currentSimulationTime);
scheduleEventUnsafe(delayEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000);
scheduleEventUnsafe(millisecondEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000);
/* Simulation starting */
this.setChanged();
this.notifyObservers(this);
recreateMoteLists();
try {
TimeEvent nextEvent;
while (isRunning) {
if (rescheduleEvents) {
rescheduleEvents = false;
scheduleEventUnsafe(tickEmulatedMotesEvent, currentSimulationTime);
scheduleEventUnsafe(delayEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000);
scheduleEventUnsafe(millisecondEvent, currentSimulationTime - (currentSimulationTime % 1000) + 1000);
/* Handle all poll requests */
while (hasPollRequests) {
popSimulationInvokes().run();
}
/* Handle one simulation event, and update simulation time */
nextEvent = eventQueue.popFirst();
if (nextEvent == null) {
throw new RuntimeException("No more events");
}
if (nextEvent.time < currentSimulationTime) {
throw new RuntimeException("Next event is in the past: " + nextEvent.time + " < " + currentSimulationTime + ": " + nextEvent);
}
currentSimulationTime = nextEvent.time;
/*logger.info("Executing event #" + EVENT_COUNTER++ + " @ " + currentSimulationTime + ": " + nextEvent);*/
nextEvent.execute(currentSimulationTime);
@ -286,7 +300,7 @@ public class Simulation extends Observable implements Runnable {
}
}
} catch (RuntimeException e) {
logger.fatal("Simulation stopped due to error", e);
logger.fatal("Simulation stopped due to error: " + e.getMessage(), e);
if (!GUI.isVisualized()) {
/* Quit simulator if in test mode */
@ -297,13 +311,10 @@ public class Simulation extends Observable implements Runnable {
simulationThread = null;
stopSimulation = false;
// Notify observers simulation has stopped
this.setChanged();
this.notifyObservers(this);
logger.info("Simulation main loop stopped, system time: "
+ System.currentTimeMillis() + "\tDuration: "
+ (System.currentTimeMillis() - lastStartTime) + " ms");
logger.info("Simulation main loop stopped, system time: " + System.currentTimeMillis() +
"\tDuration: " + (System.currentTimeMillis() - lastStartTime) + " ms");
}
/**
@ -345,10 +356,13 @@ public class Simulation extends Observable implements Runnable {
}
/**
* Starts simulation if stopped, ticks all motes once, and finally stops
* Starts simulation if stopped, executes one millisecond, and finally stops
* simulation again.
*/
public void stepMillisecondSimulation() {
if (isRunning()) {
return;
}
TimeEvent stopEvent = new TimeEvent(0) {
public void execute(long t) {
/* Stop simulation */
@ -356,11 +370,7 @@ public class Simulation extends Observable implements Runnable {
}
};
scheduleEvent(stopEvent, getSimulationTime()+Simulation.MILLISECOND);
/* Start simulation if not running */
if (!isRunning()) {
startSimulation();
}
startSimulation();
}
/**
@ -648,8 +658,8 @@ public class Simulation extends Observable implements Runnable {
public void removeMote(final Mote mote) {
/* Simulation is running, remove mote in simulation loop */
TimeEvent removeMoteEvent = new TimeEvent(0) {
public void execute(long t) {
Runnable removeMote = new Runnable() {
public void run() {
motes.remove(mote);
currentRadioMedium.unregisterMote(mote, Simulation.this);
setChanged();
@ -664,20 +674,17 @@ public class Simulation extends Observable implements Runnable {
ev.remove();
}
}
ev = ev.nextEvent;
}
recreateMoteLists();
}
};
if (!isRunning()) {
/* Simulation is stopped, remove mote immediately */
removeMoteEvent.execute(0);
removeMote.run();
} else {
/* Schedule event */
scheduleEvent(removeMoteEvent, Simulation.this.getSimulationTime());
/* Remove mote from simulation thread */
invokeSimulationThread(removeMote);
}
}
@ -688,41 +695,33 @@ public class Simulation extends Observable implements Runnable {
* Mote to add
*/
public void addMote(final Mote mote) {
if (mote.getInterfaces().getClock() != null) {
if (maxMoteStartupDelay > 0) {
mote.getInterfaces().getClock().setDrift(
- getSimulationTime()
- randomGenerator.nextInt((int)maxMoteStartupDelay)
);
} else {
mote.getInterfaces().getClock().setDrift(-getSimulationTime());
}
}
if (!isRunning()) {
/* Simulation is stopped, add mote immediately */
motes.add(mote);
currentRadioMedium.registerMote(mote, this);
this.setChanged();
this.notifyObservers(mote);
return;
}
/* Simulation is running, add mote in simulation loop */
TimeEvent addNewMoteEvent = new TimeEvent(0) {
public void execute(long t) {
Runnable addMote = new Runnable() {
public void run() {
if (mote.getInterfaces().getClock() != null) {
if (maxMoteStartupDelay > 0) {
mote.getInterfaces().getClock().setDrift(
- getSimulationTime()
- randomGenerator.nextInt((int)maxMoteStartupDelay)
);
} else {
mote.getInterfaces().getClock().setDrift(-getSimulationTime());
}
}
motes.add(mote);
currentRadioMedium.registerMote(mote, Simulation.this);
recreateMoteLists();
Simulation.this.setChanged();
Simulation.this.notifyObservers(mote);
}
public String toString() {
return "ADD MOTE";
setChanged();
notifyObservers(mote);
}
};
scheduleEvent(addNewMoteEvent, Simulation.this.getSimulationTime());
if (!isRunning()) {
/* Simulation is stopped, add mote immediately */
addMote.run();
} else {
/* Add mote from simulation thread */
invokeSimulationThread(addMote);
}
}
/**
@ -825,9 +824,17 @@ public class Simulation extends Observable implements Runnable {
delayPeriod = 1; /* minimum */
}
rescheduleEvents = true;
this.setChanged();
this.notifyObservers(this);
invokeSimulationThread(new Runnable() {
public void run() {
if (!delayEvent.isScheduled()) {
scheduleEvent(
delayEvent,
currentSimulationTime - (currentSimulationTime % MILLISECOND) + MILLISECOND);
}
Simulation.this.setChanged();
Simulation.this.notifyObservers(this);
}
});
}
/**