diff --git a/circuits/core/manager.py b/circuits/core/manager.py
--- a/circuits/core/manager.py
+++ b/circuits/core/manager.py
@@ -8,11 +8,11 @@
import atexit
from os import getpid
-from itertools import chain
-from collections import deque
from inspect import isfunction
from uuid import uuid4 as uuid
from types import GeneratorType
+from itertools import chain, count
+from heapq import heappush, heappop
from sys import exc_info as _exc_info
from weakref import WeakValueDictionary
from signal import signal, SIGINT, SIGTERM
@@ -112,13 +112,17 @@
def __init__(self, *args, **kwargs):
"initializes x; see x.__class__.__doc__ for signature"
+ self._queue = []
+ self._counter = count()
+
self._tasks = set()
self._cache = dict()
- self._cache_needs_refresh = False
- self._queue = deque()
- self._flush_batch = 0
self._globals = set()
self._handlers = dict()
+
+ self._flush_batch = 0
+ self._cache_needs_refresh = False
+
self._values = WeakValueDictionary()
self._executing_thread = None
@@ -315,14 +319,23 @@
component._executing_thread = None
self.components.add(component)
- self.root._queue.extend(list(component._queue))
- component._queue.clear()
+ # Re-push the child's pending entries instead of list.extend(), which
+ # would break the heap ordering of the root queue.  Each entry takes a
+ # new tie-breaker from the root counter because the child's counter
+ # values can repeat ones already queued on the root; sorted() keeps the
+ # child's own (priority, counter) order while re-numbering.
+ for priority, _, item in sorted(component._queue):
+     heappush(
+         self.root._queue,
+         (priority, next(self.root._counter), item)
+     )
+ component._queue = []
self.root._cache_needs_refresh = True
def unregisterChild(self, component):
self.components.remove(component)
self.root._cache_needs_refresh = True
- def _fire(self, event, channel):
+ def _fire(self, event, channel, priority=0):
# check if event is fired while handling an event
if thread.get_ident() \
== (self._executing_thread or self._flushing_thread) \
@@ -335,7 +339,7 @@
event.effects = 1
self._currently_handling.effects += 1
- self._queue.append((event, channel))
+ heappush(self._queue, (priority, next(self._counter), (event, channel)))
# the event comes from another thread
else:
@@ -353,12 +357,18 @@
# operations that assume its value to remain unchanged.
handling = self._currently_handling
if isinstance(handling, GenerateEvents):
- self._queue.append((event, channel))
+ heappush(
+ self._queue,
+ (priority, next(self._counter), (event, channel))
+ )
handling.reduce_time_left(0)
else:
- self._queue.append((event, channel))
+ heappush(
+ self._queue,
+ (priority, next(self._counter), (event, channel))
+ )
- def fireEvent(self, event, *channels):
+ def fireEvent(self, event, *channels, **kwargs):
"""Fire an event into the system.
:param event: The event that is to be fired.
@@ -379,7 +389,7 @@
event.channels = channels
event.value = Value(event, self)
- self.root._fire(event, channels)
+ self.root._fire(event, channels, **kwargs)
return event.value
@@ -472,7 +482,7 @@
self._flush_batch = len(self._queue)
while self._flush_batch > 0:
self._flush_batch -= 1 # Decrement first!
- event, channels = self._queue.popleft()
+ _, _, (event, channels) = heappop(self._queue)
self._dispatcher(event, channels, self._flush_batch)
finally:
self._flushing_thread = old_flushing
diff --git a/tests/core/test_event_priority.py b/tests/core/test_event_priority.py
new file mode 100644
--- /dev/null
+++ b/tests/core/test_event_priority.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+"""Tests for the ``priority`` keyword argument accepted by ``fire``."""
+
+from circuits import Component, Event
+
+
+class Foo(Event):
+    """Foo Event"""
+
+
+class Done(Event):
+    """Done Event"""
+
+
+class App(Component):
+    """Test component: ``foo`` records its argument, ``done`` calls stop()."""
+
+    def init(self):
+        self.results = []
+
+    def foo(self, value):
+        self.results.append(value)
+
+    def done(self):
+        self.stop()
+
+
+def test1():
+    """Events fired without a priority keep their firing order."""
+    app = App()
+
+    app.fire(Foo(1))
+    app.fire(Foo(2))
+    app.fire(Done())
+
+    app.run()
+
+    assert app.results == [1, 2]
+
+
+def test2():
+    """An event fired with a lower priority number is handled first."""
+    app = App()
+
+    app.fire(Foo(1), priority=2)
+    app.fire(Foo(2), priority=0)
+    # At the default priority (0) Done would be popped ahead of Foo(1), so
+    # stop() would be called before Foo(1) is handled; queue it behind both.
+    app.fire(Done(), priority=2)
+
+    app.run()
+
+    assert app.results == [2, 1]