/*
* This file is part of Cockpit.
*
* Copyright (C) 2014 Red Hat, Inc.
*
* Cockpit is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* Cockpit is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Cockpit; If not, see .
*/
#include "config.h"
#include "cockpitpipechannel.h"
#include "common/cockpitpipe.h"
#include "common/cockpitjson.h"
#include "common/cockpitunicode.h"
#include "common/cockpitunixsignal.h"
#include
#include
#include
#include
#include
/**
* CockpitPipeChannel:
*
* A #CockpitChannel that sends messages from a regular socket
* or file descriptor. Any data is read in whatever chunks it
* shows up in read().
*
* Only UTF8 text data is transmitted. Anything else is
* forced into UTF8 by replacing invalid characters.
*
* The payload type for this channel is 'stream'.
*/
#define COCKPIT_PIPE_CHANNEL(o) (G_TYPE_CHECK_INSTANCE_CAST ((o), COCKPIT_TYPE_PIPE_CHANNEL, CockpitPipeChannel))
typedef struct {
CockpitChannel parent;
CockpitPipe *pipe;
GSocket *sock;
gchar *name;
gboolean open;
gboolean closing;
guint sig_read;
guint sig_close;
gint64 batch;
gint64 latency;
guint timeout;
gboolean pty;
} CockpitPipeChannel;
typedef struct {
CockpitChannelClass parent_class;
} CockpitPipeChannelClass;
G_DEFINE_TYPE (CockpitPipeChannel, cockpit_pipe_channel, COCKPIT_TYPE_CHANNEL);
GHashTable *internal_fds;
static gboolean
steal_internal_fd (const gchar *name,
gint *fdp)
{
gpointer key;
gpointer value;
if (!internal_fds)
return FALSE;
if (!g_hash_table_lookup_extended (internal_fds, name, &key, &value))
return FALSE;
g_hash_table_steal (internal_fds, key);
g_free (key);
*fdp = GPOINTER_TO_INT (value);
return TRUE;
}
static void
cockpit_pipe_channel_recv (CockpitChannel *channel,
GBytes *message)
{
CockpitPipeChannel *self = COCKPIT_PIPE_CHANNEL (channel);
if (self->open)
cockpit_pipe_write (self->pipe, message);
}
static void
process_pipe_buffer (CockpitPipeChannel *self,
GByteArray *data)
{
CockpitChannel *channel = (CockpitChannel *)self;
GBytes *message;
if (!data && self->pipe)
data = cockpit_pipe_get_buffer (self->pipe);
if (!data)
return;
if (self->timeout)
{
g_source_remove (self->timeout);
self->timeout = 0;
}
if (data->len)
{
/* When array is reffed, this just clears byte array */
g_byte_array_ref (data);
message = g_byte_array_free_to_bytes (data);
cockpit_channel_send (channel, message, FALSE);
g_bytes_unref (message);
}
}
static gboolean
cockpit_pipe_channel_read_window_size_options (JsonObject *options,
gushort default_rows,
gushort default_cols,
gushort *rowsp,
gushort *colsp)
{
gint64 rows, cols;
JsonObject *window;
if (!cockpit_json_get_object (options, "window", NULL, &window))
return FALSE;
if (window == NULL)
{
*rowsp = default_rows;
*colsp = default_cols;
return TRUE;
}
if (cockpit_json_get_int (window, "rows", default_rows, &rows) &&
cockpit_json_get_int (window, "cols", default_cols, &cols))
{
*rowsp = (gushort) CLAMP (rows, 0, G_MAXUINT16);
*colsp = (gushort) CLAMP (cols, 0, G_MAXUINT16);
return TRUE;
}
return FALSE;
}
static gboolean
cockpit_pipe_channel_control (CockpitChannel *channel,
const gchar *command,
JsonObject *message)
{
CockpitPipeChannel *self = COCKPIT_PIPE_CHANNEL (channel);
gboolean ret = TRUE;
/* New set of options for channel */
if (g_str_equal (command, "options"))
{
if (!cockpit_json_get_int (message, "batch", self->batch, &self->batch))
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"batch\" option for stream channel");
goto out;
}
if (!cockpit_json_get_int (message, "latency", self->latency, &self->latency) ||
self->latency < 0 || self->latency >= G_MAXUINT)
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"latency\" option for stream channel");
goto out;
}
/* ignore size options if this channel is not a pty or we are in prepare() */
if (self->pty && self->pipe)
{
gushort rows, cols;
if (!cockpit_pipe_channel_read_window_size_options (message, 0, 0, &rows, &cols))
{
g_warning ("%s: invalid \"window.rows\" or \"window.cols\" option for stream channel", self->name);
goto out;
}
if (rows > 0 && cols > 0)
{
gint fd;
g_object_get (self->pipe, "in-fd", &fd, NULL);
if (fd >= 0)
{
struct winsize size = { rows, cols, 0, 0 };
if (ioctl (fd, TIOCSWINSZ, &size) < 0)
g_warning ("cannot set terminal size for stream channel: %s", g_strerror (errno));
}
}
}
process_pipe_buffer (self, NULL);
}
/* Channel input is done */
else if (g_str_equal (command, "done"))
{
self->closing = TRUE;
process_pipe_buffer (self, NULL);
/*
* If closed, call base class handler directly. Otherwise ask
* our pipe to close first, which will come back here.
*/
if (self->open)
cockpit_pipe_close (self->pipe, NULL);
}
else
{
ret = FALSE;
}
out:
return ret;
}
static void
cockpit_pipe_channel_close (CockpitChannel *channel,
const gchar *problem)
{
CockpitPipeChannel *self = COCKPIT_PIPE_CHANNEL (channel);
self->closing = TRUE;
process_pipe_buffer (self, NULL);
/*
* If closed, call base class handler directly. Otherwise ask
* our pipe to close first, which will come back here.
*/
if (self->open)
cockpit_pipe_close (self->pipe, problem);
else
COCKPIT_CHANNEL_CLASS (cockpit_pipe_channel_parent_class)->close (channel, problem);
}
static gboolean
on_batch_timeout (gpointer user_data)
{
CockpitPipeChannel *self = user_data;
self->timeout = 0;
process_pipe_buffer (self, NULL);
return FALSE;
}
static void
on_pipe_read (CockpitPipe *pipe,
GByteArray *data,
gboolean end_of_data,
gpointer user_data)
{
CockpitPipeChannel *self = user_data;
if (!end_of_data && self->batch > 0 && data->len < self->batch)
{
/* Delay the processing of this data */
if (!self->timeout)
self->timeout = g_timeout_add (self->latency, on_batch_timeout, self);
}
else
{
process_pipe_buffer (self, data);
}
/* Close the pipe when writing is done */
if (end_of_data && self->open)
{
g_debug ("%s: end of data, closing pipe", self->name);
cockpit_pipe_close (pipe, NULL);
}
}
static void
return_stderr_message (CockpitChannel *channel,
CockpitPipe *pipe)
{
JsonObject *options;
GByteArray *buffer;
GBytes *bytes;
GBytes *clean;
gchar *data;
gsize length;
buffer = cockpit_pipe_get_stderr (pipe);
if (!buffer)
return;
/* A little more complicated to avoid big copies */
g_byte_array_ref (buffer);
g_byte_array_append (buffer, (guint8 *)"x", 1); /* place holder for null terminate */
bytes = g_byte_array_free_to_bytes (buffer);
clean = cockpit_unicode_force_utf8 (bytes);
g_bytes_unref (bytes);
data = g_bytes_unref_to_data (clean, &length);
/* Fill in null terminate, for x above */
g_assert (length > 0);
data[length - 1] = '\0';
options = cockpit_channel_close_options (channel);
json_object_set_string_member (options, "message", data);
g_free (data);
}
static void
on_pipe_close (CockpitPipe *pipe,
const gchar *problem,
gpointer user_data)
{
CockpitPipeChannel *self = user_data;
CockpitChannel *channel = user_data;
JsonObject *options;
gint status;
gchar *signal;
process_pipe_buffer (self, NULL);
self->open = FALSE;
if (cockpit_pipe_get_pid (pipe, NULL))
{
options = cockpit_channel_close_options (channel);
status = cockpit_pipe_exit_status (pipe);
if (WIFEXITED (status))
{
json_object_set_int_member (options, "exit-status", WEXITSTATUS (status));
}
else if (WIFSIGNALED (status))
{
signal = cockpit_strsignal (WTERMSIG (status));
json_object_set_string_member (options, "exit-signal", signal);
g_free (signal);
}
else if (status)
{
json_object_set_int_member (options, "exit-status", -1);
}
}
return_stderr_message (channel, pipe);
/*
* In theory we should plumb done handling all the way through to CockpitPipe.
* But we can do that later in a compatible way.
*/
if (problem == NULL)
cockpit_channel_control (channel, "done", NULL);
cockpit_channel_close (channel, problem);
}
static void
cockpit_pipe_channel_init (CockpitPipeChannel *self)
{
/* Has no effect until batch is set */
self->latency = 75;
}
static gchar **
parse_environ (CockpitChannel *channel,
JsonObject *options,
const gchar *directory)
{
gchar **envset = NULL;
gchar **env;
if (!cockpit_json_get_strv (options, "environ", NULL, &envset))
{
cockpit_channel_fail (channel, "protocol-error", "invalid \"environ\" option for stream channel");
return NULL;
}
env = cockpit_pipe_get_environ ((const gchar **)envset, directory);
g_free (envset);
return env;
}
static void
cockpit_pipe_channel_prepare (CockpitChannel *channel)
{
CockpitPipeChannel *self = COCKPIT_PIPE_CHANNEL (channel);
GSocketAddress *address;
CockpitPipeFlags flags;
JsonObject *options;
gchar **argv = NULL;
gchar **env = NULL;
const gchar *internal = NULL;
const gchar *dir;
const gchar *error;
gint fd;
COCKPIT_CHANNEL_CLASS (cockpit_pipe_channel_parent_class)->prepare (channel);
options = cockpit_channel_get_options (channel);
if (!cockpit_json_get_strv (options, "spawn", NULL, &argv))
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"spawn\" option for stream channel");
goto out;
}
if (!cockpit_json_get_string (options, "internal", NULL, &internal))
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"internal\" option for stream channel");
goto out;
}
/* Support our options in the open message too */
cockpit_pipe_channel_control (channel, "options", options);
if (self->closing)
goto out;
if (argv)
{
if (!cockpit_json_get_string (options, "err", NULL, &error))
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"err\" options for stream channel");
goto out;
}
flags = COCKPIT_PIPE_FLAGS_NONE;
if (g_strcmp0 (error, "out") == 0)
flags = COCKPIT_PIPE_STDERR_TO_STDOUT;
else if (g_strcmp0 (error, "ignore") == 0)
flags = COCKPIT_PIPE_STDERR_TO_NULL;
else if (g_strcmp0 (error, "message") == 0)
flags = COCKPIT_PIPE_STDERR_TO_MEMORY;
self->name = g_strdup (argv[0]);
if (!cockpit_json_get_string (options, "directory", NULL, &dir))
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"directory\" option for stream channel");
goto out;
}
if (!cockpit_json_get_bool (options, "pty", FALSE, &self->pty))
{
cockpit_channel_fail (channel, "protocol-error",
"invalid \"pty\" option for stream channel");
goto out;
}
env = parse_environ (channel, options, dir);
if (!env)
goto out;
if (self->pty)
{
gushort rows, cols;
if (!cockpit_pipe_channel_read_window_size_options (options, 24, 80, &rows, &cols))
{
g_warning ("%s: invalid \"window.rows\" or \"window.cols\" option for stream channel", self->name);
goto out;
}
self->pipe = cockpit_pipe_pty ((const gchar **)argv, (const gchar **)env, dir, rows, cols);
}
else
{
self->pipe = cockpit_pipe_spawn ((const gchar **)argv, (const gchar **)env, dir, flags);
}
}
else if (internal && steal_internal_fd (internal, &fd))
{
self->pipe = cockpit_pipe_new_user_fd (internal, fd);
}
else
{
address = cockpit_channel_parse_address (channel, &self->name);
if (!address)
goto out;
self->pipe = cockpit_pipe_connect (self->name, address);
g_object_unref (address);
}
self->sig_read = g_signal_connect (self->pipe, "read", G_CALLBACK (on_pipe_read), self);
self->sig_close = g_signal_connect (self->pipe, "close", G_CALLBACK (on_pipe_close), self);
self->open = TRUE;
cockpit_channel_ready (channel, NULL);
out:
g_free (argv);
g_strfreev (env);
}
static void
cockpit_pipe_channel_dispose (GObject *object)
{
CockpitPipeChannel *self = COCKPIT_PIPE_CHANNEL (object);
if (self->pipe)
{
if (self->open)
cockpit_pipe_close (self->pipe, "terminated");
if (self->sig_read)
g_signal_handler_disconnect (self->pipe, self->sig_read);
if (self->sig_close)
g_signal_handler_disconnect (self->pipe, self->sig_close);
self->sig_read = self->sig_close = 0;
}
G_OBJECT_CLASS (cockpit_pipe_channel_parent_class)->dispose (object);
}
static void
cockpit_pipe_channel_finalize (GObject *object)
{
CockpitPipeChannel *self = COCKPIT_PIPE_CHANNEL (object);
g_clear_object (&self->sock);
g_clear_object (&self->pipe);
g_free (self->name);
G_OBJECT_CLASS (cockpit_pipe_channel_parent_class)->finalize (object);
}
static void
cockpit_pipe_channel_class_init (CockpitPipeChannelClass *klass)
{
GObjectClass *gobject_class = G_OBJECT_CLASS (klass);
CockpitChannelClass *channel_class = COCKPIT_CHANNEL_CLASS (klass);
gobject_class->dispose = cockpit_pipe_channel_dispose;
gobject_class->finalize = cockpit_pipe_channel_finalize;
channel_class->prepare = cockpit_pipe_channel_prepare;
channel_class->control = cockpit_pipe_channel_control;
channel_class->recv = cockpit_pipe_channel_recv;
channel_class->close = cockpit_pipe_channel_close;
}
/**
* cockpit_pipe_channel_open:
* @transport: the transport to send/receive messages on
* @channel_id: the channel id
* @unix_path: the UNIX socket path to communicate with
*
* This function is mainly used by tests. The usual way
* to get a #CockpitPipeChannel is via cockpit_channel_open()
*
* Returns: (transfer full): the new channel
*/
CockpitChannel *
cockpit_pipe_channel_open (CockpitTransport *transport,
const gchar *channel_id,
const gchar *unix_path)
{
CockpitChannel *channel;
JsonObject *options;
g_return_val_if_fail (channel_id != NULL, NULL);
options = json_object_new ();
json_object_set_string_member (options, "unix", unix_path);
json_object_set_string_member (options, "payload", "stream");
channel = g_object_new (COCKPIT_TYPE_PIPE_CHANNEL,
"transport", transport,
"id", channel_id,
"options", options,
NULL);
json_object_unref (options);
return channel;
}
static void
internal_fd_free (gpointer data)
{
gint fd = GPOINTER_TO_INT (data);
close (fd);
}
const gchar *
cockpit_pipe_channel_add_internal_fd (gint fd)
{
/* We are not multi-threaded. Also don't make this look like normal fd numbers */
static guint64 unique = 911111;
gboolean inserted;
gchar *id;
if (!internal_fds)
internal_fds = g_hash_table_new_full (g_str_hash, g_str_equal, g_free, internal_fd_free);
id = g_strdup_printf ("internal-stream-%" G_GUINT64_FORMAT, unique++);
inserted = g_hash_table_replace (internal_fds, id, GINT_TO_POINTER (fd));
g_assert (inserted);
return id;
}
gboolean
cockpit_pipe_channel_remove_internal_fd (const gchar *id)
{
if (internal_fds == NULL)
return FALSE;
if (!g_hash_table_remove (internal_fds, id))
return FALSE;
return TRUE;
}