pants.stream
Streaming (TCP) connection implementation.
Streams are one of the two main types of channels in Pants, the other
being servers. Streams represent connections between two endpoints. They
may be used for both client and server applications.
Streams
To write a Pants application you will first need to subclass Stream.
Your Stream subclass will contain the majority of your networking code
in the form of event handlers. Event handlers are methods beginning with
on_ and can be safely overridden by your subclass.
Connecting
Before a Stream instance can be used, it must first be connected to a
remote host. If you are writing a server application, all new Stream
instances created by your Server will already be connected. Once they
are created by the Server, on_connect() will be called and your Engine
will begin dispatching events to your Stream instance.
If you are writing a client application, you must first instantiate your
Stream subclass and then use the connect() method to connect the channel
to a remote host. Once the connection has been successfully established,
the on_connect() event handler will be called and your Stream instance
will start receiving events.
Bear in mind that the connection will not be established until the
Engine is running. As such, a common pattern when writing client
applications with Pants is to call connect(), start the engine and then
put all other initialization code in on_connect().
Writing Data
Once your Stream instance is connected to a remote host, you can begin
to write data to the channel. Use write() to write string data to the
channel, write_file() to efficiently write data from an open file and
write_packed() to write packed binary data. As you call these methods,
Pants internally buffers your outgoing data. Once the buffer is
completely empty, on_write() will be called. Be aware that if you
continuously write data to your Stream, on_write() may not be called
very frequently.
If you wish to bypass the internal buffering and attempt to write your
data immediately, you can use the flush option available in each of the
three write methods or call the flush() method yourself. This can help
to improve your application’s responsiveness, but calling it excessively
can reduce overall performance. Generally speaking, it is useful when
you know with certainty that you have finished writing one discrete
chunk of data (e.g. an HTTP response).
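The buffering behaviour described in this section can be modelled with a small stand-alone sketch. It does not use Pants: `OutgoingBuffer` and `send_some` are hypothetical names chosen for illustration, and the real implementation may differ in detail.

```python
class OutgoingBuffer(object):
    """Toy model of an outgoing write buffer (not Pants code)."""

    def __init__(self, send_some, on_write):
        # send_some(data) returns the number of bytes accepted,
        # in the same spirit as socket.send().
        self._send_some = send_some
        self._on_write = on_write
        self._pending = b""

    def write(self, data, flush=False):
        # Writes only append to the buffer unless a flush is requested.
        self._pending += data
        if flush:
            self.flush()

    def flush(self):
        if not self._pending:
            return  # nothing to send
        # Push out as much as the transport accepts right now.
        while self._pending:
            sent = self._send_some(self._pending)
            if sent == 0:
                return  # transport is full; wait for a later write event
            self._pending = self._pending[sent:]
        # The buffer drained completely, so notify the handler.
        self._on_write()


sent_chunks = []
events = []


def send_some(data):
    # Pretend the transport accepts at most 4 bytes per call.
    chunk = data[:4]
    sent_chunks.append(chunk)
    return len(chunk)


buf = OutgoingBuffer(send_some, lambda: events.append("on_write"))
buf.write(b"hello ")              # buffered only
buf.write(b"world", flush=True)   # both writes leave in 4-byte pieces
```

In this model the handler fires once, after the last byte has left the buffer, which is why continuous writing can keep on_write() from being called.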
Reading Data
A connected Stream instance will automatically receive all incoming data
from the remote host. By default, all incoming data is immediately
passed to the on_read() event handler for your code to process. The
read_delimiter attribute can be used to control this behaviour by
causing Pants to buffer incoming data internally, only forwarding it to
on_read() when a particular condition is met. If the condition is never
met, the internal buffer will eventually exceed the allowed buffer_size
and the on_overflow_error() handler method will be called. Used well,
read_delimiter can greatly simplify the implementation of certain
protocols.
Closing
To close a Stream instance, simply call the close() method. Once a
stream has been closed it should not be reused.
Handling Errors
Despite best efforts, errors will occasionally occur in asynchronous
code. Pants handles these errors by passing the resulting exception
object to one of a number of error handler methods. They are:
on_connect_error(), on_overflow_error() and on_error(). Additionally,
on_ssl_handshake_error() and on_ssl_error() exist to handle SSL-specific
errors.
SSL
Pants streams have SSL support. If you are writing a server application,
use Server.startSSL to enable SSL on your server. Each Stream created by
your server from that point forward will be SSL-enabled. If you are
writing a client application, call Stream.startSSL before calling
connect(). Alternatively, you can pass a dictionary of SSL options to
the Stream constructor which will then enable SSL on the instance. When
SSL is enabled on a Stream, an SSL handshake occurs between the local
and remote ends of the connection. Once the SSL handshake is complete,
on_ssl_handshake() will be called. If it fails, on_ssl_handshake_error()
will be called.
If you are writing an SSL-enabled application you should read the
entirety of Python’s ssl documentation. Pants does not override any of
Python’s SSL defaults unless clearly stated in this documentation.
Stream
-
class pants.stream.Stream(**kwargs)
The stream-oriented connection channel.
A Stream instance represents either a local connection to a remote server or a remote connection to a local server over a streaming, connection-oriented protocol such as TCP.
Keyword Argument    Description
engine              Optional. The engine to which the channel should be added. Defaults to the global engine.
socket              Optional. A pre-existing socket to wrap. This can be a regular socket or an SSLSocket. If a socket is not provided, a new socket will be created for the channel when required.
ssl_options         Optional. If provided, startSSL() will be called with these options once the stream is ready. By default, SSL will not be enabled.
-
buffer_size
The maximum size, in bytes, of the internal buffer used for incoming data.
When buffering data it is important to ensure that inordinate amounts of memory are not used. Setting the buffer size to a sensible value can prevent coding errors or malicious use from causing your application to consume increasingly large amounts of memory. By default, a maximum of 64 KB of data will be stored.
The buffer size is mainly relevant when using a string value for the read_delimiter. Because you cannot guarantee that the string will appear, having an upper limit on the size of the data is appropriate.
If the read delimiter is set to a number larger than the buffer size, the buffer size will be increased to accommodate the read delimiter.
When the internal buffer’s size exceeds the maximum allowed, the on_overflow_error() callback will be invoked.
Attempting to set the buffer size to anything other than an integer or long will raise a TypeError.
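The interaction between a byte-string delimiter and the buffer limit can be sketched without Pants. The names `feed`, `effective_buffer_size` and `BufferOverflow` are hypothetical and exist only for this illustration. In Pants the overflow is reported through on_overflow_error() rather than raised to the caller.

```python
class BufferOverflow(Exception):
    """Hypothetical stand-in for the error given to on_overflow_error()."""


def feed(buffer, data, delimiter, buffer_size):
    """Append data, split on a byte-string delimiter, enforce a size cap.

    Returns (chunks, remaining_buffer).
    """
    buffer += data
    chunks = []
    while delimiter in buffer:
        chunk, buffer = buffer.split(delimiter, 1)
        chunks.append(chunk)
    # Whatever is left contains no delimiter yet; it must fit the cap.
    if len(buffer) > buffer_size:
        raise BufferOverflow(
            "%d bytes buffered, limit is %d" % (len(buffer), buffer_size))
    return chunks, buffer


def effective_buffer_size(buffer_size, delimiter):
    """Grow the cap when an integer delimiter is larger than it."""
    if isinstance(delimiter, int) and delimiter > buffer_size:
        return delimiter
    return buffer_size


chunks, rest = feed(b"", b"GET /\r\nHost", b"\r\n", 16)
```

Here `chunks` holds the one complete line and `rest` holds the partial line still waiting for its delimiter. Feeding more than 16 bytes without a delimiter would raise `BufferOverflow`.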
-
connect(address)
Connect the channel to a remote socket.
The given address is resolved and used by the channel to connect to the remote server. If an error occurs at any stage in this process, on_connect_error() is called. When a connection is successfully established, on_connect() is called.
Addresses can be represented in a number of different ways. A single string is treated as a UNIX address. A single integer is treated as a port and converted to a 2-tuple of the form ('', port). A 2-tuple is treated as an IPv4 address and a 4-tuple is treated as an IPv6 address. See the socket documentation for further information on socket addresses.
If no socket exists on the channel, one will be created with a socket family appropriate for the given address.
An error will occur during the connection if the given address is not of a valid format or of an inappropriate format for the socket (e.g. if an IP address is given to a UNIX socket).
Calling connect() on a closed channel or a channel that is already connected will raise a RuntimeError.
Returns the channel.
Argument    Description
address     The remote address to connect to.
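The address rules above can be summarised in a short sketch. `classify_address` is a hypothetical helper written for this illustration, not part of Pants. Raising ValueError for an unrecognised format is a simplification of this sketch, since Pants reports connection problems through on_connect_error().

```python
import socket


def classify_address(address):
    """Return (family, normalised_address) following the rules above."""
    if isinstance(address, str):
        # A single string is treated as a UNIX socket path.
        # AF_UNIX is missing on some platforms, hence getattr().
        return getattr(socket, "AF_UNIX", None), address
    if isinstance(address, int):
        # A bare port becomes the 2-tuple ('', port).
        return socket.AF_INET, ("", address)
    if isinstance(address, tuple) and len(address) == 2:
        return socket.AF_INET, address
    if isinstance(address, tuple) and len(address) == 4:
        return socket.AF_INET6, address
    raise ValueError("unsupported address format: %r" % (address,))


port_only = classify_address(8080)
ipv4 = classify_address(("example.com", 80))
ipv6 = classify_address(("::1", 80, 0, 0))
```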
-
flush()
Attempt to immediately write any internally buffered data to the channel without waiting for a write event.
This method can be fairly expensive to call and should be used sparingly.
Calling flush() on a closed or disconnected channel will raise a RuntimeError.
-
local_address
The address of the channel on the local machine.
By default, this will be the value of socket.getsockname or None. It is possible for user code to override the default behaviour and set the value of the property manually. In order to return the property to its default behaviour, user code then has to delete the value. Example:
    # default behaviour
    channel.local_address = custom_value
    # channel.local_address will return custom_value now
    del channel.local_address
    # default behaviour
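One way such an overridable, resettable property can be built is with a Python property that has a setter and a deleter. The `Channel` class below is a toy written for this illustration and is not how Pants is necessarily implemented. The stored `sockname` stands in for the value of socket.getsockname.

```python
class Channel(object):
    """Toy channel with an overridable, resettable address property."""

    def __init__(self, sockname):
        self._sockname = sockname     # stands in for socket.getsockname()
        self._local_address = None    # manual override, if any

    @property
    def local_address(self):
        if self._local_address is not None:
            return self._local_address
        return self._sockname

    @local_address.setter
    def local_address(self, value):
        self._local_address = value

    @local_address.deleter
    def local_address(self):
        # Deleting the override restores the default behaviour.
        self._local_address = None


channel = Channel(("127.0.0.1", 4040))
default = channel.local_address
channel.local_address = ("10.0.0.1", 80)
overridden = channel.local_address
del channel.local_address
restored = channel.local_address
```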
-
on_close()
Placeholder. Called after the channel has finished closing.
-
on_connect()
Placeholder. Called after the channel has connected to a remote socket.
-
on_connect_error(exception)
Placeholder. Called when the channel has failed to connect to a remote socket.
By default, logs the exception and closes the channel.
Argument     Description
exception    The exception that was raised.
-
on_error(exception)
Placeholder. Generic error handler for exceptions raised on the channel. Called when an error occurs and no specific error-handling callback exists.
By default, logs the exception and closes the channel.
Argument     Description
exception    The exception that was raised.
-
on_overflow_error(exception)
Placeholder. Called when an internal buffer on the channel has exceeded its size limit.
By default, logs the exception and closes the channel.
Argument     Description
exception    The exception that was raised.
-
on_read(data)
Placeholder. Called when data is read from the channel.
Argument    Description
data        A chunk of data received from the socket.
-
on_ssl_error(exception)
Placeholder. Called when an error occurs in the underlying SSL implementation.
By default, logs the exception and closes the channel.
Argument     Description
exception    The exception that was raised.
-
on_ssl_handshake_error(exception)
Placeholder. Called when an error occurs during the SSL handshake.
By default, logs the exception and closes the channel.
Argument     Description
exception    The exception that was raised.
-
on_write()
Placeholder. Called after the channel has finished writing data.
-
read_delimiter
The magical read delimiter which determines how incoming data is buffered by the stream.
As data is read from the socket, it is buffered internally by the stream before being passed to the on_read() callback. The value of the read delimiter determines when the data is passed to the callback. Valid values are None, a byte string, an integer/long, a compiled regular expression, an instance of struct.Struct, or an instance of netstruct.NetStruct.
When the read delimiter is None, data will be passed to on_read() immediately after it is read from the socket. This is the default behaviour.
When the read delimiter is a byte string, data will be buffered internally until that string is encountered in the incoming data. All data up to but excluding the read delimiter is then passed to on_read(). The segment matching the read delimiter itself is discarded from the buffer.
When the read delimiter is an integer or a long, it is treated as the number of bytes to read before passing the data to on_read().
When the read delimiter is a struct.Struct instance, the stream buffers as many bytes as the Struct’s size and unpacks them using the Struct before they are sent to on_read(). Unlike other types of read delimiters, this can result in more than one argument being passed to on_read(), as in the following example:
    import struct
    from pants import Stream

    class Example(Stream):
        def on_connect(self):
            # Two unsigned 32-bit integers and one unsigned 16-bit
            # integer, in network byte order.
            self.read_delimiter = struct.Struct("!LLH")

        def on_read(self, packet_type, length, id):
            # One argument per unpacked field.
            pass
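To see why a Struct delimiter produces several arguments, the unpacking step can be reproduced with the standard library alone. This is a sketch of the idea, not Pants code. `drain` is a hypothetical helper and the sample values are made up.

```python
import struct

HEADER = struct.Struct("!LLH")  # same format as the example above


def drain(buffer, on_read):
    """Call on_read once per complete record; return the leftover bytes."""
    while len(buffer) >= HEADER.size:
        record, buffer = buffer[:HEADER.size], buffer[HEADER.size:]
        # unpack() returns a tuple, so each field becomes its own argument.
        on_read(*HEADER.unpack(record))
    return buffer


received = []


def on_read(packet_type, length, id):
    received.append((packet_type, length, id))


# Two complete records followed by two bytes of a third.
data = HEADER.pack(1, 512, 7) + HEADER.pack(2, 0, 8) + b"\x00\x01"
leftover = drain(data, on_read)
```

The two trailing bytes stay in the buffer because they do not yet make up a full record of `HEADER.size` bytes.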
When the read delimiter is an instance of netstruct.NetStruct, the NetStruct’s minimum_size is buffered and unpacked with the NetStruct, and additional data is buffered as necessary until the NetStruct can be completely unpacked. Once ready, the data will be passed to on_read(). Struct and NetStruct delimiters are used in very similar ways.
When the read delimiter is a compiled regular expression (re.RegexObject), there are two possible behaviors that you may switch between by setting the value of regex_search. If regex_search is True, as is the default, the delimiter’s search() method is used and, if a match is found, the string before that match is passed to on_read(). The segment that was matched by the regular expression will be discarded.
If regex_search is False, the delimiter’s match() method is used instead and, if a match is found, the match object itself will be passed to on_read(), giving you access to the capture groups. Again, the segment that was matched by the regular expression will be discarded from the buffer.
Attempting to set the read delimiter to any other value will raise a TypeError.
The effective use of the read delimiter can greatly simplify the implementation of certain protocols.
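The rules for None, byte-string, integer and regular-expression delimiters can be condensed into one stand-alone function. This is a sketch under stated assumptions, not the Pants implementation. `extract` is a hypothetical name, Struct and NetStruct delimiters are left out, and the code targets Python 3, where a compiled pattern is an `re.Pattern` rather than `re.RegexObject`.

```python
import re


def extract(buffer, delimiter, regex_search=True):
    """Return (value_for_on_read, remaining_buffer).

    The value is None when the delimiter's condition is not met yet.
    """
    if delimiter is None:
        # No buffering: hand over everything that has arrived.
        return (buffer, b"") if buffer else (None, buffer)
    if isinstance(delimiter, bytes):
        head, found, tail = buffer.partition(delimiter)
        return (head, tail) if found else (None, buffer)
    if isinstance(delimiter, int):
        if len(buffer) < delimiter:
            return None, buffer
        return buffer[:delimiter], buffer[delimiter:]
    if isinstance(delimiter, re.Pattern):
        if regex_search:
            match = delimiter.search(buffer)
            if match is None:
                return None, buffer
            # Pass on the data before the match; drop the matched segment.
            return buffer[:match.start()], buffer[match.end():]
        match = delimiter.match(buffer)
        if match is None:
            return None, buffer
        # Pass on the match object itself; drop the matched segment.
        return match, buffer[match.end():]
    raise TypeError("unsupported read delimiter: %r" % (delimiter,))


line, rest = extract(b"line1\r\nline2", b"\r\n")
header, body = extract(b"LEN 12\nbody", re.compile(rb"LEN (\d+)\n"),
                       regex_search=False)
```

With `regex_search=False` the caller receives a match object, so `header.group(1)` gives the captured length field.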
-
remote_address
The remote address to which the channel is connected.
By default, this will be the value of socket.getpeername or None. It is possible for user code to override the default behaviour and set the value of the property manually. In order to return the property to its default behaviour, user code then has to delete the value. Example:
    # default behaviour
    channel.remote_address = custom_value
    # channel.remote_address will return custom_value now
    del channel.remote_address
    # default behaviour
-
startSSL(ssl_options={})
Enable SSL on the channel and perform a handshake at the next opportunity.
SSL is only enabled on a channel once all currently pending data has been written. If a problem occurs at this stage, on_ssl_error() is called. Once SSL has been enabled, the SSL handshake begins. This typically takes some time and may fail, in which case on_ssl_handshake_error() will be called. When the handshake is successfully completed, on_ssl_handshake() is called and the channel is secure.
Typically, this method is called before connect(). In this case, on_ssl_handshake() will be called before on_connect(). If startSSL() is called after connect(), the reverse is true.
It is possible, although unusual, to start SSL on a channel that is already connected and active. In this case, as noted above, SSL will only be enabled and the handshake performed after all currently pending data has been written.
The SSL options argument will be passed through to ssl.wrap_socket() as keyword arguments. See the ssl documentation for further information. You will typically want to provide the keyfile, certfile and ca_certs options. The do_handshake_on_connect option must be False, or a ValueError will be raised.
Attempting to enable SSL on a closed channel or a channel that already has SSL enabled on it will raise a RuntimeError.
Returns the channel.
Argument       Description
ssl_options    Optional. Keyword arguments to pass to ssl.wrap_socket().
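The constraint on do_handshake_on_connect can be illustrated with a small validation helper. `prepare_ssl_options` is hypothetical and not part of Pants. Defaulting the flag to False when it is absent is an assumption of this sketch, and the file names are placeholders.

```python
def prepare_ssl_options(ssl_options=None):
    """Hypothetical helper: check options before SSL is set up."""
    # Copy the dict so the caller's options are left untouched.
    options = dict(ssl_options or {})
    if options.get("do_handshake_on_connect", False):
        raise ValueError("do_handshake_on_connect must be False")
    # Assumption of this sketch: default the flag to False when absent.
    options["do_handshake_on_connect"] = False
    return options


client_options = prepare_ssl_options({
    "keyfile": "client.key",    # placeholder paths, not real files
    "certfile": "client.crt",
    "ca_certs": "ca.pem",
})
```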
-
write(data, flush=False)
Write data to the channel.
Data will not be written immediately, but will be buffered internally until it can be sent without blocking the process.
Calling write() on a closed or disconnected channel will raise a RuntimeError.
Argument    Description
data        A string of data to write to the channel.
flush       Optional. If True, flush the internal write buffer. See flush() for details.
-
write_file(sfile, nbytes=0, offset=0, flush=False)
Write a file to the channel.
The file will not be written immediately, but will be buffered internally until it can be sent without blocking the process.
Calling write_file() on a closed or disconnected channel will raise a RuntimeError.
Argument    Description
sfile       A file object to write to the channel.
nbytes      Optional. The number of bytes of the file to write. If 0, all bytes will be written.
offset      Optional. The number of bytes to offset writing by.
flush       Optional. If True, flush the internal write buffer. See flush() for details.
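The following sketch shows which bytes a given nbytes and offset pair would select. It is an illustration only. `read_span` is a hypothetical helper, it reads the data into memory rather than streaming it efficiently, and it assumes the offset is counted from the start of the file.

```python
import io


def read_span(sfile, nbytes=0, offset=0):
    """Return the bytes that a write of (nbytes, offset) would cover."""
    # Assumption: offset is measured from the start of the file.
    sfile.seek(offset)
    # Per the argument table, nbytes == 0 means everything that remains.
    return sfile.read() if nbytes == 0 else sfile.read(nbytes)


sfile = io.BytesIO(b"0123456789")
whole = read_span(sfile)
middle = read_span(sfile, nbytes=4, offset=3)
tail = read_span(sfile, offset=6)
```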
-
write_packed(*data, **kwargs)
Write packed binary data to the channel.
If the current read_delimiter is an instance of struct.Struct or netstruct.NetStruct the format will be read from that Struct, otherwise you will need to provide a format.
Argument    Description
*data       Any number of values to be passed through struct and written to the remote host.
flush       Optional. If True, flush the internal write buffer. See flush() for details.
format      Optional. A formatting string to pack the provided data with. If one isn’t provided, the read delimiter will be used.
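The choice between an explicit format and the read delimiter can be sketched with the standard struct module. `pack_values` is a hypothetical helper, not the Pants method. Giving the explicit format precedence and raising ValueError when neither source is available are assumptions of this sketch, and NetStruct is not covered.

```python
import struct


def pack_values(values, read_delimiter=None, format=None):
    """Pack values with an explicit format, else with a Struct delimiter."""
    if format is not None:
        return struct.pack(format, *values)
    if isinstance(read_delimiter, struct.Struct):
        # Reuse the format already attached to the read delimiter.
        return read_delimiter.pack(*values)
    raise ValueError("no format given and the read delimiter is not a Struct")


explicit = pack_values((1, 2, 3), format="!LLH")
implicit = pack_values((1, 2, 3), read_delimiter=struct.Struct("!LLH"))
```

Both calls produce the same ten bytes, which is what lets a protocol use one Struct for reading and writing alike.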
-