/* * This file is part of Cockpit. * * Copyright (C) 2014 Red Hat, Inc. * * Cockpit is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * (at your option) any later version. * * Cockpit is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with Cockpit; If not, see . */ #include "config.h" #include "cockpittransport.h" #include "common/cockpitjson.h" #include #include typedef struct { gconstpointer channel; JsonObject *control; GBytes *data; } FrozenMessage; static void frozen_message_free (gpointer data) { FrozenMessage *frozen = data; if (frozen->data) g_bytes_unref (frozen->data); if (frozen->control) json_object_unref (frozen->control); g_slice_free (FrozenMessage, frozen); } enum { RECV, CONTROL, CLOSED, NUM_SIGNALS }; static guint signals[NUM_SIGNALS]; G_DEFINE_ABSTRACT_TYPE (CockpitTransport, cockpit_transport, G_TYPE_OBJECT); struct _CockpitTransportPrivate { GHashTable *freeze; GQueue *frozen; }; static void cockpit_transport_init (CockpitTransport *self) { self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, COCKPIT_TYPE_TRANSPORT, CockpitTransportPrivate); } static void cockpit_transport_get_property (GObject *object, guint property_id, GValue *value, GParamSpec *pspec) { /* Should be overridden by derived abstract classes */ G_OBJECT_WARN_INVALID_PROPERTY_ID (object, property_id, pspec); } static gboolean maybe_freeze_message (CockpitTransport *self, const gchar *channel, JsonObject *control, GBytes *data) { FrozenMessage *frozen = NULL; if (self->priv->freeze && channel) { /* Note that we dig out the real value for the channel */ channel = g_hash_table_lookup (self->priv->freeze, channel); if (channel) { frozen = g_slice_new0 (FrozenMessage); frozen->channel = channel; /* owned by hashtable */ frozen->data = g_bytes_ref (data); if (control) frozen->control = json_object_ref (control); if (!self->priv->frozen) self->priv->frozen = g_queue_new (); g_queue_push_tail (self->priv->frozen, frozen); return TRUE; } } return FALSE; } static gboolean cockpit_transport_default_recv (CockpitTransport *transport, const gchar *channel, GBytes *payload) { const gchar *inner_channel; JsonObject *options; const gchar *command = NULL; /* Our default handler parses control channel and fires control signal */ if (channel) return FALSE; /* Read out the actual command and channel this message is about */ if (!cockpit_transport_parse_command (payload, &command, &inner_channel, &options)) { /* Warning already logged */ cockpit_transport_close (transport, "protocol-error"); return TRUE; } cockpit_transport_emit_control (transport, command, inner_channel, options, payload); json_object_unref (options); return TRUE; } static void cockpit_transport_finalize (GObject *object) { CockpitTransport *self = COCKPIT_TRANSPORT (object); if (self->priv->freeze) g_hash_table_destroy (self->priv->freeze); if (self->priv->frozen) g_queue_free_full (self->priv->frozen, frozen_message_free); G_OBJECT_CLASS (cockpit_transport_parent_class)->finalize (object); } static void cockpit_transport_class_init (CockpitTransportClass *klass) { GObjectClass *object_class = G_OBJECT_CLASS (klass); klass->recv = cockpit_transport_default_recv; object_class->get_property = cockpit_transport_get_property; object_class->finalize = cockpit_transport_finalize; g_object_class_install_property (object_class, 1, g_param_spec_string ("name", "name", "name", NULL, G_PARAM_READABLE | G_PARAM_STATIC_STRINGS)); signals[RECV] = g_signal_new ("recv", COCKPIT_TYPE_TRANSPORT, G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (CockpitTransportClass, recv), g_signal_accumulator_true_handled, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 2, G_TYPE_STRING, G_TYPE_BYTES); signals[CONTROL] = g_signal_new ("control", COCKPIT_TYPE_TRANSPORT, G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (CockpitTransportClass, control), g_signal_accumulator_true_handled, NULL, g_cclosure_marshal_generic, G_TYPE_BOOLEAN, 4, G_TYPE_STRING, G_TYPE_STRING, JSON_TYPE_OBJECT, G_TYPE_BYTES); signals[CLOSED] = g_signal_new ("closed", COCKPIT_TYPE_TRANSPORT, G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (CockpitTransportClass, closed), NULL, NULL, g_cclosure_marshal_generic, G_TYPE_NONE, 1, G_TYPE_STRING); g_type_class_add_private (klass, sizeof (CockpitTransportPrivate)); } void cockpit_transport_send (CockpitTransport *transport, const gchar *channel, GBytes *data) { CockpitTransportClass *klass; g_return_if_fail (COCKPIT_IS_TRANSPORT (transport)); klass = COCKPIT_TRANSPORT_GET_CLASS (transport); g_return_if_fail (klass && klass->send); klass->send (transport, channel, data); } void cockpit_transport_close (CockpitTransport *transport, const gchar *problem) { CockpitTransportClass *klass; g_return_if_fail (COCKPIT_IS_TRANSPORT (transport)); klass = COCKPIT_TRANSPORT_GET_CLASS (transport); g_return_if_fail (klass && klass->close); klass->close (transport, problem); } void cockpit_transport_emit_recv (CockpitTransport *transport, const gchar *channel, GBytes *data) { gboolean result = FALSE; g_return_if_fail (COCKPIT_IS_TRANSPORT (transport)); if (maybe_freeze_message (transport, channel, NULL, data)) return; g_signal_emit (transport, signals[RECV], 0, channel, data, &result); if (!result) g_debug ("no handler for received message in channel %s", channel); } void cockpit_transport_emit_control (CockpitTransport *transport, const gchar *command, const gchar *channel, JsonObject *options, GBytes *data) { gboolean result = FALSE; g_return_if_fail (COCKPIT_IS_TRANSPORT (transport)); if (maybe_freeze_message (transport, channel, options, data)) return; g_signal_emit (transport, signals[CONTROL], 0, command, channel, options, data, &result); if (!result) g_debug ("received unknown control command: %s", command); } void cockpit_transport_emit_closed (CockpitTransport *transport, const gchar *problem) { g_return_if_fail (COCKPIT_IS_TRANSPORT (transport)); g_signal_emit (transport, signals[CLOSED], 0, problem); } void cockpit_transport_freeze (CockpitTransport *self, const gchar *channel) { g_return_if_fail (COCKPIT_IS_TRANSPORT (self)); g_return_if_fail (channel != NULL); if (!self->priv->freeze) self->priv->freeze = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, NULL); g_hash_table_add (self->priv->freeze, g_strdup (channel)); } void cockpit_transport_thaw (CockpitTransport *self, const gchar *channel) { FrozenMessage *frozen; const gchar *command; gchar *stolen = NULL; GList *l, *flush; g_return_if_fail (COCKPIT_IS_TRANSPORT (self)); g_return_if_fail (channel != NULL); if (self->priv->freeze) stolen = g_hash_table_lookup (self->priv->freeze, channel); if (stolen) g_hash_table_steal (self->priv->freeze, channel); for (l = self->priv->frozen ? self->priv->frozen->head : NULL; l != NULL; ) { frozen = l->data; flush = (stolen == frozen->channel) ? l : NULL; l = g_list_next (l); if (flush) { if (frozen->control) { command = NULL; cockpit_json_get_string (frozen->control, "command", NULL, &command); cockpit_transport_emit_control (self, command, stolen, frozen->control, frozen->data); } else { cockpit_transport_emit_recv (self, stolen, frozen->data); } g_queue_delete_link (self->priv->frozen, flush); frozen_message_free (frozen); } } g_free (stolen); } static GBytes * parse_frame (GBytes *message, gboolean expect, gchar **channel) { const gchar *data; gsize length; const gchar *line; gsize channel_len; g_return_val_if_fail (message != NULL, NULL); data = g_bytes_get_data (message, &length); line = memchr (data, '\n', length); if (!line) { if (expect) g_message ("received invalid message without channel prefix"); return NULL; } channel_len = line - data; if (memchr (data, '\0', channel_len) != NULL) { if (expect) g_message ("received massage with invalid channel prefix"); return NULL; } if (channel_len) *channel = g_strndup (data, channel_len); else *channel = NULL; channel_len++; return g_bytes_new_from_bytes (message, channel_len, length - channel_len); } /** * cockpit_transport_parse_frame: * @message: message to parse * @channel: location to return the channel * * Parse a message into a channel and payload. * @channel will be set to NULL if a control channel * message. @channel must be freed. * * Will return NULL if invalid message. * * Returns: (transfer full): the payload or NULL. */ GBytes * cockpit_transport_parse_frame (GBytes *message, gchar **channel) { return parse_frame (message, TRUE, channel); } GBytes * cockpit_transport_maybe_frame (GBytes *message, gchar **channel) { return parse_frame (message, FALSE, channel); } /** * cockpit_transport_parse_command: * @payload: command JSON payload to parse * @command: a location to return the command * @channel: location to return the channel * @options: location to return the options * * Parse a command and return various values from the * command. The @options value is transfered with ownership, * so you should free it after done. @command and @channel are owned by * @options. @channel will be NULL for a missing channel. * * On failure, message has already been printed. * * Returns: whether command parsed or not. */ gboolean cockpit_transport_parse_command (GBytes *payload, const gchar **command, const gchar **channel, JsonObject **options) { GError *error = NULL; gboolean ret = FALSE; JsonObject *object; gboolean valid; object = cockpit_json_parse_bytes (payload, &error); if (!object) { g_warning ("Received unparseable control message: %s", error->message); g_error_free (error); goto out; } /* Parse out the command */ if (command) { if (!cockpit_json_get_string (object, "command", NULL, command) || *command == NULL || g_str_equal (*command, "")) { g_warning ("Received invalid control message: invalid or missing command"); goto out; } } /* Parse out the channel */ if (channel) { valid = cockpit_json_get_string (object, "channel", NULL, channel); if (valid && *channel) { valid = (!g_str_equal ("", *channel) && strcspn (*channel, "\n") == strlen (*channel)); } if (!valid) { g_warning ("Received invalid control message: invalid channel"); goto out; } } *options = json_object_ref (object); ret = TRUE; out: if (object) json_object_unref (object); return ret; } static JsonObject * build_json_va (const gchar *name, va_list va) { JsonObject *object; const gchar *value; object = json_object_new (); while (name) { value = va_arg (va, const gchar *); if (value) json_object_set_string_member (object, name, value); name = va_arg (va, const gchar *); } return object; } JsonObject * cockpit_transport_build_json (const gchar *name, ...) { JsonObject *object; va_list va; va_start (va, name); object = build_json_va (name, va); va_end (va); return object; } GBytes * cockpit_transport_build_control (const gchar *name, ...) { JsonObject *object; GBytes *message; va_list va; va_start (va, name); object = build_json_va (name, va); va_end (va); message = cockpit_json_write_bytes (object); json_object_unref (object); return message; }