/*
* This file is part of Cockpit.
*
* Copyright (C) 2014 Red Hat, Inc.
*
* Cockpit is free software; you can redistribute it and/or modify it
* under the terms of the GNU Lesser General Public License as published by
* the Free Software Foundation; either version 2.1 of the License, or
* (at your option) any later version.
*
* Cockpit is distributed in the hope that it will be useful, but
* WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License
* along with Cockpit; If not, see .
*/
#include "config.h"
#include "cockpitmetrics.h"
#include "common/cockpitjson.h"
#include
enum {
DERIVE_NONE = 0,
DERIVE_DELTA = 1,
DERIVE_RATE = 2,
};
typedef struct {
gint derive;
gboolean has_instances;
gint n_last_instances;
gint n_next_instances;
} MetricInfo;
struct _CockpitMetricsPrivate {
gboolean interpolate;
gboolean compress;
guint timeout;
gint64 next;
gint64 interval;
gint64 meta_interval;
gboolean meta_reset;
JsonObject *last_meta;
JsonObject *next_meta;
gint n_metrics;
MetricInfo *metric_info;
gint64 last_timestamp;
gint64 next_timestamp;
double **last_data;
double **next_data;
gboolean derived_valid;
double **derived;
JsonArray *message;
};
G_DEFINE_ABSTRACT_TYPE (CockpitMetrics, cockpit_metrics, COCKPIT_TYPE_CHANNEL);
static void
cockpit_metrics_init (CockpitMetrics *self)
{
self->priv = G_TYPE_INSTANCE_GET_PRIVATE (self, COCKPIT_TYPE_METRICS,
CockpitMetricsPrivate);
self->priv->interpolate = TRUE;
self->priv->compress = TRUE;
}
static void
cockpit_metrics_recv (CockpitChannel *channel,
GBytes *message)
{
cockpit_channel_fail (channel, "protocol-error", "received unexpected metrics1 payload");
}
static void
cockpit_metrics_close (CockpitChannel *channel,
const gchar *problem)
{
CockpitMetrics *self = COCKPIT_METRICS (channel);
if (self->priv->timeout)
{
g_source_remove (self->priv->timeout);
self->priv->timeout = 0;
}
COCKPIT_CHANNEL_CLASS (cockpit_metrics_parent_class)->close (channel, problem);
}
static void
cockpit_metrics_dispose (GObject *object)
{
CockpitMetrics *self = COCKPIT_METRICS (object);
if (self->priv->timeout)
{
g_source_remove (self->priv->timeout);
self->priv->timeout = 0;
}
if (self->priv->last_meta)
{
json_object_unref (self->priv->last_meta);
self->priv->last_meta = NULL;
}
if (self->priv->last_data)
{
g_free (self->priv->last_data[0]);
g_free (self->priv->last_data);
self->priv->last_data = NULL;
}
if (self->priv->next_meta)
{
json_object_unref (self->priv->next_meta);
self->priv->next_meta = NULL;
}
if (self->priv->next_data)
{
g_free (self->priv->next_data[0]);
g_free (self->priv->next_data);
self->priv->next_data = NULL;
}
if (self->priv->derived)
{
g_free (self->priv->derived[0]);
g_free (self->priv->derived);
self->priv->derived = NULL;
}
g_free (self->priv->metric_info);
self->priv->metric_info = NULL;
G_OBJECT_CLASS (cockpit_metrics_parent_class)->dispose (object);
}
static void
cockpit_metrics_class_init (CockpitMetricsClass *klass)
{
CockpitChannelClass *channel_class = COCKPIT_CHANNEL_CLASS (klass);
GObjectClass *object_class = G_OBJECT_CLASS (klass);
object_class->dispose = cockpit_metrics_dispose;
channel_class->recv = cockpit_metrics_recv;
channel_class->close = cockpit_metrics_close;
g_type_class_add_private (klass, sizeof (CockpitMetricsPrivate));
}
static gboolean
on_timeout_tick (gpointer data)
{
CockpitMetrics *self = data;
CockpitMetricsClass *klass;
gint64 next_interval;
if (self->priv->timeout > 0)
{
g_source_remove (self->priv->timeout);
self->priv->timeout = 0;
}
klass = COCKPIT_METRICS_GET_CLASS (self);
if (klass->tick)
(klass->tick) (self, self->priv->next);
self->priv->next += self->priv->interval;
next_interval = self->priv->next - g_get_monotonic_time() / 1000;
if (next_interval < 0)
next_interval = 0;
if (next_interval <= G_MAXUINT)
self->priv->timeout = g_timeout_add (next_interval, on_timeout_tick, self);
else if (next_interval / 1000 <= G_MAXUINT)
self->priv->timeout = g_timeout_add_seconds (next_interval / 1000, on_timeout_tick, self);
else
{
cockpit_channel_fail (COCKPIT_CHANNEL (self), "internal-error",
"invalid metric timeout tick offset");
}
return FALSE;
}
void
cockpit_metrics_metronome (CockpitMetrics *self,
gint64 interval)
{
g_return_if_fail (self->priv->timeout == 0);
g_return_if_fail (interval > 0);
self->priv->next = g_get_monotonic_time() / 1000;
self->priv->interval = interval;
on_timeout_tick (self);
}
static void
realloc_next_buffer (CockpitMetrics *self)
{
int total_next_instances = 0;
for (int i = 0; i < self->priv->n_metrics; i++)
total_next_instances += self->priv->metric_info[i].n_next_instances;
g_free (self->priv->next_data[0]);
self->priv->next_data[0] = g_new (double, total_next_instances);
for (int i = 1; i < self->priv->n_metrics; i++)
self->priv->next_data[i] = self->priv->next_data[i-1] + self->priv->metric_info[i-1].n_next_instances;
}
static void
realloc_derived_buffer (CockpitMetrics *self)
{
int total_next_instances = 0;
for (int i = 0; i < self->priv->n_metrics; i++)
total_next_instances += self->priv->metric_info[i].n_next_instances;
g_free (self->priv->derived[0]);
self->priv->derived[0] = g_new (double, total_next_instances);
for (int i = 1; i < self->priv->n_metrics; i++)
self->priv->derived[i] = self->priv->derived[i-1] + self->priv->metric_info[i-1].n_next_instances;
self->priv->derived_valid = FALSE;
}
static gboolean
update_for_meta (CockpitMetrics *self,
JsonObject *meta,
gboolean reset)
{
CockpitChannel *channel = COCKPIT_CHANNEL (self);
JsonArray *array;
JsonObject *info;
JsonArray *instances;
guint length;
gchar const *derive;
array = json_object_get_array_member (meta, "metrics");
g_return_val_if_fail (array != NULL, FALSE);
length = json_array_get_length (array);
if (self->priv->metric_info == NULL)
{
self->priv->n_metrics = length;
self->priv->metric_info = g_new0 (MetricInfo, length);
self->priv->last_data = g_new0 (double *, length);
self->priv->next_data = g_new0 (double *, length);
self->priv->derived = g_new0 (double *, length);
reset = TRUE;
}
else if (self->priv->n_metrics != length)
{
cockpit_channel_fail (channel, "protocol-error",
"number of metrics must not change");
return FALSE;
}
for (int i = 0; i < length; i++)
{
info = json_array_get_object_element (array, i);
g_return_val_if_fail (info != NULL, FALSE);
if (!cockpit_json_get_string (info, "derive", NULL, &derive))
{
cockpit_channel_fail (channel, "protocol-error",
"unsupported derive value: not a string");
return FALSE;
}
if (!derive)
{
self->priv->metric_info[i].derive = DERIVE_NONE;
}
else if (g_str_equal (derive, "delta"))
{
self->priv->metric_info[i].derive = DERIVE_DELTA;
}
else if (g_str_equal (derive, "rate"))
{
self->priv->metric_info[i].derive = DERIVE_RATE;
}
else
{
cockpit_channel_fail (channel, "protocol-error",
"unsupported derive function: %s", derive);
return FALSE;
}
if (!cockpit_json_get_array (info, "instances", NULL, &instances))
{
cockpit_channel_fail (channel, "protocol-error",
"unsupported instances value: not a string");
return FALSE;
}
if (instances)
{
self->priv->metric_info[i].has_instances = TRUE;
self->priv->metric_info[i].n_next_instances = json_array_get_length (instances);
}
else
{
self->priv->metric_info[i].has_instances = FALSE;
self->priv->metric_info[i].n_next_instances = 1;
}
}
realloc_next_buffer (self);
realloc_derived_buffer (self);
g_return_val_if_fail (cockpit_json_get_int (meta, "interval", 1000, &self->priv->meta_interval),
FALSE);
self->priv->meta_reset = reset;
return TRUE;
}
static void
send_object (CockpitMetrics *self,
JsonObject *object)
{
CockpitChannel *channel = (CockpitChannel *)self;
GBytes *bytes;
bytes = cockpit_json_write_bytes (object);
cockpit_channel_send (channel, bytes, TRUE);
g_bytes_unref (bytes);
}
/*
* cockpit_metrics_send_meta:
* @self: The CockpitMetrics
* @meta: An object containing metric meta data
*
* Send metrics meta data down the channel. If you use cockpit_metrics_send_data()
* then you must use this function instead of sending stuff on the channel directly.
*/
void
cockpit_metrics_send_meta (CockpitMetrics *self,
JsonObject *meta,
gboolean reset)
{
cockpit_metrics_flush_data (self);
if (self->priv->next_meta)
json_object_unref (self->priv->next_meta);
self->priv->next_meta = json_object_ref (meta);
if (update_for_meta (self, meta, reset))
send_object (self, meta);
}
static void
send_array (CockpitMetrics *self,
JsonArray *array)
{
CockpitChannel *channel = (CockpitChannel *)self;
GBytes *bytes;
JsonNode *node;
gsize length;
gchar *ret;
node = json_node_new (JSON_NODE_ARRAY);
json_node_set_array (node, array);
ret = cockpit_json_write (node, &length);
json_node_free (node);
bytes = g_bytes_new_take (ret, length);
cockpit_channel_send (channel, bytes, TRUE);
g_bytes_unref (bytes);
}
static JsonArray *
push_array_at (JsonArray *array,
guint index,
JsonNode *node)
{
if (array == NULL)
array = json_array_new ();
g_assert (index >= json_array_get_length (array));
while (index > json_array_get_length (array))
json_array_add_null_element (array);
if (node)
json_array_add_element (array, node);
return array;
}
static JsonArray *
compute_and_maybe_push_value (CockpitMetrics *self,
double interpol_r,
int metric,
int next_instance,
int last_instance,
JsonArray *array,
int index)
{
double val = self->priv->next_data[metric][next_instance];
if (last_instance >= 0)
{
double last_val = self->priv->last_data[metric][last_instance];
if (self->priv->interpolate && !isnan (last_val))
{
val = last_val * (1.0 - interpol_r) + val * interpol_r;
self->priv->next_data[metric][next_instance] = val;
}
switch (self->priv->metric_info[metric].derive)
{
case DERIVE_DELTA:
val = val - last_val;
break;
case DERIVE_RATE:
val = (val - last_val) / (self->priv->next_timestamp - self->priv->last_timestamp) * 1000;
break;
case DERIVE_NONE:
break;
}
}
else
{
if (self->priv->metric_info[metric].derive != DERIVE_NONE)
val = NAN;
}
if (self->priv->compress == FALSE
|| next_instance != last_instance
|| !self->priv->derived_valid
|| val != self->priv->derived[metric][next_instance])
{
self->priv->derived[metric][next_instance] = val;
JsonNode *node = json_node_new (JSON_NODE_VALUE);
if (!isnan (val))
json_node_set_double (node, val);
else
json_node_set_boolean (node, FALSE);
array = push_array_at (array, index, node);
}
return array;
}
static int
find_last_instance (CockpitMetrics *self,
int metric,
int instance)
{
if (self->priv->meta_reset)
return -1;
if (self->priv->last_meta == self->priv->next_meta)
return instance;
JsonArray *last_metrics = json_object_get_array_member (self->priv->last_meta, "metrics");
JsonArray *next_metrics = json_object_get_array_member (self->priv->next_meta, "metrics");
if (last_metrics == NULL
|| next_metrics == NULL
|| json_array_get_length (last_metrics) <= metric
|| json_array_get_length (next_metrics) <= metric)
return -1;
JsonObject *last_metric = json_array_get_object_element (last_metrics, metric);
JsonObject *next_metric = json_array_get_object_element (next_metrics, metric);
if (last_metric == NULL
|| next_metric == NULL)
return -1;
JsonArray *last_instances = json_object_get_array_member (last_metric, "instances");
JsonArray *next_instances = json_object_get_array_member (next_metric, "instances");
if (last_instances == NULL
|| next_instances == NULL
|| json_array_get_length (next_instances) <= instance)
return -1;
int n_last_instances = json_array_get_length (last_instances);
const gchar *next_instance = json_array_get_string_element (next_instances, instance);
for (int i = 0; i < n_last_instances; i++)
{
if (g_strcmp0 (json_array_get_string_element (last_instances, i), next_instance) == 0)
return i;
}
return -1;
}
static JsonArray *
build_json_data (CockpitMetrics *self, double interpol_r)
{
JsonArray *output = NULL;
JsonNode *node;
for (int i = 0; i < self->priv->n_metrics; i++)
{
if (self->priv->metric_info[i].has_instances)
{
JsonArray *res = NULL;
for (int j = 0; j < self->priv->metric_info[i].n_next_instances; j++)
res = compute_and_maybe_push_value (self, interpol_r, i, j, find_last_instance (self, i, j), res, j);
node = json_node_new (JSON_NODE_ARRAY);
json_node_take_array (node, res ? res : json_array_new ());
output = push_array_at (output, i, node);
}
else
output = compute_and_maybe_push_value (self, interpol_r, i, 0, (self->priv->meta_reset? -1 : 0), output, i);
}
if (output == NULL)
output = json_array_new ();
return output;
}
double **
cockpit_metrics_get_data_buffer (CockpitMetrics *self)
{
return self->priv->next_data;
}
/*
* cockpit_metrics_send_data:
* @self: The CockpitMetrics
*
* Send metrics data down the channel, possibly doing interframe
* compression between what was sent last. The data to send comes
* from the buffer returned by @cockpit_metrics_get_data_buffer.
*/
void
cockpit_metrics_send_data (CockpitMetrics *self, gint64 timestamp)
{
JsonArray *res;
double interpol_r = 1.0;
if (self->priv->message == NULL)
self->priv->message = json_array_new ();
if (self->priv->interpolate && !self->priv->meta_reset)
{
double interval = ((double)(timestamp - self->priv->last_timestamp));
if (interval > 0)
{
interpol_r = self->priv->meta_interval / interval;
timestamp = self->priv->last_timestamp + self->priv->meta_interval;
}
}
self->priv->next_timestamp = timestamp;
res = build_json_data (self, interpol_r);
json_array_add_array_element (self->priv->message, res);
/* Now setup for the next round by swapping buffers and then making
sure that the new 'next' buffer has the right layout.
*/
double **t = self->priv->last_data;
self->priv->last_data = self->priv->next_data;
self->priv->next_data = t;
if (self->priv->last_meta != self->priv->next_meta)
{
realloc_next_buffer (self);
for (int i = 0; i < self->priv->n_metrics; i++)
self->priv->metric_info[i].n_last_instances = self->priv->metric_info[i].n_next_instances;
if (self->priv->last_meta)
json_object_unref (self->priv->last_meta);
self->priv->last_meta = json_object_ref (self->priv->next_meta);
}
self->priv->derived_valid = TRUE;
self->priv->last_timestamp = self->priv->next_timestamp;
self->priv->meta_reset = FALSE;
}
void
cockpit_metrics_flush_data (CockpitMetrics *self)
{
if (self->priv->message)
{
send_array (self, self->priv->message);
json_array_unref (self->priv->message);
self->priv->message = NULL;
}
}
void
cockpit_metrics_set_interpolate (CockpitMetrics *self,
gboolean interpolate)
{
self->priv->interpolate = interpolate;
}
void
cockpit_metrics_set_compress (CockpitMetrics *self,
gboolean compress)
{
self->priv->compress = compress;
}