Projects
domecam:mallard
collectd
0001-WIP-Initial-implementation-for-epics-plugi...
Log In
Username
Password
Overview
Repositories
Revisions
Requests
Users
Attributes
Meta
File 0001-WIP-Initial-implementation-for-epics-plugin.patch of Package collectd
From 83de4749e6df658533345959e08cd9ff76022a65 Mon Sep 17 00:00:00 2001 From: "Matwey V. Kornilov" <matwey.kornilov@gmail.com> Date: Wed, 5 Jan 2022 17:20:06 +0300 Subject: [PATCH] [WIP] Initial implementation for epics plugin Reference: [1] https://epics.anl.gov/docs/ca.php Signed-off-by: Matwey V. Kornilov <matwey.kornilov@gmail.com> --- Makefile.am | 8 + configure.ac | 33 ++++ src/epics.c | 546 +++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 587 insertions(+) create mode 100644 src/epics.c diff --git a/Makefile.am b/Makefile.am index a7b5de97..eecdc6d9 100644 --- a/Makefile.am +++ b/Makefile.am @@ -1055,6 +1055,14 @@ entropy_la_SOURCES = src/entropy.c entropy_la_LDFLAGS = $(PLUGIN_LDFLAGS) endif +if BUILD_PLUGIN_EPICS +pkglib_LTLIBRARIES += epics.la +epics_la_CFLAGS = $(AM_CFLAGS) $(BUILD_WITH_EPICS_BASE_CFLAGS) +epics_la_SOURCES = src/epics.c +epics_la_LDFLAGS = $(PLUGIN_LDFLAGS) +epics_la_LIBADD = -lpthread $(BUILD_WITH_EPICS_BASE_LIBS) +endif + if BUILD_PLUGIN_EXEC pkglib_LTLIBRARIES += exec.la exec_la_SOURCES = src/exec.c diff --git a/configure.ac b/configure.ac index e8107675..5d6aa9e4 100644 --- a/configure.ac +++ b/configure.ac @@ -2104,6 +2104,35 @@ AC_SUBST([BUILD_WITH_CUDA_LIBS]) # }}} +# --with-epics-base {{{ +AC_ARG_WITH([epics-base], + [AS_HELP_STRING([--with-epics-base@<:@=PREFIX@:>@], [Path to epics-base.])], + [ + with_epics_base="$withval" + if test "x$withval" = "xno"; then + with_epics_base="no (disabled on command line)" + fi + ], + [with_epics_base="yes"] +) + +if test "x$with_epics_base" = "xyes"; then +PKG_CHECK_MODULES([EPICS_BASE], [epics-base], + [with_epics_base="yes"], + [with_epics_base="no (pkg-config could not find epics-base)"] +) +fi + +if test "x$with_epics_base" = "xyes"; then + BUILD_WITH_EPICS_BASE_CFLAGS="$EPICS_BASE_CFLAGS" + BUILD_WITH_EPICS_BASE_LIBS="$EPICS_BASE_LIBS -lca" +fi + +AC_SUBST([BUILD_WITH_EPICS_BASE_CFLAGS]) +AC_SUBST([BUILD_WITH_EPICS_BASE_LIBS]) + +# }}} + # --with-libaquaero5 {{{ AC_ARG_WITH([libaquaero5], [AS_HELP_STRING([--with-libaquaero5@<:@=PREFIX@:>@], [Path to aquatools-ng source code.])], @@ -6665,6 +6694,7 @@ plugin_dpdkevents="no" plugin_dpdkstat="no" plugin_dpdk_telemetry="no" plugin_entropy="no" +plugin_epics="no" plugin_ethstat="no" plugin_fhcount="no" plugin_fscache="no" @@ -7133,6 +7163,7 @@ AC_PLUGIN([dpdk_telemetry], [$plugin_dpdk_telemetry], [Metrics from DPDK AC_PLUGIN([drbd], [$plugin_drbd], [DRBD statistics]) AC_PLUGIN([email], [yes], [EMail statistics]) AC_PLUGIN([entropy], [$plugin_entropy], [Entropy statistics]) +AC_PLUGIN([epics], [$with_epics_base], [EPICS CA plugin]) AC_PLUGIN([ethstat], [$plugin_ethstat], [Stats from NIC driver]) AC_PLUGIN([exec], [yes], [Execution of external programs]) AC_PLUGIN([fhcount], [$plugin_fhcount], [File handles statistics]) @@ -7466,6 +7497,7 @@ AC_MSG_RESULT([ YACC . . . . . . . . $YACC]) AC_MSG_RESULT([ YFLAGS . . . . . . . $YFLAGS]) AC_MSG_RESULT() AC_MSG_RESULT([ Libraries:]) +AC_MSG_RESULT([ epics . . . . . . . . $with_epics_base]) AC_MSG_RESULT([ intel mic . . . . . . $with_mic]) AC_MSG_RESULT([ libaquaero5 . . . . . $with_libaquaero5]) AC_MSG_RESULT([ libatasmart . . . . . $with_libatasmart]) @@ -7583,6 +7615,7 @@ AC_MSG_RESULT([ drbd . . . . . . . . $enable_drbd]) AC_MSG_RESULT([ email . . . . . . . . $enable_email]) AC_MSG_RESULT([ entropy . . . . . . . $enable_entropy]) AC_MSG_RESULT([ ethstat . . . . . . . $enable_ethstat]) +AC_MSG_RESULT([ epics . . . . . . . . $enable_epics]) AC_MSG_RESULT([ exec . . . . . . . . $enable_exec]) AC_MSG_RESULT([ fhcount . . . . . . . $enable_fhcount]) AC_MSG_RESULT([ filecount . . . . . . $enable_filecount]) diff --git a/src/epics.c b/src/epics.c new file mode 100644 index 00000000..f77b76cd --- /dev/null +++ b/src/epics.c @@ -0,0 +1,546 @@ +/** + * collectd - src/epics.c + * Copyright (C) 2022 Matwey V. Kornilov + * + * This program is free software; you can redistribute it and/or modify it + * under the terms of the GNU General Public License as published by the + * Free Software Foundation; only version 2 of the License is applicable. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License for more details. + * + * You should have received a copy of the GNU General Public License along + * with this program; if not, write to the Free Software Foundation, Inc., + * 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA + * + * Authors: + * Matwey V. Kornilov <matwey.kornilov at gmail.com> + **/ + +#include "collectd.h" +#include "plugin.h" +#include "utils/common/common.h" + +#include <cadef.h> + +struct pv { + char *name; + + chtype ch_type; + chid id; + evid eid; + + union { + value_list_t value; + struct { + char *value; + size_t capacity; + } label; + }; + + int is_active : 1; + int is_label : 1; +}; + +static struct { + struct pv *pvs; + int pvs_num; + + pthread_mutex_t lock; + pthread_t thread_id; + + int thread_loop : 1; +} epics_plugin = {.pvs = NULL, + .pvs_num = 0, + .lock = PTHREAD_MUTEX_INITIALIZER, + .thread_loop = 0}; + +static void free_pvs() { + for (; epics_plugin.pvs_num > 0; --epics_plugin.pvs_num) { + struct pv *p = &epics_plugin.pvs[epics_plugin.pvs_num - 1]; + + if (p->is_label) { + free(p->label.value); + } else { + free(p->value.values); + } + free(p->name); + } + + free(epics_plugin.pvs); + epics_plugin.pvs = NULL; +} + +static int printf_handler(const char *pformat, va_list args) { +#if COLLECT_DEBUG + char msg[1024] = ""; // Size inherits from plugin_log() + int status = 0; + + status = vsnprintf(msg, sizeof(msg), pformat, args); + + if (status < 0) { + return status; + } + + msg[strcspn(msg, "\r\n")] = '\0'; + plugin_log(LOG_DEBUG, "%s", msg); + + return status; +#else + return 0; +#endif +} + +static void handle_var_event(struct pv *p, evargs args) { + const data_set_t *ds = plugin_get_ds(p->value.type); + const size_t values_len = p->value.values_len; + value_t *values = p->value.values; + + if (ds == NULL) { + ERROR("epics plugin: Unknown type \"%s\" for channel \"%s\". See types.db(5) for details.", p->value.type, p->name); + + return; + } + + int i = 0; + const int ds_type = ds->ds[0].type; + + if (args.count != values_len) { + DEBUG( + "epics plugin: Unexpected channel element count %lu for channel \"%s\"", + args.count, p->name); + return; + } + + pthread_mutex_lock(&epics_plugin.lock); + + if (ds_type == DS_TYPE_COUNTER && args.type == DBR_LONG) { + const long *value = (const long *)args.dbr; + + for (i = 0; i < values_len; ++i) { + values[i].counter = value[i]; + } + } else if (ds_type == DS_TYPE_DERIVE && args.type == DBR_LONG) { + const long *value = (const long *)args.dbr; + + for (i = 0; i < values_len; ++i) { + values[i].derive = value[i]; + } + } else if (ds_type == DS_TYPE_ABSOLUTE && args.type == DBR_LONG) { + const long *value = (const long *)args.dbr; + + for (i = 0; i < values_len; ++i) { + values[i].absolute = value[i]; + } + } else if (ds_type == DS_TYPE_GAUGE && args.type == DBR_DOUBLE) { + const double *value = (const double *)args.dbr; + + for (i = 0; i < values_len; ++i) { + values[i].gauge = value[i]; + } + } else { + DEBUG("epics plugin: Unexpected data type \"%s\" for channel type \"%s\"", + DS_TYPE_TO_STRING(ds_type), dbf_type_to_text(args.type)); + } + + pthread_mutex_unlock(&epics_plugin.lock); +} + +static void handle_label_event(struct pv *p, evargs args) { + const char *value = (const char *)args.dbr; + const size_t value_len = strlen(value); + + if (args.count != 1) { + DEBUG( + "epics plugin: Unexpected channel element count %lu for channel \"%s\"", + args.count, p->name); + return; + } + + pthread_mutex_lock(&epics_plugin.lock); + + if (p->label.capacity <= value_len) { + free(p->label.value); + + p->label.value = strdup(value); + if (p->label.value == NULL) { + ERROR("epics plugin: Cannot allocate memory for \"%s\" value", p->name); + + return; + } + + p->label.capacity = value_len + 1; + } else { + strcpy(p->label.value, value); + } + + pthread_mutex_unlock(&epics_plugin.lock); +} + +static void event_handler(evargs args) { + struct pv *p = (struct pv *)args.usr; + + if (args.status != ECA_NORMAL) { + WARNING("epics plugin: Error %s at channel \"%s\"", ca_message(args.status), + p->name); + + return; + } + + if (p->is_label) { + handle_label_event(p, args); + } else { + handle_var_event(p, args); + } +} + +static void handle_conn_up(struct pv *p) { + int ret = 0; + + if (p->eid) { + INFO("epics plugin: Channel \"%s\" reconnected", p->name); + + p->is_active = 1; + + return; + } + + if (p->is_label) { + p->ch_type = DBR_STRING; + } else { + const data_set_t *ds = plugin_get_ds(p->value.type); + + if (ds == NULL) { + ERROR("epics plugin: Unknown type \"%s\" for channel \"%s\". See types.db(5) for details.", p->value.type, p->name); + + return; + } + + const int ds_type = ds->ds[0].type; + + p->ch_type = ca_field_type(p->id); + if (ds_type == DS_TYPE_COUNTER && + (p->ch_type == DBR_SHORT || p->ch_type == DBR_LONG)) { + p->ch_type = DBR_LONG; + } else if (ds_type == DS_TYPE_GAUGE && + (p->ch_type == DBR_FLOAT || p->ch_type == DBR_DOUBLE || + p->ch_type == DBR_ENUM)) { + p->ch_type = DBR_DOUBLE; + } else if (ds_type == DS_TYPE_DERIVE && + (p->ch_type == DBR_SHORT || p->ch_type == DBR_LONG)) { + p->ch_type = DBR_LONG; + } else if (ds_type == DS_TYPE_ABSOLUTE && + (p->ch_type == DBR_SHORT || p->ch_type == DBR_LONG)) { + p->ch_type = DBR_LONG; + } else { + ERROR("epics plugin: Variable type \"%s\" doesn't match channel type " + "\"%s\" for channel \"%s\"", + DS_TYPE_TO_STRING(ds_type), dbf_type_to_text(p->ch_type), p->name); + + return; + } + + if (ca_element_count(p->id) != ds->ds_num) { + ERROR("epics plugin: Variable element number %lu doesn't channel element " + "count %lu for channel \"%s\"", + ds->ds_num, ca_element_count(p->id), p->name); + + return; + } + + p->value.values_len = ds->ds_num; + p->value.values = calloc(p->value.values_len, sizeof(value_t)); + if (p->value.values == NULL) { + ERROR("epics plugin: Cannot allocate memory for %lu values or channel \"%s\"", ds->ds_num, p->name); + + return; + } + } + + ret = + ca_create_subscription(p->ch_type, ca_element_count(p->id), p->id, + DBE_VALUE | DBE_ALARM, event_handler, p, &p->eid); + if (ret != ECA_NORMAL) { + ERROR("epics plugin: CA error %s occurred while trying to create " + "subscription for channel \"%s\"", + ca_message(ret), p->name); + + return; + } + + p->is_active = 1; +} + +static void handle_conn_down(struct pv *p) { + WARNING("epics plugin: Channel \"%s\" disconnected", p->name); + + p->is_active = 0; +} + +static void connection_handler(struct connection_handler_args args) { + struct pv *p = (struct pv *)ca_puser(args.chid); + + switch (args.op) { + case CA_OP_CONN_UP: + handle_conn_up(p); + break; + case CA_OP_CONN_DOWN: + handle_conn_down(p); + break; + } +} + +static void *epics_thread(void *args) { + int i = 0; + long ret = 0; + + ret = ca_context_create(ca_disable_preemptive_callback); + if (ret != ECA_NORMAL) { + // FIXME: report error back to start_thread() + ERROR("epics plugin: CA error %s occurred while trying to start channel " + "access", + ca_message(ret)); + return (void *)1; + } + + ca_replace_printf_handler(&printf_handler); + + for (i = 0; i < epics_plugin.pvs_num; ++i) { + struct pv *p = &epics_plugin.pvs[i]; + + ret = ca_create_channel(p->name, &connection_handler, p, 0, &p->id); + if (ret != ECA_NORMAL) { + ERROR("epics plugin: CA error %s occurred while trying to create channel " + "\"%s\"", + ca_message(ret), p->name); + ret = 1; + goto error; + } + } + + while (epics_plugin.thread_loop != 0) { + ca_pend_event(2.0); + } + +error: + for (; i > 0; --i) { + struct pv *p = &epics_plugin.pvs[i - 1]; + + if (p->eid) { + ca_clear_subscription(p->eid); + } + ca_clear_channel(p->id); + } + + ca_context_destroy(); + + return (void *)ret; +} + +static int start_thread(void) { + int ret = 0; + + pthread_mutex_lock(&epics_plugin.lock); + + if (epics_plugin.thread_loop != 0) { + goto epics_unlock; + } + + epics_plugin.thread_loop = 1; + ret = plugin_thread_create(&epics_plugin.thread_id, epics_thread, (void *)0, + "epics"); + if (ret != 0) { + epics_plugin.thread_loop = 0; + ERROR("epics plugin: Starting thread failed: %d", ret); + + goto epics_unlock; + } + + // FIXME: wait untill ca_context_create success + +epics_unlock: + pthread_mutex_unlock(&epics_plugin.lock); + + return ret; +} + +static int stop_thread(void) { + int ret = 0; + + pthread_mutex_lock(&epics_plugin.lock); + + if (epics_plugin.thread_loop == 0) { + goto epics_unlock; + } + + epics_plugin.thread_loop = 0; + +epics_unlock: + pthread_mutex_unlock(&epics_plugin.lock); + + ret = pthread_join(epics_plugin.thread_id, NULL); + + return ret; +} + +static int epics_config_variable(oconfig_item_t *ci, struct pv *p) { + oconfig_item_t *c; + + if (cf_util_get_string(ci, &p->name) != 0 || p->name == NULL) { + ERROR("epics plugin: Wrong variable configuration"); + + return -1; + } + + for (c = ci->children; c != ci->children + ci->children_num; ++c) { + if (strcasecmp(c->key, "Type") == 0) { + if (cf_util_get_string_buffer(c, p->value.type, + sizeof(p->value.type)) != 0) { + + return -1; + } + + sstrncpy(p->value.type_instance, p->name, + sizeof(p->value.type_instance)); + } else { + ERROR( + "epics plugin: Unknown configuration key \"%s\" for variable \"%s\"", + c->key, p->name); + + return -1; + } + } + + sstrncpy(p->value.plugin, "epics", sizeof(p->value.plugin)); + + return 0; +} + +static int epics_config_label(oconfig_item_t *ci, struct pv *p) { + if (cf_util_get_string(ci, &p->name) != 0 || p->name == NULL) { + ERROR("epics plugin: Wrong label configuration"); + + return -1; + } + + p->is_label = 1; + + return 0; +} + +static int epics_config(oconfig_item_t *ci) { + oconfig_item_t *c; + + if (ci->children_num == 0) { + ERROR("epics plugin: No variables are specified"); + + return -1; + } + + epics_plugin.pvs = calloc(ci->children_num, sizeof(struct pv)); + if (!epics_plugin.pvs) { + ERROR("epics plugin: Cannot allocate memory for PV list"); + + return -1; + } + memset(epics_plugin.pvs, 0, sizeof(struct pv) * ci->children_num); + + for (c = ci->children, epics_plugin.pvs_num = 0; + c != ci->children + ci->children_num; ++c, ++epics_plugin.pvs_num) { + struct pv *p = &epics_plugin.pvs[epics_plugin.pvs_num]; + + if (strcasecmp(c->key, "Variable") == 0) { + if (epics_config_variable(c, p) != 0) + goto error; + } else if (strcasecmp(c->key, "Label") == 0) { + if (epics_config_label(c, p) != 0) + goto error; + } else { + ERROR("epics plugin: Unknown configuration key \"%s\"", ci->key); + goto error; + } + } + + return 0; + +error: + free_pvs(); + + return -1; +} + +static int epics_init(void) { return start_thread(); } + +static int epics_shutdown(void) { + stop_thread(); + free_pvs(); + + return 0; +} + +static int epics_read(void) { + int i = 0; + int ret = 0; + cdtime_t time; + meta_data_t *md = NULL; + + md = meta_data_create(); + if (md == NULL) { + ERROR("epics plugin: Cannot allocate memory for meta data"); + + return -1; + } + + pthread_mutex_lock(&epics_plugin.lock); + + time = cdtime(); + + for (i = 0; i < epics_plugin.pvs_num; ++i) { + struct pv *p = &epics_plugin.pvs[i]; + + if (!p->is_active || !p->is_label) { + continue; + } + + ret = meta_data_add_string(md, p->name, p->label.value); + if (ret != 0) { + ERROR("epics plugin: Cannot add value for meta \"%s\"", p->name); + + goto error; + } + } + + for (i = 0; i < epics_plugin.pvs_num; ++i) { + struct pv *p = &epics_plugin.pvs[i]; + + if (!p->is_active || p->is_label) { + continue; + } + + p->value.time = time; + p->value.meta = md; + + ret = plugin_dispatch_values(&p->value); + if (ret != 0) { + ERROR("epics plugin: Cannot dispatch values for \"%s\"", p->name); + + goto error; + } + } + + ret = 0; +error: + pthread_mutex_unlock(&epics_plugin.lock); + meta_data_destroy(md); + + return ret; +} + +void module_register(void) { + plugin_register_complex_config("epics", epics_config); + plugin_register_read("epics", epics_read); + plugin_register_init("epics", epics_init); + plugin_register_shutdown("epics", epics_shutdown); +} -- 2.26.2
Locations
Projects
Search
Status Monitor
Help
Open Build Service
OBS Manuals
API Documentation
OBS Portal
Reporting a Bug
Contact
Mailing List
Forums
Chat (IRC)
Twitter
Open Build Service (OBS)
is an
openSUSE project
.