###############################################################################
#
# Copyright 2011-2012 Pants Developers (see AUTHORS.txt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################################################################
###############################################################################
# Imports
###############################################################################
from pants.engine import Engine
try:
from PySide.QtCore import QCoreApplication, QSocketNotifier, QTimer
except ImportError:
from PyQt.QtCore import QCoreApplication, QSocketNotifier, QTimer
###############################################################################
# _Qt Class
###############################################################################
class _Qt(object):
"""
A QSocketNotifier-based polling object.
This caches events, waiting for the next call to poll, which should be
called every loop by a QTimer.
"""
def __init__(self):
self._r = {}
self._w = {}
self._e = {}
self._readable = set()
self._writable = set()
self._errored = set()
def _read_event(self, fileno):
self._readable.add(fileno)
try:
self._r[fileno].setEnabled(False)
except KeyError:
pass
timer.setInterval(0)
def _write_event(self, fileno):
self._writable.add(fileno)
try:
self._w[fileno].setEnabled(False)
except KeyError:
pass
timer.setInterval(0)
def _error_event(self, fileno):
self._errored.add(fileno)
try:
self._e[fileno].setEnabled(False)
except KeyError:
pass
timer.setInterval(0)
def add(self, fileno, events):
if events & Engine.READ:
self._r[fileno] = qs = QSocketNotifier(fileno,
QSocketNotifier.Type.Read)
qs.activated.connect(self._read_event)
if events & Engine.WRITE:
self._w[fileno] = qs = QSocketNotifier(fileno,
QSocketNotifier.Type.Write)
qs.activated.connect(self._write_event)
if events & Engine.ERROR:
self._e[fileno] = qs = QSocketNotifier(fileno,
QSocketNotifier.Type.Exception)
qs.activated.connect(self._error_event)
def modify(self, fileno, events):
if events & Engine.READ and not fileno in self._r:
self._r[fileno] = qs = QSocketNotifier(fileno,
QSocketNotifier.Type.Read)
qs.activated.connect(self._read_event)
elif not events & Engine.READ and fileno in self._r:
self._r[fileno].setEnabled(False)
del self._r[fileno]
if events & Engine.WRITE and not fileno in self._w:
self._w[fileno] = qs = QSocketNotifier(fileno,
QSocketNotifier.Type.Write)
qs.activated.connect(self._write_event)
elif not events & Engine.WRITE and fileno in self._w:
self._w[fileno].setEnabled(False)
del self._w[fileno]
if events & Engine.ERROR and not fileno in self._e:
self._e[fileno] = qs = QSocketNotifier(fileno,
QSocketNotifier.Type.Exception)
qs.activated.connect(self._error_event)
elif not events & Engine.ERROR and fileno in self._e:
self._e[fileno].setEnabled(False)
del self._e[fileno]
def remove(self, fileno, events):
if fileno in self._r:
self._r[fileno].setEnabled(False)
del self._r[fileno]
if fileno in self._w:
self._w[fileno].setEnabled(False)
del self._w[fileno]
if fileno in self._e:
self._e[fileno].setEnabled(False)
del self._e[fileno]
def poll(self, timeout):
events = {}
for fileno in self._readable:
events[fileno] = events.get(fileno, 0) | Engine.READ
if fileno in self._r:
self._r[fileno].setEnabled(True)
for fileno in self._writable:
events[fileno] = events.get(fileno, 0) | Engine.WRITE
if fileno in self._w:
self._w[fileno].setEnabled(True)
for fileno in self._errored:
events[fileno] = events.get(fileno, 0) | Engine.ERROR
if fileno in self._e:
self._e[fileno].setEnabled(True)
self._readable.clear()
self._writable.clear()
self._errored.clear()
return events
def do_poll():
    """
    Here, we run the Pants event loop. Then, we set the timer interval, either
    to the provided timeout, or for how long it would take to reach the
    earliest deferred event, whichever is shorter.
    """
    _engine.poll(0)

    interval = _timeout
    if _engine._deferreds:
        # Time until the first deferred is due, converted from seconds to
        # milliseconds so it can be compared with _timeout.
        delay = 1000 * (_engine._deferreds[0].end - _engine.latest_poll_time)
        interval = min(delay, _timeout)

    # QTimer intervals are whole milliseconds and must not be negative, so
    # an overdue deferred simply results in an immediate re-poll.
    timer.setInterval(max(0, int(interval)))
###############################################################################
# Installation Function
###############################################################################
timer = None
_timeout = 0.02
_engine = None
def install(app=None, timeout=0.02, engine=None):
    """
    Creates a :class:`~PySide.QtCore.QTimer` instance that will be triggered
    continuously to call :func:`Engine.poll() <pants.engine.Engine.poll>`,
    ensuring that Pants remains responsive.

    =========  ========  ============
    Argument   Default   Description
    =========  ========  ============
    app        None      *Optional.* The :class:`~PySide.QtCore.QCoreApplication` to attach to. If no application is provided, it will attempt to find an existing application in memory, or, failing that, create a new application instance.
    timeout    ``0.02``  *Optional.* The maximum time to wait, in seconds, before running :func:`Engine.poll() <pants.engine.Engine.poll>`.
    engine               *Optional.* The :class:`pants.engine.Engine` instance to use.
    =========  ========  ============
    """
    global timer
    global _timeout
    global _engine

    _engine = engine or Engine.instance()
    _engine._install_poller(_Qt())

    if app is None:
        app = QCoreApplication.instance()
    if app is None:
        app = QCoreApplication([])

    # A repeated install() must not leave the previous timer running
    # alongside the new one.
    if timer is not None:
        timer.stop()

    # QTimer works in whole, non-negative milliseconds; round() avoids
    # losing a millisecond to floating-point error (e.g. 0.029 * 1000).
    _timeout = max(0, int(round(timeout * 1000)))

    timer = QTimer(app)
    timer.timeout.connect(do_poll)
    timer.start(_timeout)