###############################################################################
#
# Copyright 2011-2012 Pants Developers (see AUTHORS.txt)
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
###############################################################################
"""
``pants.http.websocket`` implements the WebSocket protocol, as described by
:rfc:`6455`, on top of the Pants HTTP server using an API similar to that
provided by :class:`pants.stream.Stream`.
Using WebSockets
================
To start working with WebSockets, you'll need to create a subclass of
:class:`WebSocket`. As with :class:`~pants.stream.Stream`, :class:`WebSocket`
instances are meant to contain the majority of your networking logic through
the definition of custom event handlers. Event handlers are methods that have
names beginning with ``on_`` that can be safely overridden within
your subclass.
Listening for Connections
-------------------------
:class:`WebSocket` is designed to be used as a request handler for the Pants
HTTP server, :class:`pants.http.server.HTTPServer`. As such, to begin listening
for WebSocket connections, you must create an instance of
:class:`~pants.http.server.HTTPServer` using your custom :class:`WebSocket`
subclass as its request handler.
.. code-block:: python
from pants.http import HTTPServer, WebSocket
from pants import Engine
class EchoSocket(WebSocket):
def on_read(self, data):
self.write(data)
HTTPServer(EchoSocket).listen(8080)
Engine.instance().start()
If you need to host traditional requests from your HTTPServer instance, you may
invoke the WebSocket handler simply by creating an instance of your
:class:`WebSocket` subclass with the appropriate
:class:`pants.http.server.HTTPRequest` instance as its only argument:
.. code-block:: python
from pants.http import HTTPServer, WebSocket
from pants import Engine
class EchoSocket(WebSocket):
def on_read(self, data):
self.write(data)
def request_handler(request):
if request.path == '/_ws':
EchoSocket(request)
else:
request.send_response("Nothing to see here.")
HTTPServer(request_handler).listen(8080)
Engine.instance().start()
``WebSocket`` and ``Application``
---------------------------------
:class:`WebSocket` has support for :class:`pants.web.application.Application`
and can easily be used as a request handler for any route. Additionally,
variables captured from the URL by :class:`~pants.web.application.Application`
will be made accessible to the :meth:`WebSocket.on_connect` event handler. The
following example of a WebSocket echo server displays a customized welcome
message depending on the requested URL.
.. code-block:: python
from pants.http import WebSocket
from pants.web import Application
app = Application()
@app.route("/ws/<name>")
class EchoSocket(WebSocket):
def on_connect(self, name):
self.write(u"Hello, {name}!".format(name=name))
def on_read(self, data):
self.write(data)
app.run(8080)
WebSocket Security
==================
Secure Connections
------------------
:class:`WebSocket` relies upon the :class:`pants.http.server.HTTPServer`
instance serving it to provide SSL. This can be as easy as calling the server's
:meth:`~pants.http.server.HTTPServer.startSSL` method.
To determine whether or not the :class:`WebSocket` instance is using a
secure connection, you may use the :attr:`~WebSocket.is_secure` attribute.
Custom Handshakes
-----------------
You may implement custom logic during the WebSocket's handshake by overriding
the :meth:`WebSocket.on_handshake` event handler. The ``on_handshake`` handler
is called with a reference to the :class:`~pants.http.server.HTTPRequest`
instance the WebSocket handshake is happening upon as well as an empty
dictionary that may be used to set custom headers on the HTTP response.
``on_handshake`` is expected to return a True value if the request is alright.
Returning a False value will result in the generation of an error page. The
following example of a custom handshake requires a secret HTTP header in the
request, and that the connection is secured:
.. code-block:: python
from pants.http import WebSocket
class SecureSocket(WebSocket):
def on_handshake(self, request, headers):
return self.is_secure and 'X-Pizza' in request.headers
def on_connect(self):
self.write(u"Welcome to PizzaNet.")
Reading and Writing Data
========================
WebSockets are a bit different than normal :class:`~pants.stream.Stream`
instances, as a WebSocket can transmit both byte strings and unicode strings,
and data is encapsulated into formatted messages with definite lengths. Because
of this, reading from one can be slightly different.
Mostly, however, the :attr:`~WebSocket.read_delimiter` works in exactly the
same way as that of :class:`pants.stream.Stream`.
Unicode Strings and Byte Strings
--------------------------------
:class:`WebSocket` strictly enforces the difference between byte strings and
unicode strings. As such, the connection will be closed with a protocol error
if any of the following happen:
1. The string types of the :attr:`~WebSocket.read_delimiter` and the
buffer differ.
2. There is an existing string still in the buffer when the client sends
another string of a different type.
3. The :attr:`~WebSocket.read_delimiter` is currently a struct and the
buffer does not contain a byte string.
Of those conditions, the most likely to occur is the first. Take the following
code:
.. code-block:: python
from pants.http import WebSocket, HTTPServer
from pants import Engine
def process(text):
return text.decode('rot13')
class LineOriented(WebSocket):
def on_connect(self):
self.read_delimiter = "\\n"
def on_read(self, line):
self.write(process(line))
HTTPServer(LineOriented).listen(8080)
Engine.instance().start()
And, on the client:
.. code-block:: html
<!DOCTYPE html>
<textarea id="editor"></textarea><br>
<input type="submit" value="Send">
<script>
var ws = new WebSocket("ws://localhost:8080/"),
input = document.querySelector('#editor'),
button = document.querySelector('input');
ws.onmessage = function(e) {
alert("Got back: " + e.data);
}
button.addEventListener("click", function() {
ws.send(input.value + "\\n");
});
</script>
On Python 2.x, the read delimiter will be a byte string. The WebSocket will
expect to receive a byte string. However, the simple JavaScript shown above
sends *unicode* strings. That simple service would fail immediately
on Python 2.
To avoid the problem, be sure to use the string type you really want for your
read delimiters. For the above, that's as simple as setting the read
delimiter with:
.. code-block:: python
self.read_delimiter = u"\\n"
WebSocket Messages
------------------
In addition to the standard types of :attr:`~WebSocket.read_delimiter`,
:class:`WebSocket` instances support the use of a special value called
:attr:`EntireMessage`. When using ``EntireMessage``, full messages will
be sent to your :attr:`~WebSocket.on_read` event handler, as framed by
the remote end-point.
``EntireMessage`` is the default :attr:`~WebSocket.read_delimiter` for
WebSocket instances, and it makes it dead simple to write simple services.
The following example implements a simple RPC system over WebSockets:
.. code-block:: python
import json
from pants.http.server import HTTPServer
from pants.http.websocket import WebSocket, FRAME_TEXT
from pants import Engine
class RPCSocket(WebSocket):
methods = {}
@classmethod
def method(cls, name):
''' Add a method to the RPC. '''
def decorator(method):
cls.methods[name] = method
return method
return decorator
def json(self, **data):
''' Send a JSON object to the remote end-point. '''
# JSON outputs UTF-8 encoded text by default, so use the frame
# argument to let WebSocket know it should be sent as text to the
# remote end-point, rather than as binary data.
self.write(json.dumps(data), frame=FRAME_TEXT)
def on_read(self, data):
# Attempt to decode a JSON message.
try:
data = json.loads(data)
except ValueError:
self.json(ok=False, result="can't decode JSON")
return
# Lookup the desired method. Return an error if it doesn't exist.
method = data['method']
if not method in self.methods:
self.json(ok=False, result="no such method")
return
method = self.methods[method]
args = data.get("args", tuple())
kwargs = data.get("kwargs", dict())
ok = True
# Try running the method, and capture the result. If it errors, set
# the result to the error string and ok to False.
try:
result = method(*args, **kwargs)
except Exception as ex:
ok = False
result = str(ex)
self.json(ok=ok, result=result)
@RPCSocket.method("rot13")
def rot13(string):
return string.decode("rot13")
HTTPServer(RPCSocket).listen(8080)
Engine.instance().start()
As you can see, it never even *uses* :attr:`~WebSocket.read_delimiter`. The
client simply sends JSON messages, with code such as:
.. code-block:: javascript
my_websocket.send(JSON.stringify({method: "rot13", args: ["test"]}));
This behavior is completely reliable, even when the client is sending
fragmented messages.
"""
###############################################################################
# Imports
###############################################################################
import base64
import hashlib
import re
import struct
import sys
if sys.platform == "win32":
from time import clock as time
else:
from time import time
from pants.stream import StreamBufferOverflow
from pants.http.utils import log
try:
from netstruct import NetStruct as _NetStruct
except ImportError:
# Create the fake class because isinstance expects a class.
class _NetStruct(object):
def __init__(self, *a, **kw):
raise NotImplementedError
###############################################################################
# Constants
###############################################################################
CLOSE_REASONS = {
1000: 'Normal Closure',
1001: 'Endpoint Going Away',
1002: 'Protocol Error',
1003: 'Unacceptable Data Type',
1005: 'No Status Code',
1006: 'Abnormal Close',
1007: 'Invalid UTF-8 Data',
1008: 'Message Violates Policy',
1009: 'Message Too Big',
1010: 'Extensions Not Present',
1011: 'Unexpected Condition Prevented Fulfillment',
1015: 'TLS Handshake Error'
}
# Handshake Key
WEBSOCKET_KEY = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"
# Supported WebSocket Versions
WEBSOCKET_VERSIONS = (13, 8, 0)
# Frame Opcodes
FRAME_CONTINUATION = 0
FRAME_TEXT = 1
FRAME_BINARY = 2
FRAME_CLOSE = 8
FRAME_PING = 9
FRAME_PONG = 10
# Special read_delimiter Value
EntireMessage = object()
# Regex Stuff
RegexType = type(re.compile(""))
Struct = struct.Struct
# Structs
STRUCT_H = Struct("!H")
STRUCT_Q = Struct("!Q")
###############################################################################
# WebSocket Class
###############################################################################
[docs]class WebSocket(object):
"""
An implementation of `WebSockets <http://en.wikipedia.org/wiki/WebSockets>`_
on top of the Pants HTTP server using an API similar to that of
:class:`pants.stream.Stream`.
A :class:`WebSocket` instance represents a WebSocket connection to a
remote client. In the future, WebSocket will be modified to support acting
as a client in addition to acting as a server.
When using WebSockets you write logic as you could for
:class:`~pants.stream.Stream`, using the same :attr:`read_delimiter` and
event handlers, while the WebSocket implementation handles the initial
negotiation and all data framing for you.
========= ============
Argument Description
========= ============
request The :class:`~pants.http.server.HTTPRequest` to begin negotiating a WebSocket connection over.
========= ============
"""
protocols = None
allow_old_handshake = False
def __init__(self, request, *arguments):
# Store the request and play nicely with web.
self._connection = request.connection
self.engine = self._connection.engine
request.auto_finish = False
self._arguments = arguments
# Base State
self.fileno = self._connection.fileno
self._remote_address = None
self._local_address = None
self._pings = {}
self._last_ping = 0
# I/O attributes
self._read_delimiter = EntireMessage
self._recv_buffer_size_limit = self._buffer_size
self._recv_buffer = ""
self._read_buffer = None
self._rb_type = None
self._frag_frame = None
self.connected = False
self._closed = False
# Copy the HTTPRequest's security state.
self.is_secure = request.is_secure
# First up, make sure we're dealing with an actual WebSocket request.
# If we aren't, return a simple 426 Upgrade Required page.
fail = False
headers = {}
if not request.headers.get('Connection','').lower() == 'upgrade' and \
not request.headers.get('Upgrade','').lower() == 'websocket':
fail = True
# It's a WebSocket. Rejoice. Make sure the handshake information is
# all acceptable.
elif not self._safely_call(self.on_handshake, request, headers):
fail = True
# Determine which version of WebSockets we're dealing with.
if 'Sec-WebSocket-Version' in request.headers:
# New WebSockets. Handshake.
if not 'Sec-WebSocket-Key' in request.headers:
fail = True
else:
accept = base64.b64encode(hashlib.sha1(
request.headers['Sec-WebSocket-Key'] + WEBSOCKET_KEY
).digest())
headers['Upgrade'] = 'websocket'
headers['Connection'] = 'Upgrade'
headers['Sec-WebSocket-Accept'] = accept
self.version = int(request.headers['Sec-WebSocket-Version'])
if self.version not in WEBSOCKET_VERSIONS:
headers['Sec-WebSocket-Version'] = False
fail = True
elif not self.allow_old_handshake:
# No old WebSockets allowed.
fail = True
else:
# Old WebSockets. Wut?
self.version = 0
self._headers = headers
self._request = request
self._connection.on_read = self._finish_handshake
self._connection.on_close = self._con_close
self._connection.on_write = self._con_write
self._connection.read_delimiter = 8
return
if fail:
if 'Sec-WebSocket-Version' in headers:
request.send_status(400)
request.send_headers({
'Content-Type': 'text/plain',
'Content-Length': 15,
'Sec-WebSocket-Version': ', '.join(str(x) for x in
WEBSOCKET_VERSIONS)
})
request.send('400 Bad Request')
else:
request.send_status(426)
headers = {
'Content-Type': 'text/plain',
'Content-Length': '20',
'Sec-WebSocket-Version': ', '.join(str(x) for x in
WEBSOCKET_VERSIONS)
}
request.send_headers(headers)
request.send("426 Upgrade Required")
request.finish()
return
# Still here? No fail! Send the handshake response, hook up event
# handlers, and call on_connect.
request.send_status(101)
request.send_headers(headers)
self._connection.on_read = self._con_read
self._connection.on_close = self._con_close
self._connection.on_write = self._con_write
self._connection.read_delimiter = None
self.connected = True
self._safely_call(self.on_connect, *self._arguments)
del self._arguments
def _finish_handshake(self, key3):
self._connection.read_delimiter = None
request = self._request
headers = self._headers
del self._headers
del self._request
scheme = 'wss' if self.is_secure else 'ws'
request.send_status(101)
headers.update({
'Upgrade': 'WebSocket',
'Connection': 'Upgrade',
'Sec-WebSocket-Origin': request.headers['Origin'],
'Sec-WebSocket-Location': '%s://%s%s' % (
scheme, request.host, request.url)
})
request.send_headers(headers)
try:
request.send(challenge_response(
request.headers, key3))
except ValueError:
log.warning("Malformed WebSocket challenge to %r." % self)
self.close(False)
return
# Move on.
self._expect_frame = True
# Finish up.
self.connected = True
self._connection.on_read = self._con_old_read
self._safely_call(self.on_connect, *self._arguments)
del self._arguments
##### Properties ##########################################################
@property
def remote_address(self):
"""
The remote address to which the WebSocket is connected.
By default, this will be the value of ``socket.getpeername`` or
None. It is possible for user code to override the default
behaviour and set the value of the property manually. In order
to return the property to its default behaviour, user code then
has to delete the value. Example::
# default behaviour
channel.remote_address = custom_value
# channel.remote_address will return custom_value now
del channel.remote_address
# default behaviour
"""
if self._remote_address is not None:
return self._remote_address
elif self._connection:
return self._connection.remote_address
else:
return None
@remote_address.setter
def remote_address(self, val):
self._remote_address = val
@remote_address.deleter
def remote_address(self):
self._remote_address = None
@property
def local_address(self):
"""
The address of the WebSocket on the local machine.
By default, this will be the value of ``socket.getsockname`` or
None. It is possible for user code to override the default
behaviour and set the value of the property manually. In order
to return the property to its default behaviour, user code then
has to delete the value. Example::
# default behaviour
channel.local_address = custom_value
# channel.local_address will return custom_value now
del channel.local_address
# default behaviour
"""
if self._local_address is not None:
return self._local_address
elif self._connection:
return self._connection.local_address
else:
return None
@local_address.setter
def local_address(self, val):
self._local_address = val
@local_address.deleter
def local_address(self):
self._local_address = None
@property
def read_delimiter(self):
"""
The magical read delimiter which determines how incoming data is
buffered by the WebSocket.
As data is read from the socket, it is buffered internally by
the WebSocket before being passed to the :meth:`on_read` callback. The
value of the read delimiter determines when the data is passed to the
callback. Valid values are ``None``, a string, an integer/long,
a compiled regular expression, an instance of :class:`struct.Struct`,
an instance of :class:`netstruct.NetStruct`, or the
:attr:`~pants.http.websocket.EntireMessage` object.
When the read delimiter is the ``EntireMessage`` object, entire
WebSocket messages will be passed to :meth:`on_read` immediately once
they have been received in their entirety. This is the default behavior
for :class:`WebSocket` instances.
When the read delimiter is ``None``, data will be passed to
:meth:`on_read` immediately after it has been received.
When the read delimiter is a byte string or unicode string, data will
be buffered internally until that string is encountered in the incoming
data. All data up to but excluding the read delimiter is then passed
to :meth:`on_read`. The segment matching the read delimiter itself is
discarded from the buffer.
.. note::
When using strings as your read delimiter, you must be careful to
use unicode strings if you wish to send and receive strings to a
remote JavaScript client.
When the read delimiter is an integer or a long, it is treated
as the number of bytes to read before passing the data to
:meth:`on_read`.
When the read delimiter is an instance of :class:`struct.Struct`, the
Struct's ``size`` is fully buffered and the data is unpacked before the
unpacked data is sent to :meth:`on_read`. Unlike other types of read
delimiters, this can result in more than one argument being sent to the
:meth:`on_read` event handler, as in the following example::
import struct
from pants.http import WebSocket
class Example(WebSocket):
def on_connect(self):
self.read_delimiter = struct.Struct("!ILH")
def on_read(self, packet_type, length, id):
pass
You must send binary data from the client when using structs as your
read delimiter. If Pants receives a unicode string while a struct
read delimiter is set, it will close the connection with a protocol
error. This holds true for the :class:`~netstruct.Netstruct`
delimiters as well.
When the read delimiter is an instance of :class:`netstruct.NetStruct`,
the NetStruct's :attr:`~netstruct.NetStruct.minimum_size` is buffered
and unpacked with the NetStruct, and additional data is buffered as
necessary until the NetStruct can be completely unpacked. Once ready,
the data will be passed to :meth:`on_read`. Using Struct and NetStruct
are *very* similar.
When the read delimiter is a compiled regular expression
(:class:`re.RegexObject`), there are two possible behaviors that you
may switch between by setting the value of :attr:`regex_search`. If
:attr:`regex_search` is True, as is the default, the delimiter's
:meth:`~re.RegexObject.search` method is used and, if a match is found,
the string before that match is passed to :meth:`on_read`. The segment
that was matched by the regular expression will be discarded.
If :attr:`regex_search` is False, the delimiter's
:meth:`~re.RegexObject.match` method is used instead and, if a match
is found, the match object itself will be passed to :meth:`on_read`,
giving you access to the capture groups. Again, the segment that was
matched by the regular expression will be discarded from the buffer.
Attempting to set the read delimiter to any other value will
raise a :exc:`TypeError`.
The effective use of the read delimiter can greatly simplify the
implementation of certain protocols.
"""
return self._read_delimiter
@read_delimiter.setter
def read_delimiter(self, value):
if value is None or isinstance(value, basestring) or\
isinstance(value, RegexType):
self._read_delimiter = value
self._recv_buffer_size_limit = self._buffer_size
elif isinstance(value, (int, long)):
self._read_delimiter = value
self._recv_buffer_size_limit = max(self._buffer_size, value)
elif isinstance(value, Struct):
self._read_delimiter = value
self._recv_buffer_size_limit = max(self._buffer_size, value.size)
elif isinstance(value, _NetStruct):
self._read_delimiter = value
self._recv_buffer_size_limit = max(self._buffer_size,
value.minimum_size)
elif value is EntireMessage:
self._read_delimiter = value
self._recv_buffer_size_limit = self._buffer_size
else:
raise TypeError("Attempted to set read_delimiter to a value with an invalid type.")
# Reset NetStruct state when we change the read delimiter.
self._netstruct_iter = None
self._netstruct_needed = None
# Setting these at the class level makes them easy to override on a
# per-class basis.
regex_search = True
_buffer_size = 2 ** 16 # 64kb
@property
def buffer_size(self):
"""
The maximum size, in bytes, of the internal buffer used for
incoming data.
When buffering data it is important to ensure that inordinate
amounts of memory are not used. Setting the buffer size to a
sensible value can prevent coding errors or malicious use from
causing your application to consume increasingly large amounts
of memory. By default, a maximum of 64kb of data will be stored.
The buffer size is mainly relevant when using a string value for
the :attr:`read_delimiter`. Because you cannot guarantee that the
string will appear, having an upper limit on the size of the data
is appropriate.
If the read delimiter is set to a number larger than the buffer
size, the buffer size will be increased to accommodate the read
delimiter.
When the internal buffer's size exceeds the maximum allowed, the
:meth:`on_overflow_error` callback will be invoked.
Attempting to set the buffer size to anything other than an
integer or long will raise a :exc:`TypeError`.
"""
return self._buffer_size
@buffer_size.setter
def buffer_size(self, value):
if not isinstance(value, (long, int)):
raise TypeError("buffer_size must be an int or a long")
self._buffer_size = value
if isinstance(self._read_delimiter, (int, long)):
self._recv_buffer_size_limit = max(value, self._read_delimiter)
elif isinstance(self._read_delimiter, Struct):
self._recv_buffer_size_limit = max(value,
self._read_delimiter.size)
elif isinstance(self._read_delimiter, _NetStruct):
self._recv_buffer_size_limit = max(value,
self._read_delimiter.minimum_size)
else:
self._recv_buffer_size_limit = value
##### Control Methods #####################################################
[docs] def close(self, flush=True, reason=1000, message=None):
"""
Close the WebSocket connection. If flush is True, wait for any remaining
data to be sent and send a close frame before closing the connection.
========= ========== ============
Argument Default Description
========= ========== ============
flush ``True`` *Optional.* If False, closes the connection immediately, without ensuring all buffered data is sent.
reason ``1000`` *Optional.* The reason the socket is closing, as defined at :rfc:`6455#section-7.4`.
message ``None`` *Optional.* A message string to send with the reason code, rather than the default.
========= ========== ============
"""
if self._connection is None or self._closed:
return
self.read_delimiter = None
self._read_buffer = None
self._rb_type = None
self._recv_buffer = ""
self._closed = True
if flush:
if not self.version:
self._connection.close(True)
else:
# Look up the reason.
if not message:
message = CLOSE_REASONS.get(reason, 'Unknown Close')
reason = STRUCT_H.pack(reason) + message
self.write(reason, frame=FRAME_CLOSE)
self._connection.close(True)
self.connected = False
self._connection = None
return
self.connected = False
if self._connection and self._connection.connected:
self._connection.close(False)
self._connection = None
##### Public Event Handlers ###############################################
[docs] def on_read(self, data):
"""
Placeholder. Called when data is read from the WebSocket.
========= ============
Argument Description
========= ============
data A chunk of data received from the socket. Binary data will be provided as a byte string, and text data will be provided as a unicode string.
========= ============
"""
pass
[docs] def on_write(self):
"""
Placeholder. Called after the WebSocket has finished writing data.
"""
pass
[docs] def on_connect(self, *arguments):
"""
Placeholder. Called after the WebSocket has connected to a client and
completed its handshake. Any additional arguments passed to the
:class:`WebSocket` instance's constructor will be passed to this
method when it is invoked, making it easy to use :class:`WebSocket`
together with the URL variables captured by
:class:`pants.web.application.Application`, as shown in the
following example::
from pants.web import Application
from pants.http import WebSocket
app = Application()
@app.route("/ws/<int:id>")
class MyConnection(WebSocket):
def on_connect(self, id):
pass
"""
pass
[docs] def on_close(self):
"""
Placeholder. Called after the WebSocket has finished closing.
"""
pass
[docs] def on_handshake(self, request, headers):
"""
Placeholder. Called during the initial handshake, making it possible to
validate the request with custom logic, such as Origin checking and
other forms of authentication.
If this function returns a False value, the handshake will be stopped
and an error will be sent to the client.
========= ============
Argument Description
========= ============
request The :class:`pants.http.server.HTTPRequest` being upgraded to a WebSocket.
headers An empty dict. Any values set here will be sent as headers when accepting (or rejecting) the connection.
========= ============
"""
return True
[docs] def on_pong(self, data):
"""
Placeholder. Called when a PONG control frame is received from the
remote end-point in response to an earlier ping.
When used together with the :meth:`ping` method, ``on_pong`` may be
used to measure the connection's round-trip time. See :meth:`ping` for
more information.
========= ============
Argument Description
========= ============
data Either the RTT expressed as seconds, or an arbitrary byte string that served as the PONG frame's payload.
========= ============
"""
pass
[docs] def on_overflow_error(self, exception):
"""
Placeholder. Called when an internal buffer on the WebSocket has
exceeded its size limit.
By default, logs the exception and closes the WebSocket.
========== ============
Argument Description
========== ============
exception The exception that was raised.
========== ============
"""
log.exception(exception)
self.close(reason=1009)
##### I/O Methods #########################################################
[docs] def ping(self, data=None):
"""
Write a ping frame to the WebSocket. You may, optionally, provide a
byte string of data to be used as the ping's payload. When the
end-point returns a PONG, and the :meth:`on_pong` method is called,
that byte string will be provided to ``on_pong``. Otherwise, ``on_pong``
will be called with the elapsed time.
========= ============
Argument Description
========= ============
data *Optional.* A byte string of data to be sent as the ping's payload.
========= ============
"""
if data is None:
self._last_ping += 1
data = str(self._last_ping)
self._pings[data] = time()
self.write(data, FRAME_PING)
[docs] def write(self, data, frame=None, flush=False):
"""
Write data to the WebSocket.
Data will not be written immediately, but will be buffered internally
until it can be sent without blocking the process.
Calling :meth:`write` on a closed or disconnected WebSocket will raise
a :exc:`RuntimeError`.
If data is a unicode string, the data will be sent to the remote
end-point as text using the frame opcode for text. If data is a byte
string, the data will be sent to the remote end-point as binary data
using the frame opcode for binary data. If you manually specify a frame
opcode, the provided data *must* be a byte string.
An appropriate header for the data will be generated by this method,
using the length of the data and the frame opcode.
========== ============================================================
Arguments Description
========== ============================================================
data A string of data to write to the WebSocket. Unicode will be
converted automatically.
frame *Optional.* The frame opcode for this message.
flush *Optional.* If True, flush the internal write buffer. See
:meth:`pants.stream.Stream.flush` for details.
========== ============================================================
"""
if self._connection is None:
raise RuntimeError("write() called on closed %r" % self)
if not self.connected:
raise RuntimeError("write() called on disconnected %r." % self)
if frame is None:
if isinstance(data, unicode):
frame = FRAME_TEXT
data = data.encode('utf-8')
elif isinstance(data, bytes):
frame = FRAME_BINARY
else:
raise TypeError("data must be unicode or bytes")
elif frame == FRAME_TEXT:
if isinstance(data, unicode):
data = data.encode('utf-8')
elif not isinstance(data, bytes):
raise TypeError("data must be bytes or unicode for FRAME_TEXT.")
elif not isinstance(data, bytes):
raise TypeError("data must be bytes for frames other "
"than FRAME_TEXT.")
if self.version == 0:
if frame != FRAME_TEXT:
raise TypeError("Attempted to send non-unicode data across "
"outdated WebSocket protocol.")
self._connection.write(b"\x00" + data + b"\xFF", flush=flush)
return
header = chr(0x80 | frame)
plen = len(data)
if plen > 65535:
header += b"\x7F" + STRUCT_Q.pack(plen)
elif plen > 125:
header += b"\x7E" + STRUCT_H.pack(plen)
else:
header += chr(plen)
self._connection.write(header + data, flush=flush)
[docs] def write_file(self, sfile, nbytes=0, offset=0, flush=False):
"""
Write a file to the WebSocket.
This method sends an entire file as one huge binary frame, so be
careful with how you use it.
Calling :meth:`write_file()` on a closed or disconnected WebSocket will
raise a :exc:`RuntimeError`.
========== ====================================================
Arguments Description
========== ====================================================
sfile A file object to write to the WebSocket.
nbytes *Optional.* The number of bytes of the file to
write. If 0, all bytes will be written.
offset *Optional.* The number of bytes to offset writing
by.
flush *Optional.* If True, flush the internal write
buffer. See :meth:`~pants.stream.Stream.flush` for
details.
========== ====================================================
"""
if not self._connection:
raise RuntimeError("write_file() called on closed %r." % self)
elif not self.connected:
raise RuntimeError("write_file() called on disconnected %r." % self)
elif not self.version:
raise TypeError("Attempted to send non-unicode data across "
"outdated WebSocket protocol.")
# Determine the length we're sending.
current_pos = sfile.tell()
sfile.seek(0, 2)
size = sfile.tell()
sfile.seek(current_pos)
if offset > size:
raise ValueError("offset outsize of file size.")
elif offset:
size -= offset
if nbytes == 0:
nbytes = size
elif nbytes < size:
size = nbytes
header = b"\x82"
if size > 65535:
header += b"\x7F" + STRUCT_Q.pack(size)
elif size > 125:
header += b"\x7E" + STRUCT_H.pack(size)
else:
header += chr(size)
self._connection.write(header)
self._connection.write_file(sfile, nbytes, offset, flush)
[docs] def write_packed(self, *data, **kwargs):
"""
Write packed binary data to the WebSocket.
Calling :meth:`write_packed` on a closed or disconnected WebSocket will
raise a :exc:`RuntimeError`.
If the current :attr:`read_delimiter` is an instance of
:class:`struct.Struct` or :class:`netstruct.NetStruct` the format
will be read from that Struct, otherwise you will need to provide
a ``format``.
========== ====================================================
Argument Description
========== ====================================================
\*data Any number of values to be passed through
:mod:`struct` and written to the remote host.
format *Optional.* A formatting string to pack the
provided data with. If one isn't provided, the read
delimiter will be used.
flush *Optional.* If True, flush the internal write
buffer. See :meth:`~pants.stream.Stream.flush` for
details.
========== ====================================================
"""
frame = kwargs.get("frame", FRAME_BINARY)
if not self._connection:
raise RuntimeError("write_packed() called on closed %r." % self)
elif not self.connected:
raise RuntimeError("write_packed() called on disconnected %r."
% self)
elif not self.version and frame != FRAME_TEXT:
raise TypeError("Attempted to send non-unicode data across "
"outdated WebSocket protocol.")
format = kwargs.get("format", None)
flush = kwargs.get("flush", False)
if format:
self.write(struct.pack(format, *data), frame=frame, flush=flush)
elif not isinstance(self._read_delimiter, (Struct, _NetStruct)):
raise ValueError("No format is available for writing packed data.")
else:
self.write(self._read_delimiter.pack(*data), frame=frame, flush=flush)
##### Internal Methods ####################################################
def _safely_call(self, thing_to_call, *args, **kwargs):
"""
Safely execute a callable.
The callable is wrapped in a try block and executed. If an
exception is raised it is logged.
============== ============
Argument Description
============== ============
thing_to_call The callable to execute.
*args The positional arguments to be passed to the callable.
**kwargs The keyword arguments to be passed to the callable.
============== ============
"""
try:
return thing_to_call(*args, **kwargs)
except Exception:
log.exception("Exception raised on %r." % self)
##### Internal Event Handler Methods ######################################
def _con_old_read(self, data):
"""
Process incoming data, the old way.
"""
self._recv_buffer += data
while len(self._recv_buffer) >= 2:
if self._expect_frame:
self._expect_frame = False
self._frame = ord(self._recv_buffer[0])
self._recv_buffer = self._recv_buffer[1:]
if self._frame & 0x80 == 0x80:
log.error("Unsupported frame type for old-style WebSockets %02X on %r." %
(self._frame, self))
self.close(False)
return
# Simple Frame.
ind = self._recv_buffer.find('\xFF')
if ind == -1:
if len(self._recv_buffer) > self._recv_buffer_size_limit:
# TODO: Callback for handling this event?
self.close(reason=1009)
return
# Read the data.
try:
data = self._recv_buffer[:ind].decode('utf-8')
except UnicodeDecodeError:
self.close(reason=1007)
return
if not self._read_buffer:
self._read_buffer = data
self._rb_type = type(self._read_buffer)
else:
self._read_buffer += data
self._recv_buffer = self._recv_buffer[ind+1:]
self._expect_frame = True
# Act on the data.
self._process_read_buffer()
def _con_read(self, data):
"""
Process incoming data.
"""
self._recv_buffer += data
while len(self._recv_buffer) >= 2:
byte1 = ord(self._recv_buffer[0])
final = 0x80 & byte1
rsv1 = 0x40 & byte1
rsv2 = 0x20 & byte1
rsv3 = 0x10 & byte1
opcode = 0x0F & byte1
byte2 = ord(self._recv_buffer[1])
mask = 0x80 & byte2
length = 0x7F & byte2
if length == 126:
if len(self._recv_buffer) < 4:
return
length = STRUCT_H.unpack(self._recv_buffer[2:4])
headlen = 4
elif length == 127:
if len(self._recv_buffer) < 10:
return
length = STRUCT_Q.unpack(self._recv_buffer[2:10])
headlen = 10
else:
headlen = 2
if mask:
if len(self._recv_buffer) < headlen + 4:
return
mask = [ord(x) for x in self._recv_buffer[headlen:headlen+4]]
headlen += 4
total_size = headlen + length
if len(self._recv_buffer) < total_size:
if len(self._recv_buffer) > self._recv_buffer_size_limit:
# TODO: Callback for handling this event?
self.close(reason=1009)
return
# Got a full message!
data = self._recv_buffer[headlen:total_size]
self._recv_buffer = self._recv_buffer[total_size:]
if mask:
new_data = ""
for i in xrange(len(data)):
new_data += chr(ord(data[i]) ^ mask[i % 4])
data = new_data
del new_data
# Control Frame Nonsense!
if opcode == FRAME_CLOSE:
if data:
reason = STRUCT_H.unpack(data[:2])[0]
message = data[2:]
else:
reason = 1000
message = None
self.close(True, reason, message)
return
elif opcode == FRAME_PING:
if self.connected:
self.write(data, frame=FRAME_PONG)
elif opcode == FRAME_PONG:
sent = self._pings.pop(data, None)
if sent:
data = time() - sent
self._safely_call(self.on_pong, data)
return
elif opcode == FRAME_CONTINUATION:
if not self._frag_frame:
self.close(reason=1002)
return
opcode = self._frag_frame
self._frag_frame = None
if opcode == FRAME_TEXT:
try:
data = data.decode('utf-8')
except UnicodeDecodeError:
self.close(reason=1007)
return
if not self._read_buffer:
self._read_buffer = data
self._rb_type = type(data)
elif not isinstance(data, self._rb_type):
# TODO: Improve wrong string type handling with event handler.
log.error("Received string type not matching buffer on %r." % self)
self.close(reason=1002)
return
else:
self._read_buffer += data
if not final:
if not opcode in (FRAME_BINARY, FRAME_TEXT):
log.error("Received fragment control frame on %r." % self)
self.close(reason=1002)
return
self._frag_frame = opcode
if self._read_delimiter is EntireMessage:
return
self._process_read_buffer()
if self._read_buffer and len(self._read_buffer) > self._recv_buffer_size_limit:
e = StreamBufferOverflow("Buffer length exceeded upper limit "
"on %r." % self)
self._safely_call(self.on_overflow_error, e)
return
def _con_close(self):
"""
Close the WebSocket.
"""
if hasattr(self, '_request'):
del self._request
if hasattr(self, '_headers'):
del self._headers
self.connected = False
self._closed = True
self._safely_call(self.on_close)
self._connection = None
def _con_write(self):
if self.connected:
self._safely_call(self.on_write)
##### Internal Processing Methods #########################################
def _process_read_buffer(self):
"""
Process the read_buffer. This is only used when the ReadDelimiter isn't
EntireMessage.
"""
while self._read_buffer:
delimiter = self._read_delimiter
if delimiter is None or delimiter is EntireMessage:
data = self._read_buffer
self._read_buffer = None
self._rb_type = None
self._safely_call(self.on_read, data)
elif isinstance(delimiter, (int, long)):
size = len(self._read_buffer)
if size < delimiter:
break
elif size == delimiter:
data = self._read_buffer
self._read_buffer = None
self._rb_type = None
else:
data = self._read_buffer[:delimiter]
self._read_buffer = self._read_buffer[delimiter:]
self._safely_call(self.on_read, data)
elif isinstance(delimiter, (bytes, unicode)):
if not isinstance(delimiter, self._rb_type):
log.error("buffer string type doesn't match read_delimiter "
"on %r." % self)
self.close(reason=1002)
break
mark = self._read_buffer.find(delimiter)
if mark == -1:
break
else:
data = self._read_buffer[:mark]
self._read_buffer = self._read_buffer[mark + len(delimiter):]
if not self._read_buffer:
self._read_buffer = None
self._rb_type = None
self._safely_call(self.on_read, data)
elif isinstance(delimiter, Struct):
if self._rb_type is not bytes:
log.error("buffer is not bytes for struct read_delimiter "
"on %r." % self)
self.close(reason=1002)
break
size = len(self._read_buffer)
if size < delimiter.size:
break
elif size == delimiter.size:
data = self._read_buffer
self._read_buffer = None
self._rb_type = None
else:
data = self._read_buffer[:delimiter.size]
self._read_buffer = self._read_buffer[delimiter.size:]
# Safely unpack it. This should *probably* never error.
try:
data = delimiter.unpack(data)
except struct.error:
log.exception("Unable to unpack data on %r." % self)
self.close(reason=1002)
break
# Unlike most on_read calls, this one sends every variable of
# the parsed data as its own argument.
self._safely_call(self.on_read, *data)
elif isinstance(delimiter, _NetStruct):
if self._rb_type is not bytes:
log.error("buffer is not bytes for struct read_delimiter "
"on %r." % self)
self.close(reason=1002)
break
if not self._netstruct_iter:
# We need to get started.
self._netstruct_iter = delimiter.iter_unpack()
self._netstruct_needed = next(self._netstruct_iter)
size = len(self._read_buffer)
if size < self._netstruct_needed:
break
elif size == self._netstruct_needed:
data = self._read_buffer
self._read_buffer = None
self._rb_type = None
else:
data = self._read_buffer[:self._netstruct_needed]
self._read_buffer = self._read_buffer[self._netstruct_needed:]
data = self._netstruct_iter.send(data)
if isinstance(data, (int, long)):
self._netstruct_needed = data
continue
# Still here? Then we've got an object. Delete the NetStruct
# state and send the data.
self._netstruct_needed = None
self._netstruct_iter = None
self._safely_call(self.on_read, *data)
elif isinstance(delimiter, RegexType):
if not isinstance(delimiter.pattern, self._rb_type):
log.error("buffer string type does not match "
"read_delimiter on %r." % self)
self.close(reason=1002)
break
# Depending on regex_search, we could do this two ways.
if self.regex_search:
match = delimiter.search(self._read_buffer)
if not match:
break
data = self._read_buffer[:match.start()]
self._read_buffer = self._read_buffer[match.end():]
if not self._read_buffer:
self._read_buffer = None
self._rb_type = None
else:
# Require the match to be at the beginning.
data = delimiter.match(self._read_buffer)
if not data:
break
self._read_buffer = self._read_buffer[data.end():]
if not self._read_buffer:
self._read_buffer = None
self._rb_type = None
# Send either the string or the match object.
self._safely_call(self.on_read, data)
else:
log.warning("Invalid read_delimiter on %r." % self)
break
if self._connection is None or not self.connected:
break
###############################################################################
# Support Functions
###############################################################################
def challenge_response(headers, key3):
"""
Calculate the response for a WebSocket security challenge and return it.
"""
resp = hashlib.md5()
for key in (headers.get('Sec-WebSocket-Key1'),
headers.get('Sec-WebSocket-Key2')):
n = ''
s = 0
for c in key:
if c.isdigit(): n += c
elif c == ' ': s += 1
n = int(n)
if n > 4294967295 or s == 0 or n % s != 0:
raise ValueError("The provided keys aren't valid.")
n /= s
resp.update(
chr(n >> 24 & 0xFF) +
chr(n >> 16 & 0xFF) +
chr(n >> 8 & 0xFF) +
chr(n & 0xFF)
)
resp.update(key3)
return resp.digest()