/*
* This file is part of Cockpit.
*
* Copyright (C) 2013 Red Hat, Inc.
*
* Cockpit is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* Cockpit is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Cockpit; If not, see .
*/
#include "config.h"
#include "websocket.h"
#include "websocketprivate.h"
#include
/*
* SECTION:websocketconnection
* @title: WebSocketConnection
* @short_description: A WebSocket connection
*
* A #WebSocketConnection is a WebSocket connection to a peer. This API is modeled
* after the W3C API for interacting with WebSockets.
*
* Use the #WebSocketClient or #WebSocketServer derived classes on the
* appropriate sides. As a client, to connect to a Websocket server, use the
* web_socket_client_new() function. To handle a WebSocket connection from
* a client, you can use the web_socket_server_new_for_stream() function.
*
* The #WebSocketConnection:ready-state property will indicate the state of the
* connection. You can only send messages once the connection is in the
* %WEB_SOCKET_STATE_OPEN state. The #WebSocketConnection::open signal will fire
* when transitioning to this state.
*
* Use web_socket_connection_send() to send a message to the peer. When a
* message is received the #WebSocketConnection::message signal will fire.
*
* The web_socket_connection_close() function will perform an orderly close
* of the connection. The #WebSocketConnection::close signal will fire once
* the connection closes, whether it was initiated by this side or the peer.
*
* Connect to the #WebSocketConnection::closing signal to detect when either
* peer begins closing the connection. You can prevent closure of this side
* by returning %FALSE from the signal handler. You should in that case
* call the web_socket_connection_close() function at a later time to complete
* the close.
*/
/**
* WebSocketConnection:
*
* An abstract base class representing a WebSocket connection. Use
* instances of the derived #WebSocketClient or #WebSocketServer classes.
*/
/**
* WebSocketConnectionClass:
* @server_behavior: set by #WebSocketServer to %TRUE
* @handshake: used by derived classes to handle received HTTP handshake
* @open: default handler for the #WebSocketConnection::open signal
* @message: default handler for the #WebSocketConnection::message signal
* @error: default handler for the #WebSocketConnection::error signal
* @closing: the default handler for the #WebSocketConnection:closing signal
* @close: default handler for the #WebSocketConnection::close signal
*
* The abstract base class for #WebSocketConnection
*/
enum {
PROP_0,
PROP_URL,
PROP_PROTOCOL,
PROP_READY_STATE,
PROP_BUFFERED_AMOUNT,
PROP_IO_STREAM,
};
enum {
OPEN,
MESSAGE,
ERROR,
CLOSING,
CLOSE,
NUM_SIGNALS
};
static guint signals[NUM_SIGNALS] = { 0, };
typedef struct {
GBytes *data;
gboolean last;
gsize sent;
gsize amount;
} Frame;
struct _WebSocketConnectionPrivate
{
/* FALSE if client, TRUE if server */
gboolean server_side;
/*
* On the client this is the url we connect to,
* on the server, the one the socket lives at
*/
gchar *url;
gchar *chosen_protocol;
GSource *start_idle;
gboolean handshake_done;
gushort peer_close_code;
gchar *peer_close_data;
gboolean close_sent;
gboolean close_received;
gboolean dirty_close;
GSource *close_timeout;
GMainContext *main_context;
GIOStream *io_stream;
gboolean io_open;
gboolean io_closed;
GPollableInputStream *input;
GSource *input_source;
GByteArray *incoming;
GPollableOutputStream *output;
GSource *output_source;
GQueue outgoing;
/* Current message being assembled */
guint8 message_opcode;
GByteArray *message_data;
};
#define MAX_PAYLOAD 128 * 1024
G_DEFINE_ABSTRACT_TYPE (WebSocketConnection, web_socket_connection, G_TYPE_OBJECT);
static void
frame_free (gpointer data)
{
Frame *frame = data;
if (frame)
{
g_bytes_unref (frame->data);
g_slice_free (Frame, frame);
}
}
static void
web_socket_connection_init (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv;
pv = self->pv = G_TYPE_INSTANCE_GET_PRIVATE (self, WEB_SOCKET_TYPE_CONNECTION,
WebSocketConnectionPrivate);
g_queue_init (&pv->outgoing);
pv->main_context = g_main_context_ref_thread_default ();
}
static void
on_iostream_closed (GObject *source,
GAsyncResult *result,
gpointer user_data)
{
WebSocketConnection *self = user_data;
WebSocketConnectionPrivate *pv = self->pv;
GError *error = NULL;
gboolean unused;
/* We treat connection as closed even if close fails */
pv->io_closed = TRUE;
g_io_stream_close_finish (pv->io_stream, result, &error);
if (error)
{
g_message ("error closing web socket stream: %s", error->message);
if (!pv->dirty_close)
g_signal_emit (self, signals[ERROR], 0, error, &unused);
pv->dirty_close = TRUE;
g_error_free (error);
}
g_assert (web_socket_connection_get_ready_state (self) == WEB_SOCKET_STATE_CLOSED);
g_debug ("closed: completed io stream close");
g_signal_emit (self, signals[CLOSE], 0);
g_object_unref (self);
}
static void
stop_input (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
if (pv->input_source)
{
g_debug ("stopping input source");
g_source_destroy (pv->input_source);
g_source_unref (pv->input_source);
pv->input_source = NULL;
}
}
static void
stop_output (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
if (pv->output_source)
{
g_debug ("stopping output source");
g_source_destroy (pv->output_source);
g_source_unref (pv->output_source);
pv->output_source = NULL;
}
}
static void
close_io_stop_timeout (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
if (pv->close_timeout)
{
g_source_destroy (pv->close_timeout);
g_source_unref (pv->close_timeout);
pv->close_timeout = NULL;
}
if (pv->start_idle)
{
g_source_destroy (pv->start_idle);
g_source_unref (pv->start_idle);
pv->start_idle = NULL;
}
}
static void
close_io_stream (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
close_io_stop_timeout (self);
/* Close a connection that's not yet open */
if (!pv->io_stream && !pv->io_closed)
{
pv->io_closed = TRUE;
g_assert (web_socket_connection_get_ready_state (self) == WEB_SOCKET_STATE_CLOSED);
g_debug ("closed: no stream was opened");
g_signal_emit (self, signals[CLOSE], 0);
}
/* Close an open stream, which is not yet close_async'ing */
else if (pv->io_open)
{
stop_input (self);
stop_output (self);
pv->io_open = FALSE;
g_debug ("closing io stream");
g_io_stream_close_async (pv->io_stream, G_PRIORITY_DEFAULT,
NULL, on_iostream_closed, g_object_ref (self));
}
g_object_notify (G_OBJECT (self), "ready-state");
}
static void
shutdown_wr_io_stream (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
GSocket *socket;
GError *error = NULL;
stop_output (self);
if (G_IS_SOCKET_CONNECTION (pv->io_stream))
{
socket = g_socket_connection_get_socket (G_SOCKET_CONNECTION (pv->io_stream));
g_socket_shutdown (socket, FALSE, TRUE, &error);
if (error != NULL)
{
g_message ("error shutting down io stream: %s", error->message);
g_error_free (error);
}
}
g_object_notify (G_OBJECT (self), "ready-state");
}
static gboolean
on_timeout_close_io (gpointer user_data)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (user_data);
WebSocketConnectionPrivate *pv = self->pv;
pv->close_timeout = 0;
g_message ("peer did not close io when expected");
close_io_stream (self);
return FALSE;
}
static void
close_io_after_timeout (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
const gint timeout = 5;
if (pv->close_timeout)
return;
g_debug ("waiting %d seconds for peer to close io", timeout);
pv->close_timeout = g_timeout_source_new_seconds (timeout);
g_source_set_callback (pv->close_timeout, on_timeout_close_io, self, NULL);
g_source_attach (pv->close_timeout, pv->main_context);
}
static void
xor_with_mask_rfc6455 (const guint8 *mask,
guint8 *data,
gsize len)
{
gsize n;
/* Do the masking */
for (n = 0; n < len; n++)
data[n] ^= mask[n & 3];
}
static void
send_prefixed_message_rfc6455 (WebSocketConnection *self,
WebSocketQueueFlags flags,
guint8 opcode,
const guint8 *prefix,
gsize prefix_len,
const guint8 *payload,
gsize payload_len)
{
gsize amount;
GByteArray *bytes;
gsize frame_len;
guint8 *outer;
guint8 *mask = 0;
guint8 *at;
gsize len;
guint64 size;
len = payload_len + prefix_len;
amount = len;
bytes = g_byte_array_sized_new (14 + len);
outer = bytes->data;
outer[0] = 0x80 | opcode;
/* If control message, truncate payload */
if (opcode & 0x08)
{
if (len > 125)
{
g_warning ("Truncating WebSocket control message payload");
if (prefix_len > 125)
prefix_len = 125;
payload_len = 125 - prefix_len;
len = 125;
}
/* Buffered amount of bytes is zero for control messages */
amount = 0;
}
size = len;
if (size < 126)
{
outer[1] = (0xFF & size); /* mask | 7-bit-len */
bytes->len = 2;
}
else if (size < 65536)
{
outer[1] = 126; /* mask | 16-bit-len */
outer[2] = (size >> 8) & 0xFF;
outer[3] = (size >> 0) & 0xFF;
bytes->len = 4;
}
else
{
outer[1] = 127; /* mask | 64-bit-len */
outer[2] = (size >> 56) & 0xFF;
outer[3] = (size >> 48) & 0xFF;
outer[4] = (size >> 40) & 0xFF;
outer[5] = (size >> 32) & 0xFF;
outer[6] = (size >> 24) & 0xFF;
outer[7] = (size >> 16) & 0xFF;
outer[8] = (size >> 8) & 0xFF;
outer[9] = (size >> 0) & 0xFF;
bytes->len = 10;
}
/*
* The server side doesn't need to mask, so we don't. There's
* probably a client somewhere that's not expecting it.
*/
if (!self->pv->server_side)
{
outer[1] |= 0x80;
mask = outer + bytes->len;
* ((guint32 *)mask) = g_random_int ();
bytes->len += 4;
}
at = bytes->data + bytes->len;
g_byte_array_append (bytes, prefix, prefix_len);
g_byte_array_append (bytes, payload, payload_len);
if (!self->pv->server_side)
xor_with_mask_rfc6455 (mask, at, len);
frame_len = bytes->len;
_web_socket_connection_queue (self, flags, g_byte_array_free (bytes, FALSE),
frame_len, amount);
g_debug ("queued rfc6455 %d frame of len %u", (gint)opcode, (guint)frame_len);
}
static void
send_message_rfc6455 (WebSocketConnection *self,
WebSocketQueueFlags flags,
guint8 opcode,
const guint8 *payload,
gsize payload_len)
{
return send_prefixed_message_rfc6455 (self, flags, opcode, NULL, 0, payload, payload_len);
}
static void
send_close_rfc6455 (WebSocketConnection *self,
WebSocketQueueFlags flags,
gushort code,
const gchar *reason)
{
/* Note that send_message truncates as expected */
gchar buffer[128];
gsize len = 0;
if (code != 0)
{
buffer[len++] = code >> 8;
buffer[len++] = code & 0xFF;
if (reason)
len += g_strlcpy (buffer + len, reason, sizeof (buffer) - len);
}
send_message_rfc6455 (self, flags, 0x08, (guint8 *)buffer, len);
self->pv->close_sent = TRUE;
}
gboolean
_web_socket_connection_error (WebSocketConnection *self,
GError *error)
{
gboolean unused;
if (web_socket_connection_get_ready_state (self) != WEB_SOCKET_STATE_CLOSED)
{
if (error)
{
self->pv->dirty_close = TRUE;
g_signal_emit (self, signals[ERROR], 0, error, &unused);
}
g_error_free (error);
return TRUE;
}
g_error_free (error);
return FALSE;
}
void
_web_socket_connection_error_and_close (WebSocketConnection *self,
GError *error,
gboolean prejudice)
{
gboolean ignore = FALSE;
gushort code;
if (error && error->domain == WEB_SOCKET_ERROR)
code = error->code;
else
code = WEB_SOCKET_CLOSE_GOING_AWAY;
if (!self->pv->server_side && error && error->domain == G_TLS_ERROR)
{
self->pv->peer_close_code = WEB_SOCKET_CLOSE_TLS_HANDSHAKE;
if (g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_NOT_TLS) ||
g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_MISC))
{
self->pv->peer_close_data = g_strdup ("protocol-error");
}
else if (g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE))
{
self->pv->peer_close_data = g_strdup ("unknown-hostkey");
}
}
if (!_web_socket_connection_error (self, error))
return;
if (!self->pv->handshake_done)
prejudice = TRUE;
/* If already closing, so just ignore this stuff */
switch (web_socket_connection_get_ready_state (self))
{
case WEB_SOCKET_STATE_CLOSED:
ignore = TRUE;
break;
case WEB_SOCKET_STATE_CLOSING:
ignore = !prejudice;
break;
default:
break;
}
if (ignore)
{
g_debug ("already closing/closed, ignoring error");
}
else if (prejudice)
{
g_debug ("forcing close due to error");
close_io_stream (self);
}
else
{
g_debug ("requesting close due to error");
send_close_rfc6455 (self, WEB_SOCKET_QUEUE_URGENT | WEB_SOCKET_QUEUE_LAST, code, NULL);
}
}
static void
protocol_error_and_close_full (WebSocketConnection *self,
gboolean prejudice)
{
GError *error = g_error_new_literal (WEB_SOCKET_ERROR,
WEB_SOCKET_CLOSE_PROTOCOL,
self->pv->server_side ?
"Received invalid WebSocket response from the server" :
"Received invalid WebSocket response from the client");
_web_socket_connection_error_and_close (self, error, prejudice);
}
static void
protocol_error_and_close (WebSocketConnection *self)
{
protocol_error_and_close_full (self, FALSE);
}
static void
bad_data_error_and_close (WebSocketConnection *self)
{
GError *error = g_error_new_literal (WEB_SOCKET_ERROR,
WEB_SOCKET_CLOSE_BAD_DATA,
self->pv->server_side ?
"Received invalid WebSocket data from the server" :
"Received invalid WebSocket data from the client");
_web_socket_connection_error_and_close (self, error, FALSE);
}
static void
too_big_error_and_close (WebSocketConnection *self,
gsize payload_len)
{
GError *error = g_error_new_literal (WEB_SOCKET_ERROR,
WEB_SOCKET_CLOSE_TOO_BIG,
self->pv->server_side ?
"Received extremely large WebSocket data from the server" :
"Received extremely large WebSocket data from the client");
g_message ("%s is trying to frame of size %" G_GSIZE_FORMAT " or greater, but max supported size is 128KiB",
self->pv->server_side ? "server" : "client", payload_len);
_web_socket_connection_error_and_close (self, error, TRUE);
/* The input is in an invalid state now */
stop_input (self);
}
static gboolean
web_socket_connection_default_error (WebSocketConnection *connection,
GError *error)
{
if (g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_EOF))
g_debug ("web socket error: %s", error->message);
else
g_message ("%s", error->message);
return TRUE;
}
static gboolean
web_socket_connection_default_closing (WebSocketConnection *self)
{
return TRUE;
}
static void
receive_close_rfc6455 (WebSocketConnection *self,
const guint8 *data,
gsize len)
{
WebSocketConnectionPrivate *pv = self->pv;
pv->peer_close_code = 0;
g_free (pv->peer_close_data);
pv->peer_close_data = NULL;
pv->close_received = TRUE;
/* Store the code/data payload */
if (len >= 2)
{
pv->peer_close_code = (guint16)data[0] << 8 | data[1];
}
if (len > 2)
{
data += 2;
len -= 2;
if (g_utf8_validate ((gchar *)data, len, NULL))
pv->peer_close_data = g_strndup ((gchar *)data, len);
else
g_message ("received non-UTF8 close data: %d '%.*s' %d", (int)len, (int)len, (gchar *)data, (int)data[0]);
}
/* Once we receive close response on server, close immediately */
if (pv->close_sent)
{
shutdown_wr_io_stream (self);
if (pv->server_side)
close_io_stream (self);
}
else
{
/* Send back the response */
web_socket_connection_close (self, pv->peer_close_code, NULL);
}
}
static void
receive_ping_rfc6455 (WebSocketConnection *self,
const guint8 *data,
gsize len)
{
/* Send back a pong with same data */
g_debug ("received ping, responding");
send_message_rfc6455 (self, WEB_SOCKET_QUEUE_URGENT, 0x0A, data, len);
}
static void
process_contents_rfc6455 (WebSocketConnection *self,
gboolean control,
gboolean fin,
guint8 opcode,
gconstpointer payload,
gsize payload_len)
{
WebSocketConnectionPrivate *pv = self->pv;
GBytes *message;
if (control)
{
/* Control frames must never be fragmented */
if (!fin)
{
g_message ("received fragmented control frame");
protocol_error_and_close (self);
return;
}
g_debug ("received control frame %d with %d payload", (int)opcode, (int)payload_len);
switch (opcode)
{
case 0x08:
receive_close_rfc6455 (self, payload, payload_len);
break;
case 0x09:
receive_ping_rfc6455 (self, payload, payload_len);
break;
case 0x0A:
break;
default:
g_message ("received unsupported control frame: %d", (gint)opcode);
break;
}
}
else if (pv->close_received)
{
g_message ("received message after close was received");
}
/* A message frame */
else
{
/* Initial fragment of a message */
if (!fin && opcode)
{
if (pv->message_data)
{
g_message ("received out of order inital message fragment");
protocol_error_and_close (self);
return;
}
g_debug ("received inital fragment frame %d with %d payload", (int)opcode, (int)payload_len);
}
/* Middle fragment of a message */
else if (!fin && !opcode)
{
if (!pv->message_data)
{
g_message ("received out of order middle message fragment");
protocol_error_and_close (self);
return;
}
g_debug ("received middle fragment frame with %d payload", (int)payload_len);
}
/* Last fragment of a message */
else if (fin && !opcode)
{
if (!pv->message_data)
{
g_message ("received out of order ending message fragment");
protocol_error_and_close (self);
return;
}
g_debug ("received last fragment frame with %d payload", (int)payload_len);
}
/* An unfragmented message */
else
{
g_assert (opcode != 0);
if (pv->message_data)
{
g_message ("received unfragmented message when fragment was expected");
protocol_error_and_close (self);
return;
}
g_debug ("received frame %d with %d payload", (int)opcode, (int)payload_len);
}
if (opcode)
{
pv->message_opcode = opcode;
pv->message_data = g_byte_array_sized_new (payload_len);
}
switch (pv->message_opcode)
{
case 0x01:
if (!g_utf8_validate ((gchar *)payload, payload_len, NULL))
{
g_message ("received invalid non-UTF8 text data");
/* Discard the entire message */
g_byte_array_unref (pv->message_data);
pv->message_data = NULL;
pv->message_opcode = 0;
bad_data_error_and_close (self);
return;
}
/* fall through */
case 0x02:
g_byte_array_append (pv->message_data, payload, payload_len);
break;
default:
g_debug ("received unknown data frame: %d", (gint)opcode);
break;
}
/* Actually deliver the message? */
if (fin)
{
/* Always null terminate, as a convenience */
g_byte_array_append (pv->message_data, (guchar *)"\0", 1);
/* But don't include the null terminator in the byte count */
pv->message_data->len--;
opcode = pv->message_opcode;
message = g_byte_array_free_to_bytes (pv->message_data);
pv->message_data = NULL;
pv->message_opcode = 0;
g_debug ("message: delivering %d with %d length",
(int)opcode, (int)g_bytes_get_size (message));
g_signal_emit (self, signals[MESSAGE], 0, (int)opcode, message);
g_bytes_unref (message);
}
}
}
static gboolean
process_frame_rfc6455 (WebSocketConnection *self)
{
guint8 *header;
guint8 *payload;
guint64 payload_len;
guint8 *mask;
gboolean fin;
gboolean control;
gboolean masked;
guint8 opcode;
gsize len;
gsize at;
len = self->pv->incoming->len;
if (len < 2)
return FALSE; /* need more data */
header = self->pv->incoming->data;
fin = ((header[0] & 0x80) != 0);
control = header[0] & 0x08;
opcode = header[0] & 0x0f;
masked = ((header[1] & 0x80) != 0);
switch (header[1] & 0x7f)
{
case 126:
at = 4;
if (len < at)
return FALSE; /* need more data */
payload_len = ((guint16)header[2] << 8) |
((guint16)header[3] << 0);
break;
case 127:
at = 10;
if (len < at)
return FALSE; /* need more data */
payload_len = ((guint64)header[2] << 56) |
((guint64)header[3] << 48) |
((guint64)header[4] << 40) |
((guint64)header[5] << 32) |
((guint64)header[6] << 24) |
((guint64)header[7] << 16) |
((guint64)header[8] << 8) |
((guint64)header[9] << 0);
break;
default:
payload_len = header[1] & 0x7f;
at = 2;
break;
}
/* Safety valve */
if (payload_len >= MAX_PAYLOAD)
{
too_big_error_and_close (self, payload_len);
return FALSE;
}
if (len < at + payload_len)
return FALSE; /* need more data */
payload = header + at;
if (masked)
{
mask = header + at;
payload += 4;
at += 4;
if (len < at + payload_len)
return FALSE; /* need more data */
xor_with_mask_rfc6455 (mask, payload, payload_len);
}
/*
* Note that now that we've unmasked, we've modified the buffer, we can
* only return below via discarding or processing the message
*/
process_contents_rfc6455 (self, control, fin, opcode, payload, payload_len);
/* Move past the parsed frame */
g_byte_array_remove_range (self->pv->incoming, 0, at + payload_len);
return TRUE;
}
static void
process_incoming (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
WebSocketConnectionClass *klass;
gboolean more;
if (!pv->handshake_done)
{
klass = WEB_SOCKET_CONNECTION_GET_CLASS (self);
g_assert (klass->handshake != NULL);
if ((klass->handshake) (self, pv->incoming))
{
pv->handshake_done = TRUE;
g_object_notify (G_OBJECT (self), "ready-state");
g_signal_emit (self, signals[OPEN], 0);
}
}
if (pv->handshake_done)
{
do
{
more = process_frame_rfc6455 (self);
}
while (more);
}
}
static gboolean
on_web_socket_input (GObject *pollable_stream,
gpointer user_data)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (user_data);
WebSocketConnectionPrivate *pv = self->pv;
GError *error = NULL;
gboolean end = FALSE;
gssize count;
gsize len;
do
{
len = pv->incoming->len;
g_byte_array_set_size (pv->incoming, len + 1024);
count = g_pollable_input_stream_read_nonblocking (pv->input,
pv->incoming->data + len,
1024, NULL, &error);
if (count < 0)
{
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_error_free (error);
count = 0;
}
else
{
_web_socket_connection_error_and_close (self, error, TRUE);
return TRUE;
}
}
else if (count == 0)
{
end = TRUE;
}
pv->incoming->len = len + count;
}
while (count > 0);
process_incoming (self);
if (end)
{
if (!pv->close_sent || !pv->close_received)
{
pv->dirty_close = TRUE;
g_message ("connection unexpectedly closed by peer");
}
else
{
g_debug ("peer has closed socket");
}
close_io_stream (self);
}
return TRUE;
}
static void
start_input (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
if (pv->input_source)
return;
g_debug ("starting input source");
pv->input_source = g_pollable_input_stream_create_source (pv->input, NULL);
g_source_set_callback (pv->input_source, (GSourceFunc)on_web_socket_input, self, NULL);
g_source_attach (pv->input_source, pv->main_context);
}
static gboolean
on_web_socket_output (GObject *pollable_stream,
gpointer user_data)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (user_data);
WebSocketConnectionPrivate *pv = self->pv;
const guint8 *data;
GError *error = NULL;
Frame *frame;
gssize count;
gsize len;
frame = g_queue_peek_head (&pv->outgoing);
/* No more frames to send */
if (frame == NULL)
{
stop_output (self);
return TRUE;
}
data = g_bytes_get_data (frame->data, &len);
g_assert (len > 0);
g_assert (len > frame->sent);
count = g_pollable_output_stream_write_nonblocking (pv->output,
data + frame->sent,
len - frame->sent,
NULL, &error);
if (count < 0)
{
if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK))
{
g_clear_error (&error);
count = 0;
}
else
{
_web_socket_connection_error_and_close (self, error, TRUE);
return FALSE;
}
}
frame->sent += count;
if (frame->sent >= len)
{
g_debug ("sent frame");
g_queue_pop_head (&pv->outgoing);
if (frame->last)
{
if (pv->server_side)
{
close_io_stream (self);
}
else
{
shutdown_wr_io_stream (self);
close_io_after_timeout (self);
}
}
frame_free (frame);
}
return TRUE;
}
static void
start_output (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
if (pv->output_source)
return;
g_debug ("starting output source");
pv->output_source = g_pollable_output_stream_create_source (pv->output, NULL);
g_source_set_callback (pv->output_source, (GSourceFunc)on_web_socket_output, self, NULL);
g_source_attach (pv->output_source, pv->main_context);
}
void
_web_socket_connection_queue (WebSocketConnection *self,
WebSocketQueueFlags flags,
gpointer data,
gsize len,
gsize amount)
{
WebSocketConnectionPrivate *pv = self->pv;
Frame *frame;
Frame *prev;
g_return_if_fail (WEB_SOCKET_IS_CONNECTION (self));
g_return_if_fail (pv->close_sent == FALSE);
g_return_if_fail (data != NULL);
g_return_if_fail (len > 0);
frame = g_slice_new0 (Frame);
frame->data = g_bytes_new_take (data, len);
frame->amount = amount;
frame->last = (flags & WEB_SOCKET_QUEUE_LAST) ? TRUE : FALSE;
/* If urgent put at front of queue */
if (flags & WEB_SOCKET_QUEUE_URGENT)
{
/* But we can't interrupt a message already partially sent */
prev = g_queue_pop_head (&pv->outgoing);
if (prev == NULL)
{
g_queue_push_head (&pv->outgoing, frame);
}
else if (prev->sent > 0)
{
g_queue_push_head (&pv->outgoing, frame);
g_queue_push_head (&pv->outgoing, prev);
}
else
{
g_queue_push_head (&pv->outgoing, prev);
g_queue_push_head (&pv->outgoing, frame);
}
}
else
{
g_queue_push_tail (&pv->outgoing, frame);
}
start_output (self);
}
static gboolean
check_streams (WebSocketConnection *self)
{
WebSocketConnectionPrivate *pv = self->pv;
if (!pv->input || !g_pollable_input_stream_can_poll (pv->input))
{
g_critical ("WebSocket input stream is invalid or cannot poll");
return FALSE;
}
if (!pv->output || !g_pollable_output_stream_can_poll (pv->output))
{
g_critical ("WebSocket output stream is invalid or cannot poll");
return FALSE;
}
return TRUE;
}
void
_web_socket_connection_take_incoming (WebSocketConnection *self,
GByteArray *input_buffer)
{
g_return_if_fail (WEB_SOCKET_IS_CONNECTION (self));
g_return_if_fail (self->pv->incoming == NULL);
self->pv->incoming = input_buffer;
}
static gboolean
on_idle_start_input (gpointer user_data)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (user_data);
g_source_unref (self->pv->start_idle);
self->pv->start_idle = NULL;
if (check_streams (self))
{
start_input (self);
process_incoming (self);
}
return FALSE;
}
void
_web_socket_connection_take_io_stream (WebSocketConnection *self,
GIOStream *io_stream)
{
WebSocketConnectionPrivate *pv = self->pv;
GInputStream *is;
GOutputStream *os;
g_return_if_fail (WEB_SOCKET_IS_CONNECTION (self));
g_return_if_fail (G_IS_IO_STREAM (io_stream));
g_return_if_fail (pv->io_stream == NULL);
pv->io_stream = io_stream;
is = g_io_stream_get_input_stream (io_stream);
os = g_io_stream_get_output_stream (io_stream);
if (G_IS_POLLABLE_INPUT_STREAM (is))
pv->input = G_POLLABLE_INPUT_STREAM (is);
if (G_IS_POLLABLE_OUTPUT_STREAM (os))
pv->output = G_POLLABLE_OUTPUT_STREAM (os);
pv->io_open = TRUE;
g_object_notify (G_OBJECT (self), "io-stream");
/* Start handshake from the main context */
pv->start_idle = g_idle_source_new ();
g_source_set_priority (pv->start_idle, G_PRIORITY_HIGH);
g_source_set_callback (pv->start_idle, (GSourceFunc)on_idle_start_input,
g_object_ref (self), g_object_unref);
g_source_attach (pv->start_idle, pv->main_context);
}
static void
web_socket_connection_constructed (GObject *object)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (object);
WebSocketConnectionPrivate *pv = self->pv;
WebSocketConnectionClass *klass;
G_OBJECT_CLASS (web_socket_connection_parent_class)->constructed (object);
/*
* Here we choose a side to be based on our derived class. The handshake
* is different on either client/server side, as is the expectation of
* how to mask data.
*/
klass = WEB_SOCKET_CONNECTION_GET_CLASS (self);
pv->server_side = klass->server_behavior;
if (!pv->incoming)
pv->incoming = g_byte_array_sized_new (1024);
}
static void
web_socket_connection_get_property (GObject *object,
guint prop_id,
GValue *value,
GParamSpec *pspec)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (object);
switch (prop_id)
{
case PROP_URL:
g_value_set_string (value, web_socket_connection_get_url (self));
break;
case PROP_PROTOCOL:
g_value_set_string (value, web_socket_connection_get_protocol (self));
break;
case PROP_READY_STATE:
g_value_set_int (value, web_socket_connection_get_ready_state (self));
break;
case PROP_BUFFERED_AMOUNT:
g_value_set_ulong (value, web_socket_connection_get_buffered_amount (self));
break;
case PROP_IO_STREAM:
g_value_set_object (value, web_socket_connection_get_io_stream (self));
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
web_socket_connection_set_property (GObject *object,
guint prop_id,
const GValue *value,
GParamSpec *pspec)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (object);
WebSocketConnectionPrivate *pv = self->pv;
GIOStream *io_stream;
switch (prop_id)
{
case PROP_URL:
g_return_if_fail (pv->url == NULL);
pv->url = g_value_dup_string (value);
break;
case PROP_IO_STREAM:
g_return_if_fail (pv->io_stream == NULL);
io_stream = g_value_dup_object (value);
if (io_stream)
_web_socket_connection_take_io_stream (self, io_stream);
break;
default:
G_OBJECT_WARN_INVALID_PROPERTY_ID (object, prop_id, pspec);
break;
}
}
static void
web_socket_connection_dispose (GObject *object)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (object);
self->pv->dirty_close = TRUE;
close_io_stream (self);
G_OBJECT_CLASS (web_socket_connection_parent_class)->dispose (object);
}
static void
web_socket_connection_finalize (GObject *object)
{
WebSocketConnection *self = WEB_SOCKET_CONNECTION (object);
WebSocketConnectionPrivate *pv = self->pv;
g_free (pv->url);
g_free (pv->chosen_protocol);
g_free (pv->peer_close_data);
g_main_context_unref (pv->main_context);
if (pv->incoming)
g_byte_array_free (pv->incoming, TRUE);
while (!g_queue_is_empty (&pv->outgoing))
frame_free (g_queue_pop_head (&pv->outgoing));
g_clear_object (&pv->io_stream);
g_assert (!pv->input_source);
g_assert (!pv->output_source);
g_assert (!pv->io_open);
g_assert (pv->io_closed);
g_assert (!pv->close_timeout);
if (pv->start_idle)
g_source_unref (pv->start_idle);
if (pv->message_data)
g_byte_array_free (pv->message_data, TRUE);
G_OBJECT_CLASS (web_socket_connection_parent_class)->finalize (object);
}
static void
web_socket_connection_class_init (WebSocketConnectionClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
g_type_class_add_private (klass, sizeof (WebSocketConnectionPrivate));
gobject_class->constructed = web_socket_connection_constructed;
gobject_class->get_property = web_socket_connection_get_property;
gobject_class->set_property = web_socket_connection_set_property;
gobject_class->dispose = web_socket_connection_dispose;
gobject_class->finalize = web_socket_connection_finalize;
/**
* WebSocketConnection:url:
*
* The URL of the WebSocket.
*
* For servers this represents the address of the WebSocket, and
* for clients it is the address connected to. This is required
* as a construct property.
*/
g_object_class_install_property (gobject_class, PROP_URL,
g_param_spec_string ("url", "URL", "The WebSocket URL", NULL,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
/**
* WebSocketConnection:protocol:
*
* The chosen protocol. Only becomes valid after the #WebSocketConnection:open
* signal has been fired, and when we are in the %WEB_SOCKET_STATE_OPEN state.
*
* May be NULL if neither peer cares about protocols.
*/
g_object_class_install_property (gobject_class, PROP_PROTOCOL,
g_param_spec_string ("protocol", "Protocol", "The chosen WebSocket protocol", NULL,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* WebSocketConnection:ready-state:
*
* The current state of the WebSocket.
*/
g_object_class_install_property (gobject_class, PROP_READY_STATE,
g_param_spec_int ("ready-state", "Ready state", "Ready state ",
WEB_SOCKET_STATE_CONNECTING, WEB_SOCKET_STATE_CLOSED, WEB_SOCKET_STATE_CONNECTING,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* WebSocketConnection:buffered-amount:
*
* This represents caller provided data passed into the
* web_socket_connection_send() function, which has been queued but not
* yet been sent.
*/
g_object_class_install_property (gobject_class, PROP_BUFFERED_AMOUNT,
g_param_spec_ulong ("buffered-amount", "Buffered amount", "Outstanding amount of data buffered",
0, G_MAXULONG, 0,
G_PARAM_READABLE | G_PARAM_STATIC_STRINGS));
/**
* WebSocketConnection:io-stream:
*
* The underlying IO stream the WebSocket is communicating over. For servers
* this must be specified as a construct property. For clients, this may be
* specified, if you have a stream that you've alrady connected to.
*
* The input and output streams must be pollable streams.
*/
g_object_class_install_property (gobject_class, PROP_IO_STREAM,
g_param_spec_object ("io-stream", "IO Stream", "Underlying io stream", G_TYPE_IO_STREAM,
G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS));
/**
* WebSocketConnection::open:
* @self: the WebSocket
*
* Emitted when the connection opens and is ready for communication.
*
* This will be emitted at most once. But if the connection fails during
* connecting, then this signal will not be emitted.
*/
signals[OPEN] = g_signal_new ("open",
WEB_SOCKET_TYPE_CONNECTION,
G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (WebSocketConnectionClass, open),
NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 0);
/**
* WebSocketConnection::message:
* @self: the WebSocket
* @type: the type of message contents
* @message: the message data
*
* Emitted when we receive a message from the peer.
*
* As a convenience, the @message data will always be null-terminated, but
* the null-terminator will not be included in the length count.
* This signal may emitted multiple times.
*/
signals[MESSAGE] = g_signal_new ("message",
WEB_SOCKET_TYPE_CONNECTION,
G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (WebSocketConnectionClass, message),
NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 2, G_TYPE_INT, G_TYPE_BYTES);
/**
* WebSocketConnection::error:
* @self: the WebSocket
* @error: the error that occured
*
* Emitted when an error occurred on the WebSocket. This may be fired
* multiple times. Fatal errors will be followed by the #WebSocketConnection::close
* signal being emitted.
*/
signals[ERROR] = g_signal_new ("error",
WEB_SOCKET_TYPE_CONNECTION,
G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (WebSocketConnectionClass, error),
g_signal_accumulator_true_handled, NULL,
g_cclosure_marshal_generic,
G_TYPE_BOOLEAN, 1, G_TYPE_ERROR);
/**
* WebSocketConnection::closing:
* @self: the WebSocket
*
* This signal will be emitted during an orderly close
*/
signals[CLOSING] = g_signal_new ("closing",
WEB_SOCKET_TYPE_CONNECTION,
G_SIGNAL_RUN_LAST,
G_STRUCT_OFFSET (WebSocketConnectionClass, closing),
g_signal_accumulator_true_handled, NULL, g_cclosure_marshal_generic,
G_TYPE_BOOLEAN, 0);
klass->error = web_socket_connection_default_error;
klass->closing = web_socket_connection_default_closing;
/**
* WebSocketConnection::close:
* @self: the WebSocket
*
* Emitted when the connection has completely closed, either due to an
* orderly close from the peer, one initiated via web_socket_connection_close()
* or a fatal error condition that caused a close.
*
* This signal will be emitted once.
*/
signals[CLOSE] = g_signal_new ("close",
WEB_SOCKET_TYPE_CONNECTION,
G_SIGNAL_RUN_FIRST,
G_STRUCT_OFFSET (WebSocketConnectionClass, close),
NULL, NULL, g_cclosure_marshal_generic,
G_TYPE_NONE, 0);
}
/**
* web_socket_connection_get_url:
* @self: the WebSocket
*
* Get the URL of the WebSocket.
*
* For servers this represents the address of the WebSocket, and
* for clients it is the address connected to.
*
* Returns: the URL
*/
const gchar *
web_socket_connection_get_url (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), NULL);
return self->pv->url;
}
/**
* web_socket_connection_get_protocol:
* @self: the WebSocket
*
* Get the protocol chosen via negotiation with the peer.
*
* A list of possible protocols is provided when creating a #WebSocketClient
* or #WebSocketServer, and one is negotiated during the handshake.
*
* This will be %NULL until the WebSocket is in the %WEB_SOCKET_STATE_OPEN
* state.
*
* Returns: the chosen protocol or %NULL
*/
const gchar *
web_socket_connection_get_protocol (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), NULL);
return self->pv->chosen_protocol;
}
/**
* web_socket_connection_get_ready_state:
* @self: the WebSocket
*
* Get the current state of the WebSocket.
*
* Returns: the state
*/
WebSocketState
web_socket_connection_get_ready_state (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), 0);
if (self->pv->io_closed)
return WEB_SOCKET_STATE_CLOSED;
else if ((self->pv->io_stream && !self->pv->io_open) || self->pv->close_sent)
return WEB_SOCKET_STATE_CLOSING;
else if (self->pv->handshake_done)
return WEB_SOCKET_STATE_OPEN;
else
return WEB_SOCKET_STATE_CONNECTING;
}
/**
* web_socket_connection_get_buffered_amount:
* @self: the WebSocket
*
* Get the amount of buffered data not yet sent.
*
* This represents caller provided data passed into the
* web_socket_connection_send() function.
*
* Returns: the amount of buffered data
*/
gsize
web_socket_connection_get_buffered_amount (WebSocketConnection *self)
{
gsize amount = 0;
Frame *frame;
GList *l;
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), 0);
for (l = self->pv->outgoing.head; l != NULL; l = g_list_next (l))
{
frame = l->data;
amount += frame->amount;
}
return amount;
}
/**
* web_socket_connection_get_io_stream:
* @self: the WebSocket
*
* Get the IO stream the WebSocket is communicating over.
*
* Returns: (transfer none): the amount of buffered data
*/
GIOStream *
web_socket_connection_get_io_stream (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), NULL);
return self->pv->io_stream;
}
/**
* web_socket_connection_get_close_code:
* @self: the WebSocket
*
* Get the close code received from the WebSocket peer.
*
* This only becomes valid once the WebSocket is in the
* %WEB_SOCKET_STATE_CLOSED state. The value will often be in the
* #WebSocketCloseCodes enumeration, but may also be an application
* defined close code.
*
* Returns: the close code or zero.
*/
gushort
web_socket_connection_get_close_code (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), 0);
return self->pv->peer_close_code;
}
/**
* web_socket_connection_get_close_data:
* @self: the WebSocket
*
* Get the close data received from the WebSocket peer.
*
* This only becomes valid once the WebSocket is in the
* %WEB_SOCKET_STATE_CLOSED state. The data may be freed once
* the main loop is run, so copy it if you need to keep it around.
*
* Returns: the close data or %NULL
*/
const gchar *
web_socket_connection_get_close_data (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), NULL);
return self->pv->peer_close_data;
}
/**
* web_socket_connection_send:
* @self: the WebSocket
* @type: the data type of message
* @prefix: (allow-none): an optional prefix prepended to the message
* @message: the message contents
*
* Send a message to the peer.
*
* The @type parameter describes whether this is a binary or text message.
* If a text message then the contents must be UTF-8 valid.
*
* The message is queued to be sent and will be sent when the main loop
* is run.
*
* The optional @prefix can be a canned header to be prefixed to the message.
* It can be specified as a separate argument for efficiency.
*/
void
web_socket_connection_send (WebSocketConnection *self,
WebSocketDataType type,
GBytes *prefix,
GBytes *message)
{
gconstpointer pref = NULL;
gsize prefix_len = 0;
gconstpointer payload;
gsize payload_len;
guint8 opcode;
g_return_if_fail (WEB_SOCKET_IS_CONNECTION (self));
g_return_if_fail (message != NULL);
if (web_socket_connection_get_ready_state (self) != WEB_SOCKET_STATE_OPEN)
{
g_critical ("Can only send messages when WebSocket is open");
return;
}
if (prefix)
pref = g_bytes_get_data (prefix, &prefix_len);
payload = g_bytes_get_data (message, &payload_len);
switch (type)
{
case WEB_SOCKET_DATA_TEXT:
opcode = 0x01;
if (!g_utf8_validate (pref, prefix_len, NULL) ||
!g_utf8_validate (payload, payload_len, NULL))
{
g_critical ("invalid non-UTF8 @data passed as text to web_socket_connection_send()");
return;
}
break;
case WEB_SOCKET_DATA_BINARY:
opcode = 0x02;
break;
default:
g_critical ("invalid @type argument for web_socket_connection_send()");
return;
}
send_prefixed_message_rfc6455 (self, WEB_SOCKET_QUEUE_NORMAL, opcode,
pref, prefix_len, payload, payload_len);
g_object_notify (G_OBJECT (self), "buffered-amount");
}
/**
* web_socket_connection_close:
* @self: the WebSocket
* @code: close code
* @data: (allow-none): close data
*
* Close the connection in an orderly fashion.
*
* Note that until the #WebSocketConnection::close signal fires, the connection
* is not yet completely closed. The close message is not even sent until the
* main loop runs.
*
* The @code and @data are sent to the peer along with the close request.
* Note that the @data must be UTF-8 valid.
*/
void
web_socket_connection_close (WebSocketConnection *self,
gushort code,
const gchar *data)
{
WebSocketQueueFlags flags;
gboolean handled = FALSE;
g_return_if_fail (WEB_SOCKET_IS_CONNECTION (self));
g_return_if_fail (!self->pv->close_sent);
g_signal_emit (self, signals[CLOSING], 0, &handled);
if (!handled)
return;
if (self->pv->close_received)
g_debug ("responding to close request");
if (self->pv->handshake_done)
{
flags = 0;
if (self->pv->server_side && self->pv->close_received)
flags |= WEB_SOCKET_QUEUE_LAST;
send_close_rfc6455 (self, flags, code, data);
close_io_after_timeout (self);
}
else
{
close_io_stream (self);
}
}
gboolean
_web_socket_connection_choose_protocol (WebSocketConnection *self,
const gchar **protocols,
const gchar *value)
{
WebSocketConnectionPrivate *pv = self->pv;
gboolean chosen = FALSE;
gchar **values;
gint i, j;
g_free (pv->chosen_protocol);
pv->chosen_protocol = NULL;
/* Automatically select one */
if (!value)
{
if (protocols)
{
pv->chosen_protocol = g_strdup (protocols[0]);
g_debug ("automatically selected protocol: %s", pv->chosen_protocol);
}
g_object_notify (G_OBJECT (self), "protocol");
return TRUE;
}
/* Choose one from what client/server agree on */
values = g_strsplit_set (value, ", ", -1);
/* Accept any protocol */
if (!protocols)
{
pv->chosen_protocol = g_strdup (values[0]);
g_debug ("automatically selected protocol: %s", pv->chosen_protocol);
chosen = TRUE;
}
for (j = 0; !chosen && values[j] != NULL; j++)
{
for (i = 0; protocols[i] != NULL; i++)
{
if (g_str_equal (protocols[i], values[j]))
{
pv->chosen_protocol = g_strdup (values[j]);
g_debug ("agreed on protocol: %s", pv->chosen_protocol);
chosen = TRUE;
}
}
}
g_strfreev (values);
if (chosen)
g_object_notify (G_OBJECT (self), "protocol");
else
g_message ("received invalid or unsupported Sec-WebSocket-Protocol: %s", value);
return chosen;
}
GMainContext *
_web_socket_connection_get_main_context (WebSocketConnection *self)
{
g_return_val_if_fail (WEB_SOCKET_IS_CONNECTION (self), NULL);
return self->pv->main_context;
}