pants.stream

Streaming (TCP) connection implementation.

Streams are one of the two main types of channel in Pants, the other being servers. Streams represent connections between two endpoints. They may be used for both client and server applications.

Streams

To write a Pants application you will first need to subclass Stream. Your Stream subclass will contain the majority of your networking code in the form of event handlers. Event handlers are methods beginning with on_ and can be safely overridden by your subclass.

Connecting

Before a Stream instance can be used, it must first be connected to a remote host. If you are writing a server application, every new Stream instance created by your Server will already be connected. Once the Server has created them, on_connect() will be called and your Engine will begin dispatching events to your Stream instances.

If you are writing a client application, you must first instantiate your Stream subclass and then use the connect() method to connect the channel to a remote host. Once the connection has been successfully established, the on_connect() event handler will be called and your Stream instance will start receiving events. Bear in mind that the connection will not be established until the Engine is running. As such, a common pattern when writing client applications with Pants is to call connect(), start the engine and then put all other initialization code in on_connect().
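The ordering described above can be illustrated with a toy model. The ToyEngine, ToyStream and Client classes below are hypothetical stand-ins written for this sketch, not part of Pants. They only mimic the documented behaviour: connect() records the request, and on_connect() fires once the engine runs.

```python
class ToyEngine:
    """Hypothetical stand-in for an engine; not the Pants API."""

    def __init__(self):
        self._pending = []  # streams waiting to be "connected"

    def add_pending(self, stream):
        self._pending.append(stream)

    def start(self):
        # A real engine loops forever polling sockets; this toy version
        # just completes every pending connection once and returns.
        while self._pending:
            stream = self._pending.pop(0)
            stream.connected = True
            stream.on_connect()


class ToyStream:
    """Hypothetical base class with an on_ placeholder handler."""

    def __init__(self, engine):
        self.engine = engine
        self.connected = False

    def connect(self, address):
        self.address = address
        self.engine.add_pending(self)  # nothing happens until start()
        return self                    # mirrors "Returns the channel."

    def on_connect(self):
        pass  # placeholder, overridden by subclasses


class Client(ToyStream):
    def on_connect(self):
        # Initialization that needs a live connection belongs here.
        self.greeting = "connected to %s:%d" % self.address


engine = ToyEngine()
client = Client(engine).connect(("example.com", 80))
print(client.connected)  # False: the engine has not run yet
engine.start()
print(client.greeting)
```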

Writing Data

Once your Stream instance is connected to a remote host, you can begin writing data to the channel. Use write() to write string data, write_file() to efficiently write data from an open file, and write_packed() to write packed binary data. As you call these methods, Pants buffers your outgoing data internally. Once the buffer is completely empty, on_write() will be called. Be aware that if you write data to your Stream continuously, on_write() may not be called very frequently. If you wish to bypass the internal buffering and attempt to write your data immediately, you can use the flush option of the three write methods or call the flush() method yourself. This can improve your application’s responsiveness, but calling it excessively can reduce overall performance. Generally speaking, flushing is useful when you know with certainty that you have finished writing one discrete chunk of data (e.g. an HTTP response).
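A toy model may help make this buffering behaviour concrete. ToyWriteBuffer is hypothetical and greatly simplified: its "socket" accepts a fixed number of bytes per write event, and its flush() always succeeds. It shows why on_write() fires only when the buffer drains completely.

```python
class ToyWriteBuffer:
    """Hypothetical, simplified model of a stream's outgoing buffer."""

    def __init__(self, chunk_size=4):
        self.chunk_size = chunk_size   # bytes the fake socket accepts per event
        self.buffer = b""
        self.sent = []                 # chunks that "reached the wire"
        self.on_write_calls = 0

    def write(self, data, flush=False):
        self.buffer += data            # queue the data; nothing is sent yet
        if flush:
            self.flush()

    def flush(self):
        # Toy version: keep sending until the buffer is empty.
        while self.buffer:
            self._send_some()

    def handle_write_event(self):
        # What an engine would call each time the socket becomes writable.
        if self.buffer:
            self._send_some()

    def _send_some(self):
        chunk = self.buffer[:self.chunk_size]
        self.buffer = self.buffer[self.chunk_size:]
        self.sent.append(chunk)
        if not self.buffer:
            self.on_write()            # fires only once the buffer is empty

    def on_write(self):
        self.on_write_calls += 1


buf = ToyWriteBuffer()
buf.write(b"HTTP/1.1 200 OK\r\n")
buf.handle_write_event()        # sends 4 bytes; 13 are still queued
print(buf.on_write_calls)       # 0
buf.write(b"\r\n", flush=True)  # response finished, so flush it all now
print(buf.on_write_calls)       # 1
```

Writing in a tight loop keeps the buffer from ever emptying, which is why on_write() can become infrequent under continuous writes.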

Reading Data

A connected Stream instance will automatically receive all incoming data from the remote host. By default, all incoming data is immediately passed to the on_read() event handler for your code to process. The read_delimiter attribute can be used to control this behaviour by causing Pants to buffer incoming data internally, only forwarding it to on_read() when a particular condition is met. If the condition is never met, the internal buffer will eventually exceed the allowed buffer_size and the on_overflow_error() handler method will be called. Used effectively, read_delimiter can remove most of the manual buffering and parsing code from on_read().

Closing

To close a Stream instance, simply call the close() method. Once a stream has been closed it should not be reused.

Handling Errors

Despite best efforts, errors will occasionally occur in asynchronous code. Pants handles these errors by passing the resulting exception object to one of a number of error handler methods. They are: on_connect_error(), on_overflow_error() and on_error(). Additionally, on_ssl_handshake_error() and on_ssl_error() exist to handle SSL-specific errors.

SSL

Pants streams support SSL. If you are writing a server application, use Server.startSSL() to enable SSL on your server. Every Stream created by your server from that point forward will be SSL-enabled. If you are writing a client application, call Stream.startSSL() before calling connect(). Alternatively, you can pass a dictionary of SSL options to the Stream constructor, which will then enable SSL on the instance. When SSL is enabled on a Stream, an SSL handshake takes place between the local and remote ends of the connection. Once the SSL handshake is complete, on_ssl_handshake() will be called. If it fails, on_ssl_handshake_error() will be called.

If you are writing an SSL-enabled application you should read the entirety of Python’s ssl documentation. Pants does not override any of Python’s SSL defaults unless clearly stated in this documentation.

Stream

class pants.stream.Stream(**kwargs)

The stream-oriented connection channel.

A Stream instance represents either a local connection to a remote server or a remote connection to a local server over a streaming, connection-oriented protocol such as TCP.

Keyword Argument   Description
engine             Optional. The engine to which the channel should be added. Defaults to the global engine.
socket             Optional. A pre-existing socket to wrap. This can be a regular socket or an SSLSocket. If a socket is not provided, a new socket will be created for the channel when required.
ssl_options        Optional. If provided, startSSL() will be called with these options once the stream is ready. By default, SSL will not be enabled.

buffer_size

The maximum size, in bytes, of the internal buffer used for incoming data.

When buffering data, it is important to ensure that inordinate amounts of memory are not used. Setting the buffer size to a sensible value can prevent coding errors or malicious use from causing your application to consume ever-increasing amounts of memory. By default, a maximum of 64 KB of data will be stored.

The buffer size is mainly relevant when using a string value for the read_delimiter. Because you cannot guarantee that the string will appear, having an upper limit on the size of the data is appropriate.

If the read delimiter is set to a number larger than the buffer size, the buffer size will be increased to accommodate the read delimiter.

When the internal buffer’s size exceeds the maximum allowed, the on_overflow_error() callback will be invoked.

Attempting to set the buffer size to anything other than an integer or long will raise a TypeError.
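The buffer_size rules above can be sketched with plain Python properties. BufferLimits is a hypothetical class written for illustration, not Pants’ implementation. It assumes the 64 KB default is 2 ** 16 bytes and only models the TypeError check and the growth rule for integer read delimiters.

```python
class BufferLimits:
    """Hypothetical sketch of the documented buffer_size rules."""

    def __init__(self):
        self._buffer_size = 2 ** 16   # assumed 64 KB default
        self._read_delimiter = None

    @property
    def buffer_size(self):
        return self._buffer_size

    @buffer_size.setter
    def buffer_size(self, value):
        # Only integers are accepted (Python 2 also allowed long).
        if not isinstance(value, int):
            raise TypeError("buffer_size must be an integer")
        self._buffer_size = value

    @property
    def read_delimiter(self):
        return self._read_delimiter

    @read_delimiter.setter
    def read_delimiter(self, value):
        # An integer delimiter larger than the limit grows the limit to fit.
        if isinstance(value, int) and value > self._buffer_size:
            self._buffer_size = value
        self._read_delimiter = value


limits = BufferLimits()
print(limits.buffer_size)       # 65536
limits.read_delimiter = 100000
print(limits.buffer_size)       # 100000
```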

close(flush=True)

Close the channel.

connect(address)

Connect the channel to a remote socket.

The given address is resolved and used by the channel to connect to the remote server. If an error occurs at any stage in this process, on_connect_error() is called. When a connection is successfully established, on_connect() is called.

Addresses can be represented in a number of different ways. A single string is treated as a UNIX address. A single integer is treated as a port and converted to a 2-tuple of the form ('', port). A 2-tuple is treated as an IPv4 address and a 4-tuple is treated as an IPv6 address. See the socket documentation for further information on socket addresses.

If no socket exists on the channel, one will be created with a socket family appropriate for the given address.

An error will occur during the connection if the given address is not of a valid format or of an inappropriate format for the socket (e.g. if an IP address is given to a UNIX socket).
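The address rules above can be summarised as a small classification function. classify_address() is a hypothetical helper, not a Pants function. It assumes Python 3 str and int types, and it raises ValueError for unrecognised forms, whereas the text above says such errors surface during the connection.

```python
import socket


def classify_address(address):
    """Hypothetical helper: return (family, normalized_address)."""
    if isinstance(address, str):
        # A single string is a UNIX socket path (AF_UNIX is POSIX-only).
        return socket.AF_UNIX, address
    if isinstance(address, int):
        # A bare port becomes ('', port).
        return socket.AF_INET, ("", address)
    if isinstance(address, tuple) and len(address) == 2:
        return socket.AF_INET, address    # (host, port)
    if isinstance(address, tuple) and len(address) == 4:
        return socket.AF_INET6, address   # (host, port, flowinfo, scope_id)
    raise ValueError("invalid address: %r" % (address,))


print(classify_address(8080) == (socket.AF_INET, ("", 8080)))  # True
```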

Calling connect() on a closed channel or a channel that is already connected will raise a RuntimeError.

Returns the channel.

Argument   Description
address    The remote address to connect to.

flush()

Attempt to immediately write any internally buffered data to the channel without waiting for a write event.

This method can be fairly expensive to call and should be used sparingly.

Calling flush() on a closed or disconnected channel will raise a RuntimeError.

local_address

The address of the channel on the local machine.

By default, this will be the value of socket.getsockname() or None. User code can override the default behaviour by setting the value of the property manually. To return the property to its default behaviour, user code must then delete the value. Example:

# default behaviour: returns socket.getsockname() or None
channel.local_address = custom_value
# channel.local_address now returns custom_value
del channel.local_address
# default behaviour restored

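The override-and-delete pattern used by local_address (and remote_address) can be written with an ordinary Python property. AddressHolder is a hypothetical sketch, not Pants’ implementation. It uses a private sentinel so that a manually assigned None still counts as an override.

```python
_UNSET = object()  # sentinel: "no manual override in place"


class AddressHolder:
    """Hypothetical sketch of an overridable, deletable address property."""

    def __init__(self, sock=None):
        self._socket = sock
        self._local_address = _UNSET

    @property
    def local_address(self):
        if self._local_address is not _UNSET:
            return self._local_address            # manual override wins
        if self._socket is None:
            return None
        try:
            return self._socket.getsockname()     # default behaviour
        except OSError:
            return None

    @local_address.setter
    def local_address(self, value):
        self._local_address = value

    @local_address.deleter
    def local_address(self):
        self._local_address = _UNSET              # back to the default


holder = AddressHolder()              # no socket, so the default is None
holder.local_address = ("10.0.0.1", 9000)
print(holder.local_address)           # ('10.0.0.1', 9000)
del holder.local_address
print(holder.local_address)           # None
```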
on_close()

Placeholder. Called after the channel has finished closing.

on_connect()

Placeholder. Called after the channel has connected to a remote socket.

on_connect_error(exception)

Placeholder. Called when the channel has failed to connect to a remote socket.

By default, logs the exception and closes the channel.

Argument    Description
exception   The exception that was raised.

on_error(exception)

Placeholder. Generic error handler for exceptions raised on the channel. Called when an error occurs and no specific error-handling callback exists.

By default, logs the exception and closes the channel.

Argument    Description
exception   The exception that was raised.

on_overflow_error(exception)

Placeholder. Called when an internal buffer on the channel has exceeded its size limit.

By default, logs the exception and closes the channel.

Argument    Description
exception   The exception that was raised.

on_read(data)

Placeholder. Called when data is read from the channel.

Argument    Description
data        A chunk of data received from the socket.

on_ssl_error(exception)

Placeholder. Called when an error occurs in the underlying SSL implementation.

By default, logs the exception and closes the channel.

Argument    Description
exception   The exception that was raised.

on_ssl_handshake()

Placeholder. Called after the channel has finished its SSL handshake.

on_ssl_handshake_error(exception)

Placeholder. Called when an error occurs during the SSL handshake.

By default, logs the exception and closes the channel.

Argument    Description
exception   The exception that was raised.

on_write()

Placeholder. Called after the channel has finished writing data.

read_delimiter

The magical read delimiter which determines how incoming data is buffered by the stream.

As data is read from the socket, it is buffered internally by the stream before being passed to the on_read() callback. The value of the read delimiter determines when the data is passed to the callback. Valid values are None, a byte string, an integer/long, a compiled regular expression, an instance of struct.Struct, or an instance of netstruct.NetStruct.

When the read delimiter is None, data will be passed to on_read() immediately after it is read from the socket. This is the default behaviour.

When the read delimiter is a byte string, data will be buffered internally until that string is encountered in the incoming data. All data up to but excluding the read delimiter is then passed to on_read(). The segment matching the read delimiter itself is discarded from the buffer.

When the read delimiter is an integer or a long, it is treated as the number of bytes to read before passing the data to on_read().
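The byte-string and integer behaviours can be sketched as a pure function over the buffered bytes. extract_messages() is hypothetical and leaves out the Struct, NetStruct and regex cases. It raises OverflowError where Pants would call on_overflow_error().

```python
def extract_messages(buffer, delimiter, buffer_size=2 ** 16):
    """Hypothetical sketch of None, bytes and int read delimiters.

    Returns (messages, remaining_buffer). Raises OverflowError if the bytes
    left waiting for a delimiter exceed buffer_size.
    """
    if delimiter is None:
        # No delimiter: everything read is forwarded immediately.
        return ([buffer] if buffer else []), b""
    messages = []
    if isinstance(delimiter, bytes) and delimiter:
        index = buffer.find(delimiter)
        while index != -1:
            messages.append(buffer[:index])            # delimiter excluded
            buffer = buffer[index + len(delimiter):]   # delimiter discarded
            index = buffer.find(delimiter)
    elif isinstance(delimiter, int) and delimiter > 0:
        while len(buffer) >= delimiter:
            messages.append(buffer[:delimiter])        # exactly N bytes
            buffer = buffer[delimiter:]
    else:
        raise TypeError("unsupported read delimiter: %r" % (delimiter,))
    if len(buffer) > buffer_size:
        # Stands in for Pants calling on_overflow_error().
        raise OverflowError("buffer exceeded %d bytes" % buffer_size)
    return messages, buffer


print(extract_messages(b"PING\r\nPONG\r\npart", b"\r\n"))  # ([b'PING', b'PONG'], b'part')
print(extract_messages(b"abcdefg", 3))                     # ([b'abc', b'def'], b'g')
```

The leftover bytes (b'part' and b'g') stay buffered until more data arrives, which is exactly where buffer_size becomes relevant.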

When the read delimiter is a struct.Struct instance, data is buffered until the Struct’s full size is available, then unpacked using the Struct before it is sent to on_read(). Unlike other types of read delimiters, this can result in more than one argument being passed to on_read(), as in the following example:

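import struct
from pants import Stream

class Example(Stream):
    def on_connect(self):
        # "!LLH": network byte order, two unsigned 32-bit ints, one unsigned 16-bit int
        self.read_delimiter = struct.Struct("!LLH")

    def on_read(self, packet_type, length, packet_id):
        # Called once per 10-byte header, with one argument per unpacked field.
        pass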
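Outside of Pants, the same unpacking can be sketched with the standard struct module alone. The feed() and on_read() functions here are hypothetical. They show the buffered bytes being unpacked with Struct.unpack() and the resulting fields passed to on_read() as separate positional arguments.

```python
import struct

HEADER = struct.Struct("!LLH")  # 4 + 4 + 2 = 10 bytes, no padding with "!"

received = []


def on_read(packet_type, length, packet_id):
    received.append((packet_type, length, packet_id))


def feed(buffer, fmt=HEADER):
    # Unpack every complete Struct-sized chunk; keep the remainder buffered.
    while len(buffer) >= fmt.size:
        on_read(*fmt.unpack(buffer[:fmt.size]))
        buffer = buffer[fmt.size:]
    return buffer


leftover = feed(HEADER.pack(1, 42, 7) + b"\x00\x01")
print(received)  # [(1, 42, 7)]
print(leftover)  # b'\x00\x01'
```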

When the read delimiter is an instance of netstruct.NetStruct, the NetStruct’s minimum_size is buffered and unpacked with the NetStruct, and additional data is buffered as necessary until the NetStruct can be completely unpacked. Once ready, the data will be passed to on_read(). Struct and NetStruct read delimiters are used in much the same way.

When the read delimiter is a compiled regular expression (re.RegexObject), there are two possible behaviours, which you can switch between by setting the value of regex_search. If regex_search is True, as is the default, the delimiter’s search() method is used and, if a match is found, the string before that match is passed to on_read(). The segment that was matched by the regular expression will be discarded.

If regex_search is False, the delimiter’s match() method is used instead and, if a match is found, the match object itself will be passed to on_read(), giving you access to the capture groups. Again, the segment that was matched by the regular expression will be discarded from the buffer.
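Both regex modes can be sketched with the standard re module. apply_regex() is a hypothetical helper. It assumes the buffer is bytes and returns None when there is no match yet, leaving the buffer untouched.

```python
import re


def apply_regex(buffer, pattern, regex_search=True):
    """Hypothetical sketch: return (value_for_on_read, remaining_buffer)."""
    if regex_search:
        m = pattern.search(buffer)
        if m is None:
            return None, buffer
        # Forward everything before the match; drop the match itself.
        return buffer[:m.start()], buffer[m.end():]
    m = pattern.match(buffer)  # must match at the start of the buffer
    if m is None:
        return None, buffer
    # Forward the match object so capture groups are available.
    return m, buffer[m.end():]


value, rest = apply_regex(b"HELLO\r\n\r\nWORLD", re.compile(rb"\r?\n\r?\n"))
print(value, rest)       # b'HELLO' b'WORLD'
m, rest = apply_regex(b"LEN 12\nabc", re.compile(rb"LEN (\d+)\n"), regex_search=False)
print(m.group(1), rest)  # b'12' b'abc'
```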

Attempting to set the read delimiter to any other value will raise a TypeError.

The effective use of the read delimiter can greatly simplify the implementation of certain protocols.

remote_address

The remote address to which the channel is connected.

By default, this will be the value of socket.getpeername() or None. User code can override the default behaviour by setting the value of the property manually. To return the property to its default behaviour, user code must then delete the value. Example:

# default behaviour: returns socket.getpeername() or None
channel.remote_address = custom_value
# channel.remote_address now returns custom_value
del channel.remote_address
# default behaviour restored

startSSL(ssl_options={})

Enable SSL on the channel and perform a handshake at the next opportunity.

SSL is only enabled on a channel once all currently pending data has been written. If a problem occurs at this stage, on_ssl_error() is called. Once SSL has been enabled, the SSL handshake begins. The handshake typically takes some time and may fail, in which case on_ssl_handshake_error() will be called. When the handshake is successfully completed, on_ssl_handshake() is called and the channel is secure.

Typically, this method is called before connect(). In this case, on_ssl_handshake() will be called before on_connect(). If startSSL() is called after connect(), the reverse is true.

It is possible, although unusual, to start SSL on a channel that is already connected and active. In this case, as noted above, SSL will only be enabled and the handshake performed after all currently pending data has been written.

The SSL options argument will be passed through to ssl.wrap_socket() as keyword arguments - see the ssl documentation for further information. You will typically want to provide the keyfile, certfile and ca_certs options. The do_handshake_on_connect option must be False, or a ValueError will be raised.
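The do_handshake_on_connect rule can be sketched as a small options check. prepare_ssl_options() is hypothetical and assumes that omitting the option is treated as False. The file names in the usage lines are placeholders.

```python
def prepare_ssl_options(ssl_options=None):
    """Hypothetical check mirroring the documented startSSL() rule."""
    # Copy so the caller's dict (or a shared default) is never mutated.
    options = dict(ssl_options or {})
    # Assumption: an omitted do_handshake_on_connect is treated as False.
    if options.setdefault("do_handshake_on_connect", False) is not False:
        raise ValueError("do_handshake_on_connect must be False")
    return options


options = prepare_ssl_options({
    "certfile": "server.crt",   # placeholder paths
    "keyfile": "server.key",
    "ca_certs": "ca.pem",
})
print(options["do_handshake_on_connect"])  # False
```

Copying the dictionary keeps the check from mutating the caller’s options, which would matter with a mutable {} default such as the one in the startSSL(ssl_options={}) signature.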

Attempting to enable SSL on a closed channel or a channel that already has SSL enabled on it will raise a RuntimeError.

Returns the channel.

Argument      Description
ssl_options   Optional. Keyword arguments to pass to ssl.wrap_socket().

write(data, flush=False)

Write data to the channel.

Data will not be written immediately, but will be buffered internally until it can be sent without blocking the process.

Calling write() on a closed or disconnected channel will raise a RuntimeError.

Argument   Description
data       A string of data to write to the channel.
flush      Optional. If True, flush the internal write buffer. See flush() for details.

write_file(sfile, nbytes=0, offset=0, flush=False)

Write a file to the channel.

The file will not be written immediately, but will be buffered internally until it can be sent without blocking the process.

Calling write_file() on a closed or disconnected channel will raise a RuntimeError.

Argument   Description
sfile      A file object to write to the channel.
nbytes     Optional. The number of bytes of the file to write. If 0, all bytes will be written.
offset     Optional. The number of bytes to offset writing by.
flush      Optional. If True, flush the internal write buffer. See flush() for details.

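The nbytes and offset arguments can be illustrated with an in-memory file. read_file_slice() is hypothetical. It assumes that offset is counted from the start of the file and that nbytes=0 means "read to the end". The real write_file() is described as writing the file efficiently, which this sketch does not attempt.

```python
import io


def read_file_slice(sfile, nbytes=0, offset=0):
    """Hypothetical sketch of which bytes write_file() would send."""
    sfile.seek(offset)          # assumed: offset from the start of the file
    if nbytes == 0:
        return sfile.read()     # 0 means "everything from offset onwards"
    return sfile.read(nbytes)


f = io.BytesIO(b"0123456789")
print(read_file_slice(f, nbytes=3, offset=2))  # b'234'
```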
write_packed(*data, **kwargs)

Write packed binary data to the channel.

If the current read_delimiter is an instance of struct.Struct or netstruct.NetStruct, the format will be read from that Struct; otherwise, you will need to provide a format.

Argument   Description
*data      Any number of values to be passed through struct and written to the remote host.
flush      Optional. If True, flush the internal write buffer. See flush() for details.
format     Optional. A formatting string to pack the provided data with. If one isn’t provided, the read delimiter will be used.
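Format selection for write_packed() can be sketched with the standard struct module. pack_for_write() is hypothetical. It takes the read delimiter as an explicit argument, uses fmt in place of the documented format keyword, and omits NetStruct support to stay within the standard library.

```python
import struct


def pack_for_write(read_delimiter, *data, fmt=None):
    """Hypothetical sketch of write_packed()'s format selection."""
    if fmt is not None:
        return struct.pack(fmt, *data)        # an explicit format wins
    if isinstance(read_delimiter, struct.Struct):
        return read_delimiter.pack(*data)     # reuse the read delimiter's format
    raise ValueError("no format given and read_delimiter is not a Struct")


header = struct.Struct("!LLH")
print(pack_for_write(header, 1, 42, 7) == header.pack(1, 42, 7))  # True
print(pack_for_write(None, 513, fmt="!H"))                        # b'\x02\x01'
```

Reusing the read delimiter’s Struct keeps the outgoing and incoming wire formats symmetrical, which suits protocols where both ends exchange the same fixed-size headers.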