/* * This file is part of Cockpit. * * Copyright (C) 2015 Red Hat, Inc. * * Cockpit is free software; you can redistribute it and/or modify it * under the terms of the GNU Lesser General Public License as published by * the Free Software Foundation; either version 2.1 of the License, or * (at your option) any later version. * * Cockpit is distributed in the hope that it will be useful, but * WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU * Lesser General Public License for more details. * * You should have received a copy of the GNU Lesser General Public License * along with Cockpit; If not, see . */ #include "config.h" #include "cockpitstream.h" #include "common/cockpitjson.h" #include #include /** * CockpitStream: * * A stream with queued input and output based on top of a GIOStream */ enum { PROP_0, PROP_NAME, PROP_IO_STREAM, PROP_PROBLEM }; struct _CockpitStreamPrivate { gchar *name; GMainContext *context; gboolean closed; gboolean closing; CockpitConnectable *connecting; gchar *problem; GIOStream *io; GSource *out_source; GQueue *out_queue; gsize out_partial; gboolean out_closed; GSource *in_source; GByteArray *in_buffer; gboolean received; gulong sig_accept_cert; }; static guint cockpit_stream_sig_open; static guint cockpit_stream_sig_read; static guint cockpit_stream_sig_close; static guint cockpit_stream_sig_rejected_cert; static void cockpit_close_later (CockpitStream *self); G_DEFINE_TYPE (CockpitStream, cockpit_stream, G_TYPE_OBJECT); static void cockpit_stream_init (CockpitStream *self) { self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, COCKPIT_TYPE_STREAM, CockpitStreamPrivate); self->priv->in_buffer = g_byte_array_new (); self->priv->out_queue = g_queue_new (); self->priv->context = g_main_context_ref_thread_default (); } static void stop_output (CockpitStream *self) { g_assert (self->priv->out_source != NULL); g_source_destroy (self->priv->out_source); g_source_unref (self->priv->out_source); self->priv->out_source = NULL; } static void stop_input (CockpitStream *self) { g_assert (self->priv->in_source != NULL); g_source_destroy (self->priv->in_source); g_source_unref (self->priv->in_source); self->priv->in_source = NULL; } static void close_immediately (CockpitStream *self, const gchar *problem) { GError *error = NULL; GIOStream *io; if (self->priv->closed) return; if (problem) { g_free (self->priv->problem); self->priv->problem = g_strdup (problem); } if (self->priv->connecting) { cockpit_connectable_unref (self->priv->connecting); self->priv->connecting = NULL; } self->priv->closed = TRUE; g_debug ("%s: closing stream%s%s", self->priv->name, self->priv->problem ? ": " : "", self->priv->problem ? self->priv->problem : ""); if (self->priv->in_source) stop_input (self); if (self->priv->out_source) stop_output (self); if (self->priv->io) { io = self->priv->io; self->priv->io = NULL; if (self->priv->sig_accept_cert) { g_signal_handler_disconnect (io, self->priv->sig_accept_cert); self->priv->sig_accept_cert = 0; } g_io_stream_close (io, NULL, &error); if (error) { g_message ("%s: close failed: %s", self->priv->name, error->message); g_clear_error (&error); } g_object_unref (io); } g_debug ("%s: closed", self->priv->name); g_signal_emit (self, cockpit_stream_sig_close, 0, self->priv->problem); } static void close_maybe (CockpitStream *self) { if (!self->priv->closed) { if (!self->priv->in_source && !self->priv->out_source) { g_debug ("%s: input and output done", self->priv->name); close_immediately (self, NULL); } } } static void on_output_closed (GObject *object, GAsyncResult *result, gpointer user_data) { CockpitStream *self = COCKPIT_STREAM (user_data); GError *error = NULL; g_output_stream_close_finish (G_OUTPUT_STREAM (object), result, &error); if (error) { g_warning ("%s: couldn't close output stream: %s", self->priv->name, error->message); close_immediately (self, "internal-error"); } close_maybe (self); g_object_unref (self); } static void close_output (CockpitStream *self) { if (self->priv->out_closed) return; g_debug ("%s: end of output", self->priv->name); self->priv->out_closed = TRUE; if (!self->priv->io) { close_maybe (self); return; } g_output_stream_close_async (g_io_stream_get_output_stream (self->priv->io), G_PRIORITY_DEFAULT, NULL, on_output_closed, g_object_ref (self)); } #if !GLIB_CHECK_VERSION(2,43,2) #define G_IO_ERROR_CONNECTION_CLOSED G_IO_ERROR_BROKEN_PIPE #endif static gchar * describe_certificate_errors (GTlsCertificateFlags flags) { GString *str; if (flags == 0) return NULL; str = g_string_new (""); if (flags & G_TLS_CERTIFICATE_UNKNOWN_CA) { g_string_append (str, "untrusted-issuer "); flags &= ~G_TLS_CERTIFICATE_UNKNOWN_CA; } if (flags & G_TLS_CERTIFICATE_BAD_IDENTITY) { g_string_append (str, "bad-server-identity "); flags &= ~G_TLS_CERTIFICATE_BAD_IDENTITY; } if (flags & G_TLS_CERTIFICATE_NOT_ACTIVATED) { g_string_append (str, "not-yet-valid "); flags &= ~G_TLS_CERTIFICATE_NOT_ACTIVATED; } if (flags & G_TLS_CERTIFICATE_EXPIRED) { g_string_append (str, "expired "); flags &= ~G_TLS_CERTIFICATE_EXPIRED; } if (flags & G_TLS_CERTIFICATE_REVOKED) { g_string_append (str, "revoked "); flags &= ~G_TLS_CERTIFICATE_REVOKED; } if (flags & G_TLS_CERTIFICATE_INSECURE) { g_string_append (str, "insecure "); flags &= ~G_TLS_CERTIFICATE_INSECURE; } if (flags & G_TLS_CERTIFICATE_GENERIC_ERROR) { g_string_append (str, "generic-error "); flags &= ~G_TLS_CERTIFICATE_GENERIC_ERROR; } if (flags != 0) { g_string_append (str, "..."); } return g_string_free (str, FALSE); } static gboolean on_rejected_certificate (GTlsConnection *conn, GTlsCertificate *peer_cert, GTlsCertificateFlags errors, gpointer user_data) { CockpitStream *self = (CockpitStream *)user_data; gchar *details = describe_certificate_errors (errors); gchar *pem_data = NULL; g_return_val_if_fail (peer_cert != NULL, FALSE); g_message ("%s: Unacceptable TLS certificate: %s", self->priv->name, details); g_object_get (peer_cert, "certificate-pem", &pem_data, NULL); g_signal_emit (self, cockpit_stream_sig_rejected_cert, 0, pem_data); g_free (details); g_free (pem_data); return FALSE; } const gchar * cockpit_stream_problem (GError *error, const gchar *name, const gchar *summary, JsonObject *options) { const gchar *problem = NULL; gchar *message; if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_PERMISSION_DENIED)) problem = "access-denied"; else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NOT_FOUND) || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_REFUSED) || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_HOST_UNREACHABLE) || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_NETWORK_UNREACHABLE) || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_HOST_NOT_FOUND) || g_error_matches (error, G_RESOLVER_ERROR, G_RESOLVER_ERROR_NOT_FOUND)) problem = "not-found"; else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_BROKEN_PIPE) || g_error_matches (error, G_IO_ERROR, G_IO_ERROR_CONNECTION_CLOSED) || g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_EOF)) problem = "disconnected"; #if !GLIB_CHECK_VERSION(2,43,2) else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_FAILED) && strstr (error->message, g_strerror (ECONNRESET))) problem = "disconnected"; #endif else if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_TIMED_OUT)) problem = "timeout"; else if (g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_NOT_TLS) || g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_MISC)) problem = "protocol-error"; else if (g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_BAD_CERTIFICATE)) { problem = "unknown-hostkey"; } if (problem) { message = g_strdup_printf ("%s: %s: %s", name, summary, error->message); } else { message = g_strdup_printf ("%s: %s: %s", name, summary, error->message); problem = "internal-error"; } if (options) { if (!json_object_has_member (options, "message")) json_object_set_string_member (options, "message", message); } g_message ("%s", message); g_free (message); return problem; } static void set_problem_from_error (CockpitStream *self, const gchar *summary, GError *error) { const gchar *problem; if (g_error_matches (error, G_TLS_ERROR, G_TLS_ERROR_MISC)) { g_message ("%s: %s: %s", self->priv->name, summary, error->message); if (self->priv->received) problem = "disconnected"; else problem = "protocol-error"; } else { problem = cockpit_stream_problem (error, self->priv->name, summary, NULL); } g_free (self->priv->problem); self->priv->problem = g_strdup (problem); } static gboolean dispatch_input (GPollableInputStream *is, gpointer user_data) { CockpitStream *self = (CockpitStream *)user_data; GError *error = NULL; gboolean read = FALSE; gssize ret = 0; gsize len; gboolean eof; for (;;) { g_return_val_if_fail (self->priv->in_source, FALSE); len = self->priv->in_buffer->len; g_byte_array_set_size (self->priv->in_buffer, len + 1024); ret = g_pollable_input_stream_read_nonblocking (is, self->priv->in_buffer->data + len, 1024, NULL, &error); if (ret < 0) { g_byte_array_set_size (self->priv->in_buffer, len); if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_error_free (error); break; } else { set_problem_from_error (self, "couldn't read", error); g_error_free (error); close_immediately (self, NULL); return FALSE; } } g_byte_array_set_size (self->priv->in_buffer, len + ret); if (ret == 0) { g_debug ("%s: end of input", self->priv->name); stop_input (self); break; } else if (ret > 0) { g_debug ("%s: read %d bytes", self->priv->name, (int)ret); self->priv->received = TRUE; read = TRUE; } } g_object_ref (self); eof = (self->priv->in_source == NULL); if (eof || read) g_signal_emit (self, cockpit_stream_sig_read, 0, self->priv->in_buffer, eof); if (eof) close_maybe (self); g_object_unref (self); return TRUE; } static gboolean dispatch_output (GPollableOutputStream *os, gpointer user_data) { CockpitStream *self = (CockpitStream *)user_data; GError *error = NULL; const gint8 *data; gsize len; gssize ret; g_return_val_if_fail (self->priv->out_source, FALSE); while (self->priv->out_queue->head) { data = g_bytes_get_data (self->priv->out_queue->head->data, &len); g_assert (self->priv->out_partial <= len); ret = g_pollable_output_stream_write_nonblocking (os, data + self->priv->out_partial, len - self->priv->out_partial, NULL, &error); if (ret < 0) { if (g_error_matches (error, G_IO_ERROR, G_IO_ERROR_WOULD_BLOCK)) { g_debug ("%s: output would block", self->priv->name); g_error_free (error); return TRUE; } else { set_problem_from_error (self, "couldn't write", error); g_error_free (error); close_immediately (self, NULL); return FALSE; } } self->priv->out_partial += ret; if (self->priv->out_partial >= len) { g_debug ("%s: wrote %d bytes", self->priv->name, (int)len); g_bytes_unref (g_queue_pop_head (self->priv->out_queue)); self->priv->out_partial = 0; } else { if (ret > 0) g_debug ("%s: partial write %d of %d bytes", self->priv->name, (int)ret, (int)len); return TRUE; } } g_debug ("%s: output queue empty", self->priv->name); /* If all messages are done, then stop polling out fd */ stop_output (self); if (self->priv->closing) close_output (self); else close_maybe (self); return TRUE; } static void start_output (CockpitStream *self) { GOutputStream *os; g_assert (self->priv->out_source == NULL); if (self->priv->connecting || self->priv->out_closed || self->priv->closed) return; g_assert (self->priv->io); os = g_io_stream_get_output_stream (self->priv->io); self->priv->out_source = g_pollable_output_stream_create_source (G_POLLABLE_OUTPUT_STREAM (os), NULL); g_source_set_name (self->priv->out_source, "stream-output"); g_source_set_callback (self->priv->out_source, (GSourceFunc)dispatch_output, self, NULL); g_source_attach (self->priv->out_source, self->priv->context); } static void initialize_io (CockpitStream *self) { GInputStream *is; GOutputStream *os; g_return_if_fail (self->priv->in_source == NULL); is = g_io_stream_get_input_stream (self->priv->io); os = g_io_stream_get_output_stream (self->priv->io); if (!G_IS_POLLABLE_INPUT_STREAM (is) || !g_pollable_input_stream_can_poll (G_POLLABLE_INPUT_STREAM (is)) || !G_IS_POLLABLE_OUTPUT_STREAM (os) || !g_pollable_output_stream_can_poll (G_POLLABLE_OUTPUT_STREAM (os))) { g_warning ("%s: stream is not pollable", self->priv->name); close_immediately (self, "internal-error"); return; } if (self->priv->connecting) { cockpit_connectable_unref (self->priv->connecting); self->priv->connecting = NULL; } self->priv->in_source = g_pollable_input_stream_create_source (G_POLLABLE_INPUT_STREAM (is), NULL); g_source_set_name (self->priv->in_source, "stream-input"); g_source_set_callback (self->priv->in_source, (GSourceFunc)dispatch_input, self, NULL); g_source_attach (self->priv->in_source, self->priv->context); if (G_IS_TLS_CONNECTION (self->priv->io)) { self->priv->sig_accept_cert = g_signal_connect (G_TLS_CONNECTION (self->priv->io), "accept-certificate", G_CALLBACK (on_rejected_certificate), self); } else { self->priv->sig_accept_cert = 0; } start_output (self); g_signal_emit (self, cockpit_stream_sig_open, 0); } static void cockpit_stream_constructed (GObject *object) { CockpitStream *self = COCKPIT_STREAM (object); G_OBJECT_CLASS (cockpit_stream_parent_class)->constructed (object); if (self->priv->io) initialize_io (self); } static void cockpit_stream_set_property (GObject *obj, guint prop_id, const GValue *value, GParamSpec *pspec) { CockpitStream *self = COCKPIT_STREAM (obj); switch (prop_id) { case PROP_NAME: self->priv->name = g_value_dup_string (value); break; case PROP_IO_STREAM: self->priv->io = g_value_dup_object (value); break; case PROP_PROBLEM: self->priv->problem = g_value_dup_string (value); if (self->priv->problem) cockpit_close_later (self); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (obj, prop_id, pspec); break; } } static void cockpit_stream_get_property (GObject *obj, guint prop_id, GValue *value, GParamSpec *pspec) { CockpitStream *self = COCKPIT_STREAM (obj); switch (prop_id) { case PROP_NAME: g_value_set_string (value, self->priv->name); break; case PROP_IO_STREAM: g_value_set_object (value, self->priv->io); break; case PROP_PROBLEM: g_value_set_string (value, self->priv->problem); break; default: G_OBJECT_WARN_INVALID_PROPERTY_ID (obj, prop_id, pspec); break; } } static void cockpit_stream_dispose (GObject *object) { CockpitStream *self = COCKPIT_STREAM (object); if (!self->priv->closed) close_immediately (self, "terminated"); while (self->priv->out_queue->head) g_bytes_unref (g_queue_pop_head (self->priv->out_queue)); G_OBJECT_CLASS (cockpit_stream_parent_class)->dispose (object); } static void cockpit_stream_finalize (GObject *object) { CockpitStream *self = COCKPIT_STREAM (object); g_assert (self->priv->closed); g_assert (!self->priv->in_source); g_assert (!self->priv->out_source); g_byte_array_unref (self->priv->in_buffer); g_queue_free (self->priv->out_queue); g_free (self->priv->problem); g_free (self->priv->name); if (self->priv->context) g_main_context_unref (self->priv->context); G_OBJECT_CLASS (cockpit_stream_parent_class)->finalize (object); } static void cockpit_stream_class_init (CockpitStreamClass *klass) { GObjectClass *gobject_class = G_OBJECT_CLASS (klass); gobject_class->constructed = cockpit_stream_constructed; gobject_class->get_property = cockpit_stream_get_property; gobject_class->set_property = cockpit_stream_set_property; gobject_class->dispose = cockpit_stream_dispose; gobject_class->finalize = cockpit_stream_finalize; /** * CockpitStream:io-stream: * * The underlying io stream. The input and output streams should * be pollable. */ g_object_class_install_property (gobject_class, PROP_IO_STREAM, g_param_spec_object ("io-stream", "io-stream", "io-stream", G_TYPE_IO_STREAM, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); /** * CockpitStream:name: * * Pipe name used for debugging purposes. */ g_object_class_install_property (gobject_class, PROP_NAME, g_param_spec_string ("name", "name", "name", "", G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); /** * CockpitStream:problem: * * The problem that the pipe closed with. If used as a constructor argument then * the pipe will be created in a closed/failed state. Although 'closed' signal will * only fire once main loop is hit. */ g_object_class_install_property (gobject_class, PROP_PROBLEM, g_param_spec_string ("problem", "problem", "problem", NULL, G_PARAM_READWRITE | G_PARAM_CONSTRUCT_ONLY | G_PARAM_STATIC_STRINGS)); /** * CockpitStream::open: * * Emitted when actually open and connected. */ cockpit_stream_sig_open = g_signal_new ("open", COCKPIT_TYPE_STREAM, G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (CockpitStreamClass, open), NULL, NULL, NULL, G_TYPE_NONE, 0); /** * CockpitStream::read: * @buffer: a GByteArray of the read data * @eof: whether the pipe is done reading * * Emitted when data is read from the input file descriptor of the * pipe. * * Data consumed from @buffer by the handler should be removed from * the GByteArray. This can be done with the cockpit_stream_consume() * function. * * This handler will only be called once with @eof set to TRUE. But * in error conditions it may not be called with @eof set to TRUE * at all, and the CockpitStream::close signal will simply fire. */ cockpit_stream_sig_read = g_signal_new ("read", COCKPIT_TYPE_STREAM, G_SIGNAL_RUN_LAST, G_STRUCT_OFFSET (CockpitStreamClass, read), NULL, NULL, NULL, G_TYPE_NONE, 2, G_TYPE_BYTE_ARRAY, G_TYPE_BOOLEAN); /** * CockpitStream::close: * @problem: problem string or %NULL * * Emitted when the pipe closes, whether due to a problem or a normal * shutdown. * * @problem will be NULL if the pipe closed normally. */ cockpit_stream_sig_close = g_signal_new ("close", COCKPIT_TYPE_STREAM, G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (CockpitStreamClass, close), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_STRING); /** * CockpitStream::rejected-cert: * @pem: PEM data as a string * * Emitted when the pipe will close because a certificate is rejected */ cockpit_stream_sig_rejected_cert = g_signal_new ("rejected-cert", COCKPIT_TYPE_STREAM, G_SIGNAL_RUN_FIRST, G_STRUCT_OFFSET (CockpitStreamClass, close), NULL, NULL, NULL, G_TYPE_NONE, 1, G_TYPE_STRING); g_type_class_add_private (klass, sizeof (CockpitStreamPrivate)); } /** * cockpit_stream_write: * @self: the pipe * @data: the data to write * * Write @data to the pipe. This is not done immediately, it's * queued and written when the pipe is ready. * * If you cockpit_stream_close() with a @problem, then queued data * will be discarded. * * Calling this function on a closed or closing pipe (one on which * cockpit_stream_close() has been called) is invalid. * * Zero length data blocks are ignored, it doesn't makes sense to * write zero bytes to a pipe. */ void cockpit_stream_write (CockpitStream *self, GBytes *data) { g_return_if_fail (COCKPIT_IS_STREAM (self)); g_return_if_fail (!self->priv->closing); g_return_if_fail (!self->priv->closed); if (g_bytes_get_size (data) == 0) { g_debug ("%s: ignoring zero byte data block", self->priv->name); return; } g_queue_push_tail (self->priv->out_queue, g_bytes_ref (data)); if (!self->priv->out_source && !self->priv->out_closed) { start_output (self); } /* * If this becomes thread-safe, then something like this is needed: * g_main_context_wakeup (g_source_get_context (self->priv->source)); */ } /** * cockpit_stream_close: * @self: a pipe * @problem: a problem or NULL * * Close the pipe. If @problem is non NULL, then it's treated * as if an error occurred, and the pipe is closed immediately. * Otherwise the pipe output is closed when all data has been sent. * * The 'close' signal will be fired when the pipe actually closes. * This may be during this function call (esp. in the case of a * non-NULL @problem) or later. */ void cockpit_stream_close (CockpitStream *self, const gchar *problem) { g_return_if_fail (COCKPIT_IS_STREAM (self)); self->priv->closing = TRUE; if (problem) close_immediately (self, problem); else if (g_queue_is_empty (self->priv->out_queue)) close_output (self); } static gboolean on_later_close (gpointer user_data) { close_immediately (user_data, NULL); /* problem already set */ return FALSE; } static void cockpit_close_later (CockpitStream *self) { GSource *source = g_idle_source_new (); g_source_set_priority (source, G_PRIORITY_HIGH); g_source_set_callback (source, on_later_close, g_object_ref (self), g_object_unref); g_source_attach (source, g_main_context_get_thread_default ()); g_source_unref (source); } static void on_connect_stream (GObject *object, GAsyncResult *result, gpointer user_data) { CockpitStream *self = COCKPIT_STREAM (user_data); GError *error = NULL; GIOStream *io; io = cockpit_connect_stream_finish (result, &error); if (error) { set_problem_from_error (self, "couldn't connect", error); close_immediately (self, NULL); g_error_free (error); } else if (!self->priv->closed) { self->priv->io = g_object_ref (io); initialize_io (self); } g_clear_object (&io); g_object_unref (self); } CockpitStream * cockpit_stream_connect (const gchar *name, CockpitConnectable *connectable) { CockpitStream *self; g_return_val_if_fail (connectable != NULL, NULL); self = g_object_new (COCKPIT_TYPE_STREAM, "io-stream", NULL, "name", name, NULL); self->priv->connecting = cockpit_connectable_ref (connectable); cockpit_connect_stream_full (self->priv->connecting, NULL, on_connect_stream, g_object_ref (self)); return self; } /** * cockpit_stream_get_name: * @self: a pipe * * Get the name of the pipe. * * This is used for logging. * * Returns: (transfer none): the name */ const gchar * cockpit_stream_get_name (CockpitStream *self) { g_return_val_if_fail (COCKPIT_IS_STREAM (self), NULL); return self->priv->name; } /** * cockpit_stream_get_buffer: * @self: a pipe * * Get the input buffer for the pipe. * * This can change when the main loop is run. You can use * cockpit_pipe_consume() to consume data from it. * * Returns: (transfer none): the buffer */ GByteArray * cockpit_stream_get_buffer (CockpitStream *self) { g_return_val_if_fail (COCKPIT_IS_STREAM (self), NULL); return self->priv->in_buffer; } /** * cockpit_stream_new: * @name: a name for debugging * @io_stream: A stream to wrap * * Create a stream for the given io stream * * Returns: (transfer full): a new CockpitStream */ CockpitStream * cockpit_stream_new (const gchar *name, GIOStream *io_stream) { return g_object_new (COCKPIT_TYPE_STREAM, "name", name, "io-stream", io_stream, NULL); }