codepad
[
create a new paste
]
login
|
about
Language:
C
C++
D
Haskell
Lua
OCaml
PHP
Perl
Plain Text
Python
Ruby
Scheme
Tcl
diff --git a/circuits/core/manager.py b/circuits/core/manager.py --- a/circuits/core/manager.py +++ b/circuits/core/manager.py @@ -8,11 +8,11 @@ import atexit from os import getpid -from itertools import chain -from collections import deque from inspect import isfunction from uuid import uuid4 as uuid from types import GeneratorType +from itertools import chain, count +from heapq import heappush, heappop from sys import exc_info as _exc_info from weakref import WeakValueDictionary from signal import signal, SIGINT, SIGTERM @@ -112,13 +112,17 @@ def __init__(self, *args, **kwargs): "initializes x; see x.__class__.__doc__ for signature" + self._queue = [] + self._counter = count() + self._tasks = set() self._cache = dict() - self._cache_needs_refresh = False - self._queue = deque() - self._flush_batch = 0 self._globals = set() self._handlers = dict() + + self._flush_batch = 0 + self._cache_needs_refresh = False + self._values = WeakValueDictionary() self._executing_thread = None @@ -315,14 +319,14 @@ component._executing_thread = None self.components.add(component) self.root._queue.extend(list(component._queue)) - component._queue.clear() + component._queue = [] self.root._cache_needs_refresh = True def unregisterChild(self, component): self.components.remove(component) self.root._cache_needs_refresh = True - def _fire(self, event, channel): + def _fire(self, event, channel, priority=0): # check if event is fired while handling an event if thread.get_ident() \ == (self._executing_thread or self._flushing_thread) \ @@ -335,7 +339,7 @@ event.effects = 1 self._currently_handling.effects += 1 - self._queue.append((event, channel)) + heappush(self._queue, (priority, next(self._counter), (event, channel))) # the event comes from another thread else: @@ -353,12 +357,18 @@ # operations that assume its value to remain unchanged. handling = self._currently_handling if isinstance(handling, GenerateEvents): - self._queue.append((event, channel)) + heappush( + self._queue, + (priority, next(self._counter), (event, channel)) + ) handling.reduce_time_left(0) else: - self._queue.append((event, channel)) + heappush( + self._queue, + (priority, next(self._counter), (event, channel)) + ) - def fireEvent(self, event, *channels): + def fireEvent(self, event, *channels, **kwargs): """Fire an event into the system. :param event: The event that is to be fired. @@ -379,7 +389,7 @@ event.channels = channels event.value = Value(event, self) - self.root._fire(event, channels) + self.root._fire(event, channels, **kwargs) return event.value @@ -472,7 +482,7 @@ self._flush_batch = len(self._queue) while self._flush_batch > 0: self._flush_batch -= 1 # Decrement first! - event, channels = self._queue.popleft() + priority, count, (event, channels) = heappop(self._queue) self._dispatcher(event, channels, self._flush_batch) finally: self._flushing_thread = old_flushing diff --git a/tests/core/test_event_priority.py b/tests/core/test_event_priority.py new file mode 100644 --- /dev/null +++ b/tests/core/test_event_priority.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python + +from circuits import Component, Event + + +class Foo(Event): + """Foo Event""" + + +class Done(Event): + """Done Event""" + + +class App(Component): + + def init(self): + self.results = [] + + def foo(self, value): + self.results.append(value) + + def done(self): + self.stop() + + +def test1(): + app = App() + + # Normal Order + [app.fire(Foo(1)), app.fire(Foo(2))] + app.fire(Done()) + + app.run() + + assert app.results == [1, 2] + + +def test2(): + app = App() + + # Priority Order + [app.fire(Foo(1), priority=2), app.fire(Foo(2), priority=0)] + app.fire(Done()) + + app.run() + + assert app.results == [2, 1]
Private
[
?
]
Run code
Submit