###############################################################################
#
# Copyright 2009 Facebook (see NOTICE.txt)
# Copyright 2011-2012 Pants Developers (see AUTHORS.txt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################################################################
"""
Asynchronous event processing and timer scheduling.
Pants applications are powered by instances of the
:class:`~pants.engine.Engine` class. An :class:`~pants.engine.Engine`
instance keeps track of active channels, continuously checks them for
new events and raises those events on the channel when they occur.
The :class:`~pants.engine.Engine` class also provides the timer
functionality which allows callable objects to be invoked after some
delay without blocking the process.
Engines
=======
Pants' engines are very simple to use. After you have finished
initializing your application, simply call
:meth:`~pants.engine.Engine.start` to enter the blocking event loop.
:meth:`~pants.engine.Engine.stop` may be called at any time to cause
a graceful exit from the event loop. If your application has a
pre-existing event loop you can call the
:meth:`~pants.engine.Engine.poll` method on each iteration of that loop
rather than using :meth:`~pants.engine.Engine.start` and
:meth:`~pants.engine.Engine.stop`. Ideally,
:meth:`~pants.engine.Engine.poll` should be called many times each
second to ensure that events are processed efficiently and timers
are executed on schedule.
The global engine instance is returned by the
:meth:`~pants.engine.Engine.instance()` classmethod. It is not required
that you use the global engine instance, but it is strongly recommended.
By default, new channels are automatically added to the global engine
when they are created. Channels can be added to a specific engine by
passing the engine instance as a keyword argument to the channel's
constructor. If a :class:`~pants.server.Server` is added to a
non-default engine, any connections it accepts will also be added to
that engine.
Timers
======
In addition to managing channels, Pants' engines can also schedule
timers. Timers are callable objects that get invoked at some point in
the future. Pants has four types of timers: callbacks, loops, deferreds
and cycles. Callbacks and loops are executed each time
:meth:`~pants.engine.Engine.poll` is called - callbacks are executed
once while loops are executed repeatedly. Deferreds and cycles are
executed after a delay specified in seconds - deferreds are executed
once while cycles are executed repeatedly.
:class:`~pants.engine.Engine` has methods for creating each of the four
types of timers: :meth:`~pants.engine.Engine.callback`,
:meth:`~pants.engine.Engine.loop`, :meth:`~pants.engine.Engine.defer`
and :meth:`~pants.engine.Engine.cycle`. Each of these methods is passed
a callable to execute as well as any number of positional and keyword
arguments::

    engine.callback(my_callable, 1, 2, foo='bar')

The timer methods all return a callable object which can be used to
cancel the execution of the timer::

    cancel_cycle = engine.cycle(10.0, my_callable)
    cancel_cycle()

Any object references passed to a timer method will be retained in
memory until the timer has finished executing or is cancelled. Be aware
of this when writing code, as it may cause unexpected behaviors should
you fail to take these references into account. Timers rely on their
engine for scheduling and execution. For best results, you should either
schedule timers while your engine is running or start your engine
immediately after scheduling your timers.
Pollers
=======
By default, Pants' engines support the :py:obj:`~select.epoll`,
:py:obj:`~select.kqueue` and :py:obj:`~select.select` polling methods.
The most appropriate polling method is selected based on the platform on
which Pants is running. Advanced users may wish to use a different
polling method. This can be done by defining a custom poller class and
passing an instance of it to the :class:`~pants.engine.Engine`
constructor. Interested users should review the source code for an
understanding of how these classes are defined and used.
"""
###############################################################################
# Imports
###############################################################################
import bisect
import errno
import functools
import select
import sys
import time
###############################################################################
# Logging
###############################################################################
import logging
# Module-level logger. The Engine class reports its start/stop messages and
# any exceptions it catches through this logger, which is named "pants".
log = logging.getLogger("pants")
###############################################################################
# Time
###############################################################################
# This hack is here because Windows' time() is too imprecise for our needs.
# See issue #40 for further details.
if sys.platform == "win32":
    # time.clock() was removed in Python 3.8. Prefer time.perf_counter()
    # where it exists and fall back to time.clock() on old interpreters.
    _clock = getattr(time, "perf_counter", None) or time.clock
    _start_time = time.time()
    # perf_counter() counts from an arbitrary reference point, so remember
    # the first reading and only ever add the elapsed difference to the
    # wall-clock start time. (With time.clock() the first reading is ~0.)
    _clock_origin = _clock()

    def current_time():
        """
        Return the current time in seconds since the epoch.

        The value is the wall-clock time captured at import plus the
        high-resolution time elapsed since then, rounded to two decimal
        places.
        """
        return round(_start_time + (_clock() - _clock_origin), 2)
else:
    # Elsewhere time.time() is used directly.
    current_time = time.time
###############################################################################
# Engine Class
###############################################################################
class Engine(object):
    """
    The asynchronous engine class.

    An engine object is responsible for passing I/O events to active
    channels and running timers asynchronously. Depending on OS support,
    the engine will use either the :py:func:`~select.epoll()`,
    :py:func:`~select.kqueue()` or :py:func:`~select.select()` system
    call to detect events on active channels. It is possible to force
    the engine to use a particular polling method, but this is not
    recommended.

    Most applications will use the global engine object, which can be
    accessed using :meth:`~pants.engine.Engine.instance`, however it is
    also possible to create and use multiple instances of
    :class:`~pants.engine.Engine` in your application.

    An engine can either provide the main loop for your application
    (see :meth:`~pants.engine.Engine.start` and
    :meth:`~pants.engine.Engine.stop`), or its functionality can be
    integrated into a pre-existing main loop (see
    :meth:`~pants.engine.Engine.poll`).

    =========  =========================================================
    Argument   Description
    =========  =========================================================
    poller     *Optional.* A specific polling object for the engine to
               use.
    =========  =========================================================
    """
    # Socket events - these correspond to epoll() states.
    NONE = 0x00
    READ = 0x01
    WRITE = 0x04
    ERROR = 0x08
    HANGUP = 0x10 | 0x2000
    BASE_EVENTS = READ | ERROR | HANGUP
    ALL_EVENTS = BASE_EVENTS | WRITE

    def __init__(self, poller=None):
        self.latest_poll_time = current_time()
        self._shutdown = False
        self._running = False
        self._channels = {}
        self._poller = None
        self._install_poller(poller)
        self._callbacks = []
        self._deferreds = []

    @classmethod
    def instance(cls):
        """
        Returns the global engine object, creating it on first use.
        """
        if not hasattr(cls, "_instance"):
            cls._instance = cls()
        return cls._instance

    ##### Engine Methods ######################################################

    def start(self, poll_timeout=0.2):
        """
        Start the engine.

        Initialises and continuously polls the engine until either
        :meth:`~pants.engine.Engine.stop` is called or an uncaught
        :obj:`Exception` is raised. :meth:`~pants.engine.Engine.start`
        should be called after your asynchronous application has been fully
        initialised. For applications with a pre-existing main loop, see
        :meth:`~pants.engine.Engine.poll`.

        =============  ===================================
        Argument       Description
        =============  ===================================
        poll_timeout   *Optional.* The timeout to pass to
                       :meth:`~pants.engine.Engine.poll`.
        =============  ===================================
        """
        if self._shutdown:
            # A shutdown request is pending: consume it instead of starting.
            self._shutdown = False
            return
        if self._running:
            return
        self._running = True

        log.info("Starting engine.")
        try:
            while not self._shutdown:
                self.poll(poll_timeout)
        except (KeyboardInterrupt, SystemExit):
            raise
        except Exception:
            # Any other error ends the loop; it is logged, not propagated.
            log.exception("Uncaught exception in main loop.")
        finally:
            log.info("Stopping engine.")
            self._shutdown = False
            self._running = False

    def stop(self):
        """
        Stop the engine.

        If :meth:`~pants.engine.Engine.start` has been called, calling
        :meth:`~pants.engine.Engine.stop` will cause the engine to cease
        polling and shut down on the next iteration of the main loop.
        Calling it on an engine that is not running does nothing.
        """
        if self._running:
            self._shutdown = True

    def poll(self, poll_timeout):
        """
        Poll the engine.

        Updates timers and processes I/O events on all active channels.
        If your application has a pre-existing main loop, call
        :meth:`~pants.engine.Engine.poll` on each iteration of that
        loop, otherwise, see :meth:`~pants.engine.Engine.start`.

        =============  ================================================
        Argument       Description
        =============  ================================================
        poll_timeout   The timeout to be passed to the polling object.
        =============  ================================================
        """
        self.latest_poll_time = current_time()

        # Callbacks and loops. The queue is swapped for a fresh list first,
        # so timers scheduled while these run execute on the next poll.
        callbacks, self._callbacks = self._callbacks[:], []
        for timer in callbacks:
            try:
                timer.function()
            except Exception:
                log.exception("Exception raised while executing timer.")
            if timer.requeue:
                self._callbacks.append(timer)

        # Deferreds and cycles whose end time has been reached. The list is
        # kept sorted by end time, so only its head needs checking.
        while (self._deferreds and
               self._deferreds[0].end <= self.latest_poll_time):
            timer = self._deferreds.pop(0)
            try:
                timer.function()
            except Exception:
                log.exception("Exception raised while executing timer.")
            if timer.requeue:
                timer.end = self.latest_poll_time + timer.delay
                self._schedule_deferred(timer)

        if self._shutdown:
            return

        # Never wait past the next deferred, but wait at least 0.01 seconds.
        if self._deferreds:
            timeout = self._deferreds[0].end - self.latest_poll_time
            if timeout > 0.0:
                poll_timeout = max(min(timeout, poll_timeout), 0.01)

        if not self._channels:
            time.sleep(poll_timeout)  # Don't burn CPU.
            return

        try:
            events = self._poller.poll(poll_timeout)
        except Exception as err:
            # Guard the index: an exception without args must be re-raised
            # as itself rather than masked by an IndexError.
            if err.args and err.args[0] == errno.EINTR:
                log.debug("Interrupted system call.")
                return
            raise

        for fileno, fired in events.items():
            channel = self._channels.get(fileno)
            if channel is None:
                # The channel was removed by a handler that ran earlier in
                # this pass; there is nothing left to deliver the event to.
                continue
            try:
                channel._handle_events(fired)
            except (KeyboardInterrupt, SystemExit):
                raise
            except Exception:
                log.exception("Error while handling events on %r.", channel)

    ##### Timer Methods #######################################################

    def callback(self, function, *args, **kwargs):
        """
        Schedule a callback.

        A callback is a function (or other callable) that is executed
        the next time :meth:`~pants.engine.Engine.poll` is called - in
        other words, on the next iteration of the main loop.

        Returns a callable which can be used to cancel the callback.

        =========  ======================================================
        Argument   Description
        =========  ======================================================
        function   The callable to be executed when the callback is run.
        args       The positional arguments to be passed to the callable.
        kwargs     The keyword arguments to be passed to the callable.
        =========  ======================================================
        """
        callback = functools.partial(function, *args, **kwargs)
        timer = _Timer(self, callback, False)
        self._callbacks.append(timer)
        return timer

    def loop(self, function, *args, **kwargs):
        """
        Schedule a loop.

        A loop is a callback that is continuously rescheduled. It will
        be executed every time :meth:`~pants.engine.Engine.poll` is
        called - in other words, on each iteration of the main loop.

        Returns a callable which can be used to cancel the loop.

        =========  ======================================================
        Argument   Description
        =========  ======================================================
        function   The callable to be executed when the loop is run.
        args       The positional arguments to be passed to the callable.
        kwargs     The keyword arguments to be passed to the callable.
        =========  ======================================================
        """
        loop = functools.partial(function, *args, **kwargs)
        timer = _Timer(self, loop, True)
        self._callbacks.append(timer)
        return timer

    def defer(self, delay, function, *args, **kwargs):
        """
        Schedule a deferred.

        A deferred is a function (or other callable) that is executed
        after a certain amount of time has passed.

        Returns a callable which can be used to cancel the deferred.
        Raises :obj:`ValueError` if ``delay`` is not greater than zero.

        =========  =====================================================
        Argument   Description
        =========  =====================================================
        delay      The delay, in seconds, after which the deferred
                   should be run.
        function   The callable to be executed when the deferred is run.
        args       The positional arguments to be passed to the
                   callable.
        kwargs     The keyword arguments to be passed to the callable.
        =========  =====================================================
        """
        if delay <= 0:
            raise ValueError("Delay must be greater than 0 seconds.")
        deferred = functools.partial(function, *args, **kwargs)
        timer = _Timer(self, deferred, False, delay,
                       self.latest_poll_time + delay)
        self._schedule_deferred(timer)
        return timer

    def cycle(self, interval, function, *args, **kwargs):
        """
        Schedule a cycle.

        A cycle is a deferred that is continuously rescheduled. It will
        be run at regular intervals.

        Returns a callable which can be used to cancel the cycle.
        Raises :obj:`ValueError` if ``interval`` is not greater than zero.

        =========  ======================================================
        Argument   Description
        =========  ======================================================
        interval   The interval, in seconds, at which the cycle should
                   be run.
        function   The callable to be executed when the cycle is run.
        args       The positional arguments to be passed to the callable.
        kwargs     The keyword arguments to be passed to the callable.
        =========  ======================================================
        """
        if interval <= 0:
            raise ValueError("Interval must be greater than 0 seconds.")
        cycle = functools.partial(function, *args, **kwargs)
        timer = _Timer(self, cycle, True, interval,
                       self.latest_poll_time + interval)
        self._schedule_deferred(timer)
        return timer

    def _schedule_deferred(self, timer):
        """
        Insert a timer into the deferred list, keeping it sorted by end time.

        The position is found from the timers' ``end`` values rather than by
        comparing timer objects, so it does not depend on timers being
        orderable. A timer is placed after existing timers with the same
        end time.

        =========  ============================
        Argument   Description
        =========  ============================
        timer      The timer to be scheduled.
        =========  ============================
        """
        ends = [scheduled.end for scheduled in self._deferreds]
        self._deferreds.insert(bisect.bisect_right(ends, timer.end), timer)

    def _remove_timer(self, timer):
        """
        Remove a timer from the engine.

        Removing a timer that is not queued is not an error.

        =========  =========================
        Argument   Description
        =========  =========================
        timer      The timer to be removed.
        =========  =========================
        """
        # A timer cancelled from inside its own function has already been
        # taken off its queue by poll(); clearing the flag stops poll()
        # from putting it straight back.
        timer.requeue = False

        queue = self._callbacks if timer.end is None else self._deferreds
        # Match by identity: two distinct timers may compare equal (every
        # callback has an end of None), and an equality-based list.remove()
        # could then drop a different timer.
        for index, candidate in enumerate(queue):
            if candidate is timer:
                del queue[index]
                break

    ##### Channel Methods #####################################################

    def add_channel(self, channel):
        """
        Add a channel to the engine.

        =========  =========================
        Argument   Description
        =========  =========================
        channel    The channel to be added.
        =========  =========================
        """
        self._channels[channel.fileno] = channel
        self._poller.add(channel.fileno, channel._events)

    def modify_channel(self, channel):
        """
        Modify the state of a channel.

        =========  ============================
        Argument   Description
        =========  ============================
        channel    The channel to be modified.
        =========  ============================
        """
        self._poller.modify(channel.fileno, channel._events)

    def remove_channel(self, channel):
        """
        Remove a channel from the engine.

        An :obj:`IOError` or :obj:`OSError` raised by the poller while
        removing the channel is logged rather than propagated.

        =========  ===========================
        Argument   Description
        =========  ===========================
        channel    The channel to be removed.
        =========  ===========================
        """
        self._channels.pop(channel.fileno, None)
        try:
            self._poller.remove(channel.fileno, channel._events)
        except (IOError, OSError):
            log.exception("Error while removing %r.", channel)

    ##### Poller Methods ######################################################

    def _install_poller(self, poller=None):
        """
        Install a poller on the engine.

        Channels known to the engine are removed from the previous poller,
        if there is one, and added to the new one. When no poller is given,
        epoll is preferred, then kqueue, then select, depending on what the
        :mod:`select` module provides.

        =========  ============================
        Argument   Description
        =========  ============================
        poller     The poller to be installed.
        =========  ============================
        """
        if self._poller is not None:
            for fileno, channel in self._channels.items():
                self._poller.remove(fileno, channel._events)

        if poller is not None:
            self._poller = poller
        elif hasattr(select, "epoll"):
            self._poller = _EPoll()
        elif hasattr(select, "kqueue"):
            self._poller = _KQueue()
        else:
            self._poller = _Select()

        for fileno, channel in self._channels.items():
            self._poller.add(fileno, channel._events)
###############################################################################
# _EPoll Class
###############################################################################
class _EPoll(object):
    """
    An :py:func:`~select.epoll`-based poller.

    Each method hands its work to the wrapped epoll object.
    """
    def __init__(self):
        self._epoll = select.epoll()

    def add(self, fileno, events):
        """Register ``fileno`` with the epoll object for ``events``."""
        self._epoll.register(fileno, events)

    def modify(self, fileno, events):
        """Change the event mask registered for ``fileno`` to ``events``."""
        self._epoll.modify(fileno, events)

    def remove(self, fileno, events):
        """Unregister ``fileno``; ``events`` is accepted but not used."""
        self._epoll.unregister(fileno)

    def poll(self, timeout):
        """Wait up to ``timeout`` and return a dict of fileno to events."""
        ready = self._epoll.poll(timeout)
        return dict(ready)
###############################################################################
# _KQueue Class
###############################################################################
class _KQueue(object):
    """
    A :py:func:`~select.kqueue`-based poller.

    The event mask registered for each fileno is remembered in
    ``self._events`` so that :meth:`modify` can delete the old
    registration before adding the new one.
    """
    # Upper bound on the number of events fetched by one poll() call.
    MAX_EVENTS = 1024

    def __init__(self):
        self._events = {}
        self._kqueue = select.kqueue()

    def add(self, fileno, events):
        """Record ``events`` for ``fileno`` and add the matching filters."""
        self._events[fileno] = events
        self._control(fileno, events, select.KQ_EV_ADD)

    def modify(self, fileno, events):
        """Replace the registration of ``fileno`` with ``events``."""
        previous = self._events[fileno]
        self.remove(fileno, previous)
        self.add(fileno, events)

    def remove(self, fileno, events):
        """Delete the filters for ``events`` and forget ``fileno``."""
        self._control(fileno, events, select.KQ_EV_DELETE)
        self._events.pop(fileno, None)

    def poll(self, timeout):
        """
        Wait up to ``timeout`` and return a dict mapping each fileno to
        the Engine event flags derived from its kevents.
        """
        ready = {}
        fetched = self._kqueue.control(None, _KQueue.MAX_EVENTS, timeout)
        for kevent in fetched:
            # Translate this kevent's filter and flags into Engine flags.
            mask = 0
            if kevent.filter == select.KQ_FILTER_READ:
                mask |= Engine.READ
            if kevent.filter == select.KQ_FILTER_WRITE:
                mask |= Engine.WRITE
            if kevent.flags & select.KQ_EV_ERROR:
                mask |= Engine.ERROR
            if kevent.flags & select.KQ_EV_EOF:
                mask |= Engine.HANGUP
            # A kevent that maps to no Engine flag creates no entry.
            if mask:
                fileno = kevent.ident
                ready[fileno] = ready.get(fileno, 0) | mask
        return ready

    def _control(self, fileno, events, flags):
        """
        Apply ``flags`` (add or delete) to the write filter, then the read
        filter, for whichever of the two ``events`` selects.
        """
        wants_write = events & Engine.WRITE
        if wants_write:
            change = select.kevent(fileno, filter=select.KQ_FILTER_WRITE,
                                   flags=flags)
            self._kqueue.control([change], 0)
        wants_read = events & Engine.READ
        if wants_read:
            change = select.kevent(fileno, filter=select.KQ_FILTER_READ,
                                   flags=flags)
            self._kqueue.control([change], 0)
###############################################################################
# _Select Class
###############################################################################
class _Select(object):
    """
    A :py:func:`~select.select`-based poller.

    Three sets of filenos are kept - one each for read, write and error
    interest - and passed to :py:func:`~select.select` on every poll.
    """
    def __init__(self):
        self._r = set()
        self._w = set()
        self._e = set()

    def add(self, fileno, events):
        """Add ``fileno`` to each interest set selected by ``events``."""
        interest = (
            (Engine.READ, self._r),
            (Engine.WRITE, self._w),
            (Engine.ERROR, self._e),
        )
        for flag, filenos in interest:
            if events & flag:
                filenos.add(fileno)

    def modify(self, fileno, events):
        """Re-register ``fileno`` so that it matches ``events``."""
        self.remove(fileno, events)
        self.add(fileno, events)

    def remove(self, fileno, events):
        """Drop ``fileno`` from every interest set; ``events`` is unused."""
        for filenos in (self._r, self._w, self._e):
            filenos.discard(fileno)

    def poll(self, timeout):
        """
        Wait up to ``timeout`` and return a dict of fileno to event flags.
        """
        # Note that select() won't raise "hangup" events. There's no way
        # around this and no way to determine whether a hangup or an
        # error occurred. C'est la vie.
        readable, writable, errored = select.select(
            self._r, self._w, self._e, timeout)
        fired = {}
        ready = (
            (readable, Engine.READ),
            (writable, Engine.WRITE),
            (errored, Engine.ERROR),
        )
        for filenos, flag in ready:
            for fileno in filenos:
                fired[fileno] = fired.get(fileno, 0) | flag
        return fired
###############################################################################
# _Timer Class
###############################################################################
class _Timer(object):
    """
    A simple class for storing timer information.

    Calling a timer object cancels it. Timers are ordered by their
    ``end`` time so that they can be kept in a sorted list; equality is
    left as object identity, so two distinct timers never compare equal
    even when they are due at the same time.

    =========  ======================================================
    Argument   Description
    =========  ======================================================
    engine     The engine the timer is scheduled on.
    function   The callable to be executed when the timer is run.
    requeue    Whether the timer should be requeued after being run.
    delay      The time, in seconds, after which the timer should be
               run - or None, for a callback/loop.
    end        The time, in seconds since the epoch, after which the
               timer should be run - or None, for a callback/loop.
    =========  ======================================================
    """
    def __init__(self, engine, function, requeue, delay=None, end=None):
        self.engine = engine
        self.function = function
        self.requeue = requeue
        self.delay = delay
        self.end = end

    def __call__(self):
        self.cancel()

    def __lt__(self, to):
        # bisect only needs "<". Defining __lt__ alone, rather than
        # __cmp__, works on both Python 2 and 3 and keeps "==" as identity,
        # so list.remove() cannot pick a different timer with the same end.
        return self.end < to.end

    def cancel(self):
        """
        Cancel the timer by removing it from its engine.
        """
        # Clearing requeue first means a loop or cycle cancelled from
        # inside its own function is not rescheduled after it returns.
        self.requeue = False
        self.engine._remove_timer(self)