Source code for pants.server

###############################################################################
#
# Copyright 2011-2012 Pants Developers (see AUTHORS.txt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################################################################
"""
Streaming (TCP) server implementation.

Servers are one of the two main types of channels in Pants - the other
being :mod:`streams <pants.stream>`. Servers listen for connections to
your application, accept those connections and allow you to handle them
easily. Pants servers support SSL and IPv6.

Servers
=======
Writing Servers
---------------
You have two choices when writing a server application: either use
Pants' default :class:`~pants.server.Server` class without modification
or subclass :class:`~pants.server.Server` in order to implement custom
behaviour.

Pants' default :class:`~pants.server.Server` class will wrap every new
connection in an instance of a connection class which you provide (see
below). In most cases, this provides you with sufficient freedom to
implement your application logic and has the added benefit of
simplicity. To use the default server, simply instantiate 
:class:`~pants.server.Server` and pass your connection class to the
constructor.

If you need to implement custom server behaviour, you can subclass
:class:`~pants.server.Server` and define your connection class as a
class attribute::

    class MyServer(pants.Server):
        ConnectionClass = MyConnectionClass

It is recommended that you use the default :class:`~pants.server.Server`
class where possible and try to implement your application logic in your
connection class.

Connection Classes
------------------
A connection class is a subclass of :class:`~pants.stream.Stream` which
a server will use to wrap each incoming connection. Every time a new
connection is made to the server, a new instance of your connection
class will be created to handle it. You can override the various event
handler methods of :class:`~pants.stream.Stream` to implement your
application's logic.

Running Servers
---------------
Having defined your connection class and instantiated your server, you
can start it listening for new connections with the
:meth:`~pants.server.Server.listen` method. This will bind the server
to your chosen address and once the :mod:`~pants.engine` is started,
the server will begin accepting new connections. Once the server has
started listening for connections it can be stopped using the
:meth:`~pants.server.Server.close` method. When
:meth:`~pants.server.Server.close` is called, the default server 
implementation will close any connections that were made to it which are
still open.

SSL
===
Pants servers have SSL support. If you want to start an SSL-enabled
server, call the :meth:`~pants.server.Server.startSSL` method before
calling the :meth:`~pants.server.Server.listen` method. When you call
:meth:`~pants.server.Server.startSSL` you must provide a dictionary of
SSL options as detailed in the method documentation. It is also
possible to pass the SSL options dictionary directly to the
:class:`~pants.server.Server` constructor in order to enable SSL.
Here is an example of how you might start an SSL-enabled server::

    server = pants.Server(MyConnectionClass)
    server.startSSL({
        'certfile': '/home/user/certfile.pem',
        'keyfile': '/home/user/keyfile.pem'
        })
    server.listen(('', 8080))

If you are writing an SSL-enabled application you should read the
entirety of Python's :mod:`ssl` documentation. Pants does not override
any of Python's SSL defaults unless clearly stated in this documentation.
"""

###############################################################################
# Imports
###############################################################################

import socket
import ssl
import weakref

from pants._channel import _Channel, HAS_IPV6, sock_type
from pants.stream import Stream


###############################################################################
# Logging
###############################################################################

import logging
log = logging.getLogger("pants")


###############################################################################
# Server Class
###############################################################################

[docs]class Server(_Channel): """ A stream-oriented server channel. A :class:`~pants.server.Server` instance represents a local server capable of listening for connections from remote hosts over a connection-oriented protocol such as TCP/IP. ================= ================================================ Keyword Argument Description ================= ================================================ engine *Optional.* The engine to which the channel should be added. Defaults to the global engine. socket *Optional.* A pre-existing socket to wrap. This can be a regular :py:obj:`~socket.socket` or an :py:obj:`~ssl.SSLSocket`. If a socket is not provided, a new socket will be created for the channel when required. ssl_options *Optional.* If provided, :meth:`~pants.server.Server.startSSL` will be called with these options once the server is ready. By default, SSL will not be enabled. ================= ================================================ """ ConnectionClass = Stream def __init__(self, ConnectionClass=None, **kwargs): sock = kwargs.get("socket", None) if sock and sock_type(sock) != socket.SOCK_STREAM: raise TypeError("Cannot create a %s with a socket type other than SOCK_STREAM." % self.__class__.__name__) _Channel.__init__(self, **kwargs) # Socket self._remote_address = None self._local_address = None self._slave = None # Channel state self.listening = False # SSL state self.ssl_enabled = False self._ssl_options = None if kwargs.get("ssl_options", None) is not None: self.startSSL(kwargs["ssl_options"]) # Connection class if ConnectionClass is not None: self.ConnectionClass = ConnectionClass self.channels = weakref.WeakValueDictionary() ##### Properties ########################################################## @property def remote_address(self): """ """ return self._remote_address or self._socket.getpeername() @remote_address.setter def remote_address(self, val): self._remote_address = val @property def local_address(self): """ """ return self._local_address or self._socket.getsockname() @local_address.setter def local_address(self, val): self._local_address = val ##### Control Methods #####################################################
[docs] def startSSL(self, ssl_options={}): """ Enable SSL on the channel. Enabling SSL on a server channel will cause any new connections accepted by the server to be automatically wrapped in an SSL context before being passed to :meth:`~pants.server.Server.on_accept`. If an error occurs while a new connection is being wrapped, :meth:`~pants.server.Server.on_ssl_wrap_error` is called. SSL is enabled immediately. Typically, this method is called before :meth:`~pants.server.Server.listen`. If it is called afterwards, any connections made in the meantime will not have been wrapped in SSL contexts. The SSL options argument will be passed through to each invocation of :func:`ssl.wrap_socket` as keyword arguments - see the :mod:`ssl` documentation for further information. You will typically want to provide the ``keyfile``, ``certfile`` and ``ca_certs`` options. The ``do_handshake_on_connect`` option **must** be ``False`` and the ``server_side`` option **must** be true, or a :exc:`ValueError` will be raised. Attempting to enable SSL on a closed channel or a channel that already has SSL enabled on it will raise a :exc:`RuntimeError`. Returns the channel. ============ =================================================== Arguments Description ============ =================================================== ssl_options *Optional.* Keyword arguments to pass to :func:`ssl.wrap_socket`. ============ =================================================== """ if self.ssl_enabled: raise RuntimeError("startSSL() called on SSL-enabled %r." % self) if self._closed: raise RuntimeError("startSSL() called on closed %r." % self) if ssl_options.setdefault("server_side", True) is not True: raise ValueError("SSL option 'server_side' must be True.") if ssl_options.setdefault("do_handshake_on_connect", False) is not False: raise ValueError("SSL option 'do_handshake_on_connect' must be False.") self.ssl_enabled = True self._ssl_options = ssl_options return self
[docs] def listen(self, address, backlog=1024, slave=True): """ Begin listening for connections made to the channel. The given ``address`` is resolved, the channel is bound to the address and then begins listening for connections. Once the channel has begun listening, :meth:`~pants.server.Server.on_listen` will be called. Addresses can be represented in a number of different ways. A single string is treated as a UNIX address. A single integer is treated as a port and converted to a 2-tuple of the form ``('', port)``. A 2-tuple is treated as an IPv4 address and a 4-tuple is treated as an IPv6 address. See the :mod:`socket` documentation for further information on socket addresses. If no socket exists on the channel, one will be created with a socket family appropriate for the given address. An error will occur if the given address is not of a valid format or of an inappropriate format for the socket (e.g. if an IP address is given to a UNIX socket). Calling :meth:`listen()` on a closed channel or a channel that is already listening will raise a :exc:`RuntimeError`. Returns the channel. =============== ================================================ Arguments Description =============== ================================================ address The local address to listen for connections on. backlog *Optional.* The maximum size of the connection queue. slave *Optional.* If True, this will cause a Server listening on IPv6 INADDR_ANY to create a slave Server that listens on the IPv4 INADDR_ANY. =============== ================================================ """ if self.listening: raise RuntimeError("listen() called on active %r." % self) if self._closed: raise RuntimeError("listen() called on closed %r." % self) address, family, resolved = self._format_address(address) if not family: raise ValueError("Unable to determine address family from " "address: %s" % repr(address)) self._do_listen(address, family, backlog, slave) return self
[docs] def close(self): """ Close the channel. The channel will be closed immediately and will cease to accept new connections. Any connections accepted by this channel will remain open and will need to be closed separately. If this channel has an IPv4 slave (see :meth:`~pants.server.Server.listen`) it will be closed. Once closed, a channel cannot be re-opened. """ if self._closed: return self.listening = False self.ssl_enabled = False if self._slave: self._slave.close() self._safely_call(self.on_close) self._remote_address = None self._local_address = None _Channel.close(self)
##### Public Event Handlers ###############################################
[docs] def on_accept(self, socket, addr): """ Called after the channel has accepted a new connection. Create a new instance of :attr:`~pants.server.Server.ConnectionClass` to wrap the socket and add it to the server. ========= ============ Argument Description ========= ============ sock The newly connected socket object. addr The new socket's address. ========= ============ """ connection = self.ConnectionClass(engine=self.engine, socket=socket) connection.server = self self.channels[connection.fileno] = connection connection._handle_connect_event()
[docs] def on_close(self): """ Called after the channel has finished closing. Close all active connections to the server. """ for channel in self.channels.values(): channel.close(flush=False)
##### Public Error Handlers ###############################################
[docs] def on_ssl_wrap_error(self, sock, addr, exception): """ Placeholder. Called when an error occurs while wrapping a new connection with an SSL context. By default, logs the exception and closes the new connection. ========== ============ Argument Description ========== ============ sock The newly connected socket object. addr The new socket's address. exception The exception that was raised. ========== ============ """ log.exception(exception) try: sock.close() except socket.error: pass
##### Internal Methods #################################################### def _do_listen(self, addr, family, backlog, slave): """ A callback method to be used with :meth:`~pants._channel._Channel._resolve_addr` - either listens immediately or notifies the user of an error. ========= ===================================================== Argument Description ========= ===================================================== backlog The maximum size of the connection queue. slave If True, this will cause a Server listening on IPv6 INADDR_ANY to create a slave Server that listens on the IPv4 INADDR_ANY. addr The address to listen on or None if address resolution failed. family The detected socket family or None if address resolution failed. error *Optional.* Error information or None if no error occurred. ========= ===================================================== """ if self._socket: if self._socket.family != family: self.engine.remove_channel(self) self._socket_close() self._closed = False sock = socket.socket(family, socket.SOCK_STREAM) self._socket_set(sock) self.engine.add_channel(self) try: self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except AttributeError: pass if hasattr(socket, "IPPROTO_IPV6") and hasattr(socket, "IPV6_V6ONLY")\ and family == socket.AF_INET6: self._socket.setsockopt(socket.IPPROTO_IPV6, socket.IPV6_V6ONLY, 0) slave = False try: self._socket_bind(addr) self._socket_listen(backlog) except socket.error as err: self.close() raise self.listening = True self._safely_call(self.on_listen) if slave and not isinstance(addr, str) and addr[0] == '' and HAS_IPV6: # Silently fail if we can't make a slave. try: self._slave = _SlaveServer(self.engine, self, addr, backlog) except Exception: self._slave = None ##### Internal Event Handler Methods ###################################### def _handle_read_event(self): """ Handle a read event raised on the channel. """ while True: try: sock, addr = self._socket_accept() except socket.error: log.exception("Exception raised by accept() on %r." % self) try: sock.close() except socket.error: pass return if sock is None: return if self.ssl_enabled: try: sock.setblocking(False) sock = ssl.wrap_socket(sock, **self._ssl_options) except ssl.SSLError as e: self._safely_call(self.on_ssl_wrap_error, sock, addr, e) continue self._safely_call(self.on_accept, sock, addr) def _handle_write_event(self): """ Handle a write event raised on the channel. """ log.warning("Received write event for %r." % self)
############################################################################### # _SlaveServer Class ############################################################################### class _SlaveServer(Server): """ A slave for a StreamServer to allow listening on multiple address families. """ def __init__(self, engine, server, addr, backlog): Server.__init__(self, engine=engine) self.server = server # Now, listen our way. if server._socket.family == socket.AF_INET6: family = socket.AF_INET else: family = socket.AF_INET6 sock = socket.socket(family, socket.SOCK_STREAM) self._socket_set(sock) self.engine.add_channel(self) try: self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self._socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) except AttributeError: pass try: self._socket_bind(addr) self._socket_listen(backlog) except socket.error as err: self.close() raise self._remote_address = None self._local_address = None self.listening = True self.on_accept = self.server.on_accept def on_close(self): if self.server._slave == self: self.server._slave = None