Source code for pants.stream

###############################################################################
#
# Copyright 2011-2012 Pants Developers (see AUTHORS.txt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################################################################
"""
Streaming (TCP) connection implementation.

Streams are one of the two main types of channels in Pants - the other
being :mod:`servers <pants.server>`. Streams represent connections
between two endpoints. They may be used for both client and server
applications.

Streams
=======
To write a Pants application you will first need to subclass
:class:`~pants.stream.Stream`. Your :class:`~pants.stream.Stream`
subclass will contain the majority of your networking code in the form
of event handlers. Event handlers are methods beginning with ``on_`` and
can be safely overridden by your subclass.

Connecting
----------
Before a :class:`~pants.stream.Stream` instance can be used, it must
first be connected to a remote host. If you are writing a server
application, all new :class:`~pants.stream.Stream` instance created by
your :class:`~pants.server.Server` will be connected. Once they are
created by the :class:`~pants.server.Server`,
:meth:`~pants.stream.Stream.on_connect` will be called and your
:class:`~pants.engine.Engine` will begin dispatching events to your
:class:`~pants.stream.Stream` instance.

If you are writing a client application, you must first instantiate your
:class:`~pants.stream.Stream` subclass and then use the
:meth:`~pants.stream.Stream.connect` method to connect the channel to a
remote host. Once the connection has been successfully established, the
:meth:`~pants.stream.Stream.on_connect` event handler will be called and
your :class:`~pants.stream.Stream` instance will start receiving events.
Bear in mind that the connection will not be established until the
:class:`~pants.engine.Engine` is running. As such, a common pattern when
writing client applications with Pants is to call
:meth:`~pants.stream.Stream.connect`, start the engine and then put all
other initialization code in :meth:`~pants.stream.Stream.on_connect`.

Writing Data
------------
Once your :class:`~pants.stream.Stream` instance is connected to a
remote host, you can begin to write data to the channel. Use
:meth:`~pants.stream.Stream.write` to write string data to the channel,
:meth:`~pants.stream.Stream.write_file` to efficiently write data from
an open file and :meth:`~pants.stream.Stream.write_packed` to write
packed binary data. As you call these methods, Pants internally buffers
your outgoing data. Once the buffer is completely empty,
:meth:`~pants.stream.Stream.on_write` will be called. Be aware that if
you continuously write data to your :class:`~pants.stream.Stream` that
:meth:`~pants.stream.Stream.on_write` may not be called very frequently.
If you wish to bypass the internal buffering and attempt to write your
data immediately you can use the ``flush`` options present in the three
write methods or call the :meth:`~pants.stream.Stream.flush` method
yourself. This can help to improve your application's responsiveness but
calling it excessively can reduce overall performance. Generally
speaking, it is useful when you know with certainty that you have
finished writing one discrete chunk of data (i.e. an HTTP response).

Reading Data
------------
A connected :class:`~pants.stream.Stream` instance will automatically
receive all incoming data from the remote host. By default, all incoming
data is immediately passed to the :meth:`~pants.stream.Stream.on_read`
event handler for your code to process. The
:attr:`~pants.stream.Stream.read_delimiter` attribute can be used to
control this behaviour by causing Pants to buffer incoming data
internally, only forwarding it to :meth:`~pants.stream.Stream.on_read`
when a particular condition is met. If the condition is never met, the
internal buffer will eventually exceed the allowed
:attr:`~pants.stream.Stream.buffer_size` and the
:meth:`~pants.stream.Stream.on_overflow_error` handler method will be
called. :attr:`~pants.stream.Stream.read_delimiter` is extremely
powerful when used effectively.

Closing
-------
To close a :class:`~pants.stream.Stream` instance, simply call the
:meth:`~pants.stream.Stream.close` method. Once a stream has been closed
it should not be reused.

Handling Errors
---------------
Despite best efforts, errors will occasionally occur in asynchronous
code. Pants handles these errors by passing the resulting exception
object to one of a number of error handler methods. They are:
:meth:`~pants.stream.Stream.on_connect_error`,
:meth:`~pants.stream.Stream.on_overflow_error` and
:meth:`~pants.stream.Stream.on_error`. Additionally, 
:meth:`~pants.stream.Stream.on_ssl_handshake_error` and
:meth:`~pants.stream.Stream.on_ssl_error` exist to handle SSL-specific
errors.

SSL
===
Pants streams have SSL support. If you are writing a server application,
use :meth:`Server.startSSL <pants.server.Server.startSSL>` to enable SSL
on your server. Each :class:`~pants.stream.Stream` created by your
server from that point forward will be SSL-enabled. If you are writing a
client application, call
:meth:`Stream.startSSL <pants.stream.Stream.startSSL>` before calling
:meth:`~pants.stream.Stream.connect`. Alternatively, you can pass a
dictionary of SSL options to the :class:`~pants.stream.Stream`
constructor which will then enable SSL on the instance. When SSL is
enabled on a :class:`~pants.stream.Stream`, an SSL handshake occurs
between the local and remote ends of the connection. Once the SSL
handshake is complete, :meth:`~pants.stream.Stream.on_ssl_handshake`
will be called. If it fails,
:meth:`~pants.stream.Stream.on_ssl_handshake_error` will be called.

If you are writing an SSL-enabled application you should read the
entirety of Python's :mod:`ssl` documentation. Pants does not override
any of Python's SSL defaults unless clearly stated in this documentation.
"""

###############################################################################
# Imports
###############################################################################

import errno
import functools
import os
import re
import socket
import ssl
import struct

from pants._channel import _Channel, HAS_IPV6, sock_type
from pants.engine import Engine


try:
    from netstruct import NetStruct as _NetStruct
except ImportError:
    # Create the fake class because isinstance expects a class.
    class _NetStruct(object):
        def __init__(self, *a, **kw):
            raise NotImplementedError


###############################################################################
# Constants
###############################################################################

RegexType = type(re.compile(""))
Struct = struct.Struct


###############################################################################
# Logging
###############################################################################

import logging
log = logging.getLogger("pants")


###############################################################################
# Stream Class
###############################################################################

[docs]class Stream(_Channel): """ The stream-oriented connection channel. A :class:`~pants.stream.Stream` instance represents either a local connection to a remote server or a remote connection to a local server over a streaming, connection-oriented protocol such as TCP. ================= ================================================ Keyword Argument Description ================= ================================================ engine *Optional.* The engine to which the channel should be added. Defaults to the global engine. socket *Optional.* A pre-existing socket to wrap. This can be a regular :py:obj:`~socket.socket` or an :py:obj:`~ssl.SSLSocket`. If a socket is not provided, a new socket will be created for the channel when required. ssl_options *Optional.* If provided, :meth:`~pants.stream.Stream.startSSL` will be called with these options once the stream is ready. By default, SSL will not be enabled. ================= ================================================ """ SEND_STRING = 0 SEND_FILE = 1 SEND_SSL_HANDSHAKE = 2 def __init__(self, **kwargs): sock = kwargs.get("socket", None) if sock and sock_type(sock) != socket.SOCK_STREAM: raise TypeError("Cannot create a %s with a socket type other than SOCK_STREAM." % self.__class__.__name__) _Channel.__init__(self, **kwargs) # Socket self._remote_address = None self._local_address = None # I/O attributes self._read_delimiter = None self._recv_buffer = "" self._recv_buffer_size_limit = self._buffer_size self._send_buffer = [] # Channel state self.connected = False self.connecting = False self._closing = False # SSL state self.ssl_enabled = False self._ssl_enabling = False self._ssl_socket_wrapped = False self._ssl_handshake_done = False self._ssl_call_on_connect = False if isinstance(kwargs.get("socket", None), ssl.SSLSocket): self._ssl_socket_wrapped = True self.startSSL() elif kwargs.get("ssl_options", None) is not None: self.startSSL(kwargs["ssl_options"]) ##### Properties ########################################################## @property def remote_address(self): """ The remote address to which the channel is connected. By default, this will be the value of ``socket.getpeername`` or None. It is possible for user code to override the default behaviour and set the value of the property manually. In order to return the property to its default behaviour, user code then has to delete the value. Example:: # default behaviour channel.remote_address = custom_value # channel.remote_address will return custom_value now del channel.remote_address # default behaviour """ if self._remote_address is not None: return self._remote_address elif self._socket: try: return self._socket.getpeername() except socket.error: return None else: return None @remote_address.setter def remote_address(self, val): self._remote_address = val @remote_address.deleter def remote_address(self): self._remote_address = None @property def local_address(self): """ The address of the channel on the local machine. By default, this will be the value of ``socket.getsockname`` or None. It is possible for user code to override the default behaviour and set the value of the property manually. In order to return the property to its default behaviour, user code then has to delete the value. Example:: # default behaviour channel.local_address = custom_value # channel.local_address will return custom_value now del channel.local_address # default behaviour """ if self._local_address is not None: return self._local_address elif self._socket: try: return self._socket.getsockname() except socket.error: return None else: return None @local_address.setter def local_address(self, val): self._local_address = val @local_address.deleter def local_address(self): self._local_address = None @property def read_delimiter(self): """ The magical read delimiter which determines how incoming data is buffered by the stream. As data is read from the socket, it is buffered internally by the stream before being passed to the :meth:`on_read` callback. The value of the read delimiter determines when the data is passed to the callback. Valid values are ``None``, a byte string, an integer/long, a compiled regular expression, an instance of :class:`struct.Struct`, or an instance of :class:`netstruct.NetStruct`. When the read delimiter is ``None``, data will be passed to :meth:`on_read` immediately after it is read from the socket. This is the default behaviour. When the read delimiter is a byte string, data will be buffered internally until that string is encountered in the incoming data. All data up to but excluding the read delimiter is then passed to :meth:`on_read`. The segment matching the read delimiter itself is discarded from the buffer. When the read delimiter is an integer or a long, it is treated as the number of bytes to read before passing the data to :meth:`on_read`. When the read delimiter is a :class:`struct.Struct` instance, the Struct's ``size`` is fully buffered and the data is unpacked using the Struct before its sent to :meth:`on_read`. Unlike other types of read delimiters, this can result in more than one argument being passed to :meth:`on_read`, as in the following example:: import struct from pants import Stream class Example(Stream): def on_connect(self): self.read_delimiter = struct.Struct("!LLH") def on_read(self, packet_type, length, id): pass When the read delimiter is an instance of :class:`netstruct.NetStruct`, the NetStruct's :attr:`~netstruct.NetStruct.minimum_size` is buffered and unpacked with the NetStruct, and additional data is buffered as necessary until the NetStruct can be completely unpacked. Once ready, the data will be passed to :meth:`on_read`. Using Struct and NetStruct are *very* similar. When the read delimiter is a compiled regular expression (:class:`re.RegexObject`), there are two possible behaviors that you may switch between by setting the value of :attr:`regex_search`. If :attr:`regex_search` is True, as is the default, the delimiter's :meth:`~re.RegexObject.search` method is used and, if a match is found, the string before that match is passed to :meth:`on_read`. The segment that was matched by the regular expression will be discarded. If :attr:`regex_search` is False, the delimiter's :meth:`~re.RegexObject.match` method is used instead and, if a match is found, the match object itself will be passed to :meth:`on_read`, giving you access to the capture groups. Again, the segment that was matched by the regular expression will be discarded from the buffer. Attempting to set the read delimiter to any other value will raise a :exc:`TypeError`. The effective use of the read delimiter can greatly simplify the implementation of certain protocols. """ return self._read_delimiter @read_delimiter.setter def read_delimiter(self, value): if value is None or isinstance(value, basestring) or \ isinstance(value, RegexType): self._read_delimiter = value self._recv_buffer_size_limit = self._buffer_size elif isinstance(value, (int, long)): self._read_delimiter = value self._recv_buffer_size_limit = max(self._buffer_size, value) elif isinstance(value, Struct): self._read_delimiter = value self._recv_buffer_size_limit = max(self._buffer_size, value.size) elif isinstance(value, _NetStruct): self._read_delimiter = value self._recv_buffer_size_limit = max(self._buffer_size, value.minimum_size) else: raise TypeError("Attempted to set read_delimiter to a value with an invalid type.") # Reset NetStruct state when we change the read delimiter. self._netstruct_iter = None self._netstruct_needed = None # Setting these at the class level makes them easy to override on a # per-class basis. regex_search = True _buffer_size = 2 ** 16 # 64kb @property def buffer_size(self): """ The maximum size, in bytes, of the internal buffer used for incoming data. When buffering data it is important to ensure that inordinate amounts of memory are not used. Setting the buffer size to a sensible value can prevent coding errors or malicious use from causing your application to consume increasingly large amounts of memory. By default, a maximum of 64kb of data will be stored. The buffer size is mainly relevant when using a string value for the :attr:`~pants.stream.Stream.read_delimiter`. Because you cannot guarantee that the string will appear, having an upper limit on the size of the data is appropriate. If the read delimiter is set to a number larger than the buffer size, the buffer size will be increased to accommodate the read delimiter. When the internal buffer's size exceeds the maximum allowed, the :meth:`~pants.stream.Stream.on_overflow_error` callback will be invoked. Attempting to set the buffer size to anything other than an integer or long will raise a :exc:`TypeError`. """ return self._buffer_size @buffer_size.setter def buffer_size(self, value): if not isinstance(value, (long, int)): raise TypeError("buffer_size must be an int or a long") self._buffer_size = value if isinstance(self._read_delimiter, (int, long)): self._recv_buffer_size_limit = max(value, self._read_delimiter) elif isinstance(self._read_delimiter, Struct): self._recv_buffer_size_limit = max(value, self._read_delimiter.size) elif isinstance(self._read_delimiter, _NetStruct): self._recv_buffer_size_limit = max(value, self._read_delimiter.minimum_size) else: self._recv_buffer_size_limit = value ##### Control Methods #####################################################
[docs] def startSSL(self, ssl_options={}): """ Enable SSL on the channel and perform a handshake at the next opportunity. SSL is only enabled on a channel once all currently pending data has been written. If a problem occurs at this stage, :meth:`~pants.stream.Stream.on_ssl_error` is called. Once SSL has been enabled, the SSL handshake begins - this typically takes some time and may fail, in which case :meth:`~pants.stream.Stream.on_ssl_handshake_error` will be called. When the handshake is successfully completed, :meth:`~pants.stream.Stream.on_ssl_handshake` is called and the channel is secure. Typically, this method is called before :meth:`~pants.stream.Stream.connect`. In this case, :meth:`~pants.stream.Stream.on_ssl_handshake` will be called before :meth:`~pants.stream.Stream.on_connect`. If :meth:`~pants.stream.Stream.startSSL` is called after :meth:`~pants.stream.Stream.connect`, the reverse is true. It is possible, although unusual, to start SSL on a channel that is already connected and active. In this case, as noted above, SSL will only be enabled and the handshake performed after all currently pending data has been written. The SSL options argument will be passed through to :func:`ssl.wrap_socket` as keyword arguments - see the :mod:`ssl` documentation for further information. You will typically want to provide the ``keyfile``, ``certfile`` and ``ca_certs`` options. The ``do_handshake_on_connect`` option **must** be ``False``, or a :exc:`ValueError` will be raised. Attempting to enable SSL on a closed channel or a channel that already has SSL enabled on it will raise a :exc:`RuntimeError`. Returns the channel. ============ =================================================== Arguments Description ============ =================================================== ssl_options *Optional.* Keyword arguments to pass to :func:`ssl.wrap_socket`. ============ =================================================== """ if self.ssl_enabled or self._ssl_enabling: raise RuntimeError("startSSL() called on SSL-enabled %r" % self) if self._closed or self._closing: raise RuntimeError("startSSL() called on closed %r" % self) if ssl_options.setdefault("do_handshake_on_connect", False) is not False: raise ValueError("SSL option 'do_handshake_on_connect' must be False.") self._ssl_enabling = True self._send_buffer.append((Stream.SEND_SSL_HANDSHAKE, ssl_options)) if self.connected: self._process_send_buffer() return self
[docs] def connect(self, address): """ Connect the channel to a remote socket. The given ``address`` is resolved and used by the channel to connect to the remote server. If an error occurs at any stage in this process, :meth:`~pants.stream.Stream.on_connect_error` is called. When a connection is successfully established, :meth:`~pants.stream.Stream.on_connect` is called. Addresses can be represented in a number of different ways. A single string is treated as a UNIX address. A single integer is treated as a port and converted to a 2-tuple of the form ``('', port)``. A 2-tuple is treated as an IPv4 address and a 4-tuple is treated as an IPv6 address. See the :mod:`socket` documentation for further information on socket addresses. If no socket exists on the channel, one will be created with a socket family appropriate for the given address. An error will occur during the connection if the given address is not of a valid format or of an inappropriate format for the socket (e.g. if an IP address is given to a UNIX socket). Calling :meth:`connect()` on a closed channel or a channel that is already connected will raise a :exc:`RuntimeError`. Returns the channel. =============== =============================================== Arguments Description =============== =============================================== address The remote address to connect to. =============== =============================================== """ if self.connected or self.connecting: raise RuntimeError("connect() called on active %r." % self) if self._closed or self._closing: raise RuntimeError("connect() called on closed %r." % self) self.connecting = True address, family, resolved = self._format_address(address) if resolved: self._do_connect(address, family) else: try: result = socket.getaddrinfo(address[0], address[1], family) except socket.error as err: self.close(flush=False) e = StreamConnectError(err.errno, err.strerror) self._safely_call(self.on_connect_error, e) return self # We only care about the first result. result = result[0] self._do_connect(result[-1], result[0]) return self
[docs] def close(self, flush=True): """ Close the channel. """ if self._closed: return if flush and self._send_buffer: self._closing = True return self.read_delimiter = None self._recv_buffer = "" self._send_buffer = [] self.connected = False self.connecting = False self.ssl_enabled = False self._ssl_enabling = False self._ssl_socket_wrapped = False self._ssl_handshake_done = False self._ssl_call_on_connect = False self._safely_call(self.on_close) self._remote_address = None self._local_address = None _Channel.close(self) self._closing = False
##### I/O Methods #########################################################
[docs] def write(self, data, flush=False): """ Write data to the channel. Data will not be written immediately, but will be buffered internally until it can be sent without blocking the process. Calling :meth:`write()` on a closed or disconnected channel will raise a :exc:`RuntimeError`. ========== =================================================== Arguments Description ========== =================================================== data A string of data to write to the channel. flush *Optional.* If True, flush the internal write buffer. See :meth:`~pants.stream.Stream.flush` for details. ========== =================================================== """ if self._closed or self._closing: raise RuntimeError("write() called on closed %r." % self) if not self.connected: raise RuntimeError("write() called on disconnected %r." % self) if self._send_buffer and self._send_buffer[-1][0] == Stream.SEND_STRING: data_type, existing_data = self._send_buffer.pop(-1) data = existing_data + data self._send_buffer.append((Stream.SEND_STRING, data)) if flush: self._process_send_buffer() else: self._start_waiting_for_write_event()
[docs] def write_file(self, sfile, nbytes=0, offset=0, flush=False): """ Write a file to the channel. The file will not be written immediately, but will be buffered internally until it can be sent without blocking the process. Calling :meth:`write_file()` on a closed or disconnected channel will raise a :exc:`RuntimeError`. ========== ==================================================== Arguments Description ========== ==================================================== sfile A file object to write to the channel. nbytes *Optional.* The number of bytes of the file to write. If 0, all bytes will be written. offset *Optional.* The number of bytes to offset writing by. flush *Optional.* If True, flush the internal write buffer. See :meth:`~pants.stream.Stream.flush` for details. ========== ==================================================== """ if self._closed or self._closing: raise RuntimeError("write_file() called on closed %r." % self) if not self.connected: raise RuntimeError("write_file() called on disconnected %r." % self) self._send_buffer.append((Stream.SEND_FILE, (sfile, offset, nbytes))) if flush: self._process_send_buffer() else: self._start_waiting_for_write_event()
[docs] def write_packed(self, *data, **kwargs): """ Write packed binary data to the channel. If the current :attr:`read_delimiter` is an instance of :class:`struct.Struct` or :class:`netstruct.NetStruct` the format will be read from that Struct, otherwise you will need to provide a ``format``. ========== ==================================================== Argument Description ========== ==================================================== \*data Any number of values to be passed through :mod:`struct` and written to the remote host. flush *Optional.* If True, flush the internal write buffer. See :meth:`~pants.stream.Stream.flush` for details. format *Optional.* A formatting string to pack the provided data with. If one isn't provided, the read delimiter will be used. ========== ==================================================== """ format = kwargs.get("format") if format: self.write(struct.pack(format, *data), kwargs.get("flush", False)) elif not isinstance(self._read_delimiter, (Struct, _NetStruct)): raise ValueError("No format is available for writing packed data.") else: self.write(self._read_delimiter.pack(*data), kwargs.get("flush", False))
[docs] def flush(self): """ Attempt to immediately write any internally buffered data to the channel without waiting for a write event. This method can be fairly expensive to call and should be used sparingly. Calling :meth:`flush()` on a closed or disconnected channel will raise a :exc:`RuntimeError`. """ if self._closed or self._closing: raise RuntimeError("flush() called on closed %r." % self) if not self.connected: raise RuntimeError("flush() called on disconnected %r." % self) if not self._send_buffer: return self._stop_waiting_for_write_event() self._process_send_buffer()
##### Public Event Handlers ###############################################
[docs] def on_ssl_handshake(self): """ Placeholder. Called after the channel has finished its SSL handshake. """ pass
##### Public Error Handlers ###############################################
[docs] def on_ssl_handshake_error(self, exception): """ Placeholder. Called when an error occurs during the SSL handshake. By default, logs the exception and closes the channel. ========== ============ Argument Description ========== ============ exception The exception that was raised. ========== ============ """ log.exception(exception) self.close(flush=False)
[docs] def on_ssl_error(self, exception): """ Placeholder. Called when an error occurs in the underlying SSL implementation. By default, logs the exception and closes the channel. ========== ============ Argument Description ========== ============ exception The exception that was raised. ========== ============ """ log.exception(exception) self.close(flush=False)
##### Internal Methods #################################################### def _do_connect(self, address, family, error=None): """ A callback method to be used with :meth:`~pants._channel._Channel._resolve_addr` - either connects immediately or notifies the user of an error. ========= ===================================================== Argument Description ========= ===================================================== address The address to connect to or None if address resolution failed. family The detected socket family or None if address resolution failed. error *Optional.* Error information or None if no error occurred. ========= ===================================================== """ if not address: self.connecting = False e = StreamConnectError(*error) self._safely_call(self.on_connect_error, e) return if self._socket: if self._socket.family != family: self.engine.remove_channel(self) self._socket_close() self._closed = False sock = socket.socket(family, socket.SOCK_STREAM) self._socket_set(sock) self.engine.add_channel(self) try: connected = self._socket_connect(address) except socket.error as err: self.close(flush=False) e = StreamConnectError(err.errno, err.strerror) self._safely_call(self.on_connect_error, e) return if connected: self._handle_connect_event() ##### Internal Event Handler Methods ###################################### def _handle_read_event(self): """ Handle a read event raised on the channel. """ if self.ssl_enabled and not self._ssl_handshake_done: self._ssl_do_handshake() return while True: try: data = self._socket_recv() except socket.error as err: self._safely_call(self.on_read_error, err) return if not data: break else: self._recv_buffer += data if len(self._recv_buffer) > self._recv_buffer_size_limit: # Try processing the buffer to reduce its length. self._process_recv_buffer() # If the buffer's still too long, overflow error. if len(self._recv_buffer) > self._recv_buffer_size_limit: e = StreamBufferOverflow("Buffer length exceeded upper limit on %r." % self) self._safely_call(self.on_overflow_error, e) return self._process_recv_buffer() # This block was moved out of the above loop to address issue #41. if data is None: self.close(flush=False) def _handle_write_event(self): """ Handle a write event raised on the channel. """ if self.ssl_enabled and not self._ssl_handshake_done: self._ssl_do_handshake() return if not self.connected: self._handle_connect_event() if not self._send_buffer: return self._process_send_buffer() def _handle_error_event(self): """ Handle an error event raised on the channel. """ if self.connecting: # That's no moon... self._handle_connect_event() else: _Channel._handle_error_event(self) def _handle_connect_event(self): """ Handle a connect event raised on the channel. """ self.connecting = False err, errstr = self._get_socket_error() if err == 0: self.connected = True if self._ssl_enabling: self._ssl_call_on_connect = True self._process_send_buffer() else: self._safely_call(self.on_connect) else: # ... it's a space station! e = StreamConnectError(err, errstr) self._safely_call(self.on_connect_error, e) ##### Internal Processing Methods ######################################### def _process_recv_buffer(self): """ Process the :attr:`~pants.stream.Stream._recv_buffer`, passing chunks of data to :meth:`~pants.stream.Stream.on_read`. """ while self._recv_buffer: delimiter = self.read_delimiter if delimiter is None: data = self._recv_buffer self._recv_buffer = "" self._safely_call(self.on_read, data) elif isinstance(delimiter, (int, long)): if len(self._recv_buffer) < delimiter: break data = self._recv_buffer[:delimiter] self._recv_buffer = self._recv_buffer[delimiter:] self._safely_call(self.on_read, data) elif isinstance(delimiter, basestring): mark = self._recv_buffer.find(delimiter) if mark == -1: break data = self._recv_buffer[:mark] self._recv_buffer = self._recv_buffer[mark + len(delimiter):] self._safely_call(self.on_read, data) elif isinstance(delimiter, Struct): if len(self._recv_buffer) < delimiter.size: break data = self._recv_buffer[:delimiter.size] self._recv_buffer = self._recv_buffer[delimiter.size:] # Safely unpack it. This should *probably* never error. try: data = delimiter.unpack(data) except struct.error: log.exception("Unable to unpack data on %r." % self) self.close() break # Unlike most on_read calls, this one sends every variable of # the parsed data as its own argument. self._safely_call(self.on_read, *data) elif isinstance(delimiter, _NetStruct): if not self._netstruct_iter: # We need to get started. self._netstruct_iter = delimiter.iter_unpack() self._netstruct_needed = next(self._netstruct_iter) if len(self._recv_buffer) < self._netstruct_needed: break data = self._netstruct_iter.send( self._recv_buffer[:self._netstruct_needed]) self._recv_buffer = self._recv_buffer[self._netstruct_needed:] if isinstance(data, (int,long)): self._netstruct_needed = data continue # Still here? Then we've got our object. Delete the NetStruct # state and send the data. self._netstruct_needed = None self._netstruct_iter = None self._safely_call(self.on_read, *data) elif isinstance(delimiter, RegexType): # Depending on regex_search, we could do this two ways. if self.regex_search: match = delimiter.search(self._recv_buffer) if not match: break data = self._recv_buffer[:match.start()] self._recv_buffer = self._recv_buffer[match.end():] else: # Require the match to be at the beginning. data = delimiter.match(self._recv_buffer) if not data: break self._recv_buffer = self._recv_buffer[data.end():] # Send either the string or the match object. self._safely_call(self.on_read, data) else: # The safeguards in the read delimiter property should # prevent this from happening unless people start # getting too crafty for their own good. err = InvalidReadDelimiterError("Invalid read delimiter on %r." % self) self._safely_call(self.on_error, err) break if self._closed or not self.connected: break def _process_send_buffer(self): """ Process the :attr:`~pants.stream.Stream._send_buffer`, passing outgoing data to :meth:`~pants._channel._Channel._socket_send` or :meth:`~pants._channel._Channel._socket_sendfile` and calling :meth:`~pants.stream.Stream.on_write` when sending has finished. """ while self._send_buffer: data_type, data = self._send_buffer.pop(0) if data_type == Stream.SEND_STRING: bytes_sent = self._process_send_string(data) elif data_type == Stream.SEND_FILE: bytes_sent = self._process_send_file(*data) elif data_type == Stream.SEND_SSL_HANDSHAKE: bytes_sent = self._process_send_ssl_handshake(data) if bytes_sent == 0: break if not self._closed and not self._send_buffer: self._safely_call(self.on_write) if self._closing: self.close(flush=False) def _process_send_string(self, data): """ Send data from a string to the remote socket. """ try: bytes_sent = self._socket_send(data) except socket.error as err: self._safely_call(self.on_write_error, err) return 0 if len(data) > bytes_sent: self._send_buffer.insert(0, (Stream.SEND_STRING, data[bytes_sent:])) return bytes_sent def _process_send_file(self, sfile, offset, nbytes): """ Send data from a file to the remote socket. """ try: bytes_sent = self._socket_sendfile(sfile, offset, nbytes) except socket.error as err: self._safely_call(self.on_write_error, err) return 0 offset += bytes_sent if nbytes > 0: if nbytes - bytes_sent > 0: nbytes -= bytes_sent else: # Reached the end of the segment. return bytes_sent # TODO This is awful. Find a better way. if os.fstat(sfile.fileno()).st_size - offset <= 0: # Reached the end of the file. return bytes_sent self._send_buffer.insert(0, (Stream.SEND_FILE, (sfile, offset, nbytes))) return bytes_sent def _process_send_ssl_handshake(self, ssl_options): """ Enable SSL and begin the handshake. """ self._ssl_enabling = False if not self._ssl_socket_wrapped: try: self._socket = ssl.wrap_socket(self._socket, **ssl_options) except ssl.SSLError as err: self._ssl_enabling = True self._safely_call(self.on_ssl_error, err) return 0 else: self._ssl_socket_wrapped = True self.ssl_enabled = True try: bytes_sent = self._ssl_do_handshake() except Exception as err: self._safely_call(self.on_ssl_handshake_error, err) return 0 # Unlike strings and files, the SSL handshake is not re-added to # the queue. This is because the stream's state has been # modified and the handshake will continue until it's complete. return bytes_sent ##### SSL Implementation ################################################## def _socket_recv(self): """ Receive data from the socket. Returns a string of data read from the socket. The data is None if the socket has been closed. Overrides :meth:`pants._channel._Channel._socket_recv` to handle SSL-specific behaviour. """ try: return _Channel._socket_recv(self) except ssl.SSLError as err: if err.args[0] == ssl.SSL_ERROR_WANT_READ: return '' else: raise def _socket_send(self, data): """ Send data to the socket. Returns the number of bytes that were sent to the socket. Overrides :meth:`pants._channel._Channel._socket_send` to handle SSL-specific behaviour. ========= ============ Argument Description ========= ============ data The string of data to send. ========= ============ """ try: bytes_sent = _Channel._socket_send(self, data) except ssl.SSLError as err: if err.args[0] == ssl.SSL_ERROR_WANT_WRITE: self._start_waiting_for_write_event() return 0 else: raise # SSLSocket.send() can return 0 rather than raise an exception # if it needs a write event. if self.ssl_enabled and bytes_sent == 0: self._start_waiting_for_write_event() return bytes_sent def _socket_sendfile(self, sfile, offset, nbytes): """ Send data from a file to a remote socket. Returns the number of bytes that were sent to the socket. Overrides :meth:`pants._channel._Channel._socket_sendfile` to handle SSL-specific behaviour. ========= ============ Argument Description ========= ============ sfile The file to send. offset The number of bytes to offset writing by. nbytes The number of bytes of the file to write. If 0, all bytes will be written. ========= ============ """ return _Channel._socket_sendfile(self, sfile, offset, nbytes, self.ssl_enabled) def _ssl_do_handshake(self): """ Perform an asynchronous SSL handshake. """ try: self._socket.do_handshake() except ssl.SSLError as err: if err.args[0] == ssl.SSL_ERROR_WANT_READ: return 0 elif err.args[0] == ssl.SSL_ERROR_WANT_WRITE: self._start_waiting_for_write_event() return 0 elif err.args[0] in (ssl.SSL_ERROR_EOF, ssl.SSL_ERROR_ZERO_RETURN): self.close(flush=False) return 0 elif err.args[0] == ssl.SSL_ERROR_SSL: self._safely_call(self.on_ssl_handshake_error, err) return 0 else: raise except socket.error as err: if err.args[0] in (errno.ECONNRESET, errno.EPIPE): self.close(flush=False) return 0 else: raise else: self._ssl_handshake_done = True self._safely_call(self.on_ssl_handshake) if self._ssl_call_on_connect: self._safely_call(self.on_connect) return None
############################################################################### # Exceptions ############################################################################### class StreamBufferOverflow(Exception): """ Raised when a stream's internal buffer has exceeded its maximum allowed size. """ pass class StreamConnectError(Exception): """ Raised when an error has occurred during an attempt to connect a stream to a remote host. """ pass class InvalidReadDelimiterError(Exception): """ Raised when a channel tries to process incoming data with an invalid read delimiter. """ pass