diff --git a/doc/09-object-types.md b/doc/09-object-types.md
index 17583f871c3..9bd3e66a697 100644
--- a/doc/09-object-types.md
+++ b/doc/09-object-types.md
@@ -1241,6 +1241,148 @@ for an example.
 TLS for the HTTP proxy can be enabled with `enable_tls`. In addition to that you can
 specify the certificates with the `ca_path`, `cert_path` and `cert_key` attributes.
 
+### ElasticsearchDatastreamWriter
+
+Writes check result metrics and performance data to an Elasticsearch time series data stream.
+This configuration object is available as the [elasticsearch datastream feature](14-features.md#elasticsearch-datastream-writer).
+
+Example:
+
+```
+object ElasticsearchDatastreamWriter "datastreamwriter" {
+  host = "127.0.0.1"
+  port = 9200
+  datastream_namespace = "production"
+
+  enable_send_perfdata = true
+
+  host_tags_template = ["icinga-production"]
+  filter = {{ "datastream" in host.groups }}
+
+  flush_threshold = 1024
+  flush_interval = 10
+}
+```
+
+Configuration Attributes:
+
+  Name                      | Type                  | Description
+  --------------------------|-----------------------|----------------------------------
+  host                      | String                | **Required.** Elasticsearch host address. Defaults to `127.0.0.1`.
+  port                      | Number                | **Required.** Elasticsearch port. Defaults to `9200`.
+  enable\_tls               | Boolean               | **Optional.** Whether to use a TLS stream. Defaults to `false`.
+  insecure\_noverify        | Boolean               | **Optional.** Disable TLS peer verification. Defaults to `false`.
+  ca\_path                  | String                | **Optional.** Path to the CA certificate used to validate the remote host. Requires `enable_tls` set to `true`.
+  enable\_ha                | Boolean               | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
+  flush\_interval           | Duration              | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`.
+  flush\_threshold          | Number                | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`.
+
+Auth:
+
+  Name                      | Type                  | Description
+  --------------------------|-----------------------|----------------------------------
+  username                  | String                | **Optional.** Basic auth username for Elasticsearch.
+  password                  | String                | **Optional.** Basic auth password for Elasticsearch.
+  api\_token                | String                | **Optional.** API key for Elasticsearch, sent as the `ApiKey` authorization header.
+  cert\_path                | String                | **Optional.** Path to the host certificate to present to the remote host for mutual verification. Requires `enable_tls` set to `true`.
+  key\_path                 | String                | **Optional.** Path to the host key to accompany the cert\_path. Requires `enable_tls` set to `true`.
+
+Changing the behavior of the writer:
+
+  Name                      | Type                  | Description
+  --------------------------|-----------------------|----------------------------------
+  datastream\_namespace     | String                | **Required.** Suffix for the datastream names. Defaults to `default`.
+  manage\_index\_template   | Boolean               | **Optional.** Whether to create and manage the index template in Elasticsearch. This requires the user to have the `manage_index_templates` permission in Elasticsearch. Defaults to `true`.
+  enable\_send\_perfdata    | Boolean               | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
+  enable\_send\_thresholds  | Boolean               | **Optional.** Whether to send the warn and crit thresholds with each performance data metric. Defaults to `false`.
+  host\_tags\_template      | Array                 | **Optional.** Allows adding [tags](https://www.elastic.co/docs/reference/ecs/ecs-base#field-tags) to the document for a Host check result.
+  service\_tags\_template   | Array                 | **Optional.** Allows adding [tags](https://www.elastic.co/docs/reference/ecs/ecs-base#field-tags) to the document for a Service check result.
+  host\_labels\_template    | Dictionary            | **Optional.** Allows adding [labels](https://www.elastic.co/docs/reference/ecs/ecs-base#field-labels) to the document for a Host check result.
+  service\_labels\_template | Dictionary            | **Optional.** Allows adding [labels](https://www.elastic.co/docs/reference/ecs/ecs-base#field-labels) to the document for a Service check result.
+  filter                    | Function              | **Optional.** An expression to filter which check results should be sent to Elasticsearch. Defaults to sending all check results.
+
+#### Macro Usage (Tags, Labels & Namespace)
+
+Macros can be used inside the following template attributes:
+
+- `host_tags_template` (array of strings)
+- `service_tags_template` (array of strings)
+- `host_labels_template` (dictionary of key -> string value)
+- `service_labels_template` (dictionary of key -> string value)
+- `datastream_namespace` (string)
+
+Behavior:
+
+- Tags: Each array element may contain zero or more macros. If at least one macro is missing/unresolvable,
+  the entire tag element is skipped and a debug log entry is written.
+- Labels: Each dictionary value may contain macros. If at least one macro inside the value is missing, that
+  label key/value pair is skipped and a debug log entry is written.
+- Namespace: The `datastream_namespace` string may contain macros. If a macro is missing or resolves to an
+  empty value, the writer falls back to the default namespace `default`.
+- Validation: A template string with an unterminated `$` (e.g. `"$host.name"`) raises a configuration
+  validation error referencing the original string.
+- Macros are never partially substituted: either all macros in the string resolve and the rendered value
+  is used, or (for tags/labels) the entry is skipped.
+
+#### Normalization
+
+The resolved datastream namespace and the check name included in the datastream dataset name undergo
+normalization: any leading whitespace and leading special characters are trimmed, all remaining special
+(non-alphanumeric) characters are replaced with an underscore, consecutive underscores are collapsed and all
+characters are converted to lowercase. This ensures stable index names that are
+[accepted by Elasticsearch](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-create#operation-indices-create-index).
+
+Field names for the performance data are only normalized in that they cannot contain dots (`.`), as dots are
+used for field separation and would break the index template's field mapping. If any performance data label
+contains a dot, it will be replaced with an underscore.
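+
+A few illustrative examples of this normalization (hypothetical input values):
+
+```
+"Production EU"  =>  "production_eu"   (datastream namespace)
+"HTTP Check"     =>  "http_check"      (check command in the dataset name)
+"used.bytes"     =>  "used_bytes"      (performance data label)
+```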
+
+Examples:
+
+```
+object ElasticsearchDatastreamWriter "example-datastream" {
+  datastream_namespace = "$host.vars.env$" // Falls back to "default" if $host.vars.env$ is missing
+
+  host_tags_template = [
+    "env-$host.vars.env$",
+    "$host.name$"
+  ]
+
+  service_tags_template = [
+    "svc-$service.name$",
+    "$service.display_name$"
+  ]
+
+  host_labels_template = {
+    os = "$host.vars.os$"
+    fqdn = "$host.name$"
+  }
+
+  service_labels_template = {
+    check_cmd = "$service.check_command$"
+    attempted_env = "$host.vars.missing_env$" // Skipped if missing_env is not set
+  }
+
+  filter = {{ service && "production" in host.groups }}
+}
+```
+
+For a host check result only the host templates are evaluated; the service templates are ignored.
+A tag or label entry whose macros cannot be resolved (e.g. `"$host.vars.missing_env$"` when
+`missing_env` is not set) is skipped and logged at debug level.
+
+#### Filter Expression
+
+The filter accepts an expression (function literal). Only the variables `host` and `service` are
+available inside the expression; `service` is `null` for host check results.
+
+Examples:
+
+```
+filter = {{ "production" in host.groups }}
+filter = {{ service && "linux" in host.groups }}
+```
+
+If the filter returns `true`, the check result is sent; otherwise it is skipped.
+
 ### ExternalCommandListener
 
 Implements the Icinga 1.x command pipe which can be used to send commands to Icinga.
diff --git a/doc/14-features.md b/doc/14-features.md
index 5aa775c4251..ef8a4652d6d 100644
--- a/doc/14-features.md
+++ b/doc/14-features.md
@@ -439,6 +439,130 @@ The recommended way of running Elasticsearch in this scenario is a dedicated server
 where you either have the Elasticsearch HTTP API, or a TLS secured HTTP proxy,
 or Logstash for additional filtering.
 
+
+#### Elasticsearch Datastream Writer
+
+> **Note**
+>
+> This is a newer alternative to the Elasticsearch Writer above. The Elasticsearch Datastream Writer uses
+> Elasticsearch's data stream feature and follows the Elastic Common Schema (ECS), providing better performance
+> and data organization. Use this writer for new installations. The original Elasticsearch Writer is still
+> available for backward compatibility.
+>
+> OpenSearch: The data stream mode and ECS component template usage differ slightly in OpenSearch. The
+> ElasticsearchDatastreamWriter focuses on Elasticsearch compatibility first. OpenSearch can ingest the data,
+> but you may need to adapt the installed index/component templates manually (e.g. remove the time_series mode
+> if it is unsupported and adjust the mappings). The option `manage_index_template` will not work with OpenSearch.
+
+This feature sends check results with performance data to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) instance or cluster.
+
+> **Note**
+>
+> This feature requires Elasticsearch to support time series data streams (Elasticsearch 8.x+) and to have the ECS
+> component template installed. It was tested successfully with Elasticsearch 8.12 and 9.0.8.
+
+Enable the feature and restart Icinga 2.
+
+```bash
+icinga2 feature enable elasticsearchdatastream
+```
+
+The default configuration expects an Elasticsearch instance running on `localhost` on port `9200`
+and writes to datastreams with the pattern `metrics-icinga2.<check_command>-<datastream_namespace>`.
+
+More configuration details can be found [here](09-object-types.md#objecttype-elasticsearchdatastreamwriter).
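+
+Once the first check results have been flushed, you can verify that the data streams were created.
+A quick sanity check (assuming the default `localhost:9200` setup):
+
+```bash
+curl -s 'http://localhost:9200/_data_stream/metrics-icinga2.*'
+```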
+
+#### Current Elasticsearch Schema
+
+The documents for the ElasticsearchDatastreamWriter try to follow the [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html)
+version `8.0` as closely as possible, with some additional changes to fit the Icinga 2 data model.
+All documents are written to a data stream of the format `metrics-icinga2.<check_command>-<datastream_namespace>`,
+where `<check_command>` is the name of the check command being executed. This keeps the number of fields per index low
+and groups documents with the same performance data together. `<datastream_namespace>` is an optional
+configuration parameter to further separate documents, e.g. by environment like `production` or `development`.
+The `datastream_namespace` can also be used to separate documents e.g. by hostgroups or zones: use the
+`filter` function to select the check results and configure several writers with different namespaces.
+Time-series dimensions are applied to `host.name` and (when present) `service.name`, aligning with the ECS host and service
+definitions: [ECS host fields](https://www.elastic.co/guide/en/ecs/current/ecs-host.html),
+[ECS service fields](https://www.elastic.co/guide/en/ecs/current/ecs-service.html).
+
+Icinga 2 automatically adds the following threshold metrics if present (`warn` and `crit` additionally
+require `enable_send_thresholds` to be set to `true`):
+
+```
+perfdata.<label>.min
+perfdata.<label>.max
+perfdata.<label>.warn
+perfdata.<label>.crit
+```
+
+#### Adding additional tags and labels
+
+Additionally, it is possible to configure custom tags and labels that are applied to the metrics via
+`host_tags_template`/`service_tags_template` and `host_labels_template`/`service_labels_template`
+respectively. Depending on whether the write event was triggered by a service or a host object, the
+corresponding tags and labels are added to the Elasticsearch documents.
+
+A host metrics entry configured with the following templates:
+
+```
+host_tags_template = ["production", "$host.groups$"]
+host_labels_template = {
+  os = "$host.vars.os$"
+}
+```
+
+will, in addition to the fields mentioned above, also contain:
+
+```
+"tags": ["production", "linux-servers;group-A"],
+"labels": { "os": "Linux" }
+```
+
+#### Filtering check results
+
+You can filter which check results are sent to Elasticsearch by using the `filter` parameter.
+It takes a function (expression) that is evaluated for every check result and must return a boolean.
+If the function returns `true`, the check result is sent; otherwise it is skipped.
+
+Only the variables `host` and `service` are available inside this expression.
+For host check results `service` is not set (null/undefined). No other variables (such as
+the raw check result object) are exposed.
+
+Example configuration that only sends service check results for hosts in the `linux-server` hostgroup:
+
+```
+object ElasticsearchDatastreamWriter "elasticsearchdatastream" {
+  ...
+  datastream_namespace = "production"
+  filter = {{ service && "linux-server" in host.groups }}
+}
+```
+
+#### Elasticsearch Datastream Writer in Cluster HA Zones
+
+The Elasticsearch Datastream Writer feature supports [high availability](06-distributed-monitoring.md#distributed-monitoring-high-availability-features)
+in cluster zones.
+
+By default, all endpoints in a zone will activate the feature and start
+writing events to the Elasticsearch HTTP API. In HA enabled scenarios,
+it is possible to set `enable_ha = true` in all feature configuration
+files. This allows each endpoint to calculate the feature authority,
+and only one endpoint actively writes events while the other endpoints
+pause the feature.
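+
+For example, a minimal sketch enabling HA for this feature (the object name is illustrative):
+
+```
+object ElasticsearchDatastreamWriter "elasticsearchdatastream" {
+  ...
+  enable_ha = true
+}
+```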
+
+When the cluster connection breaks at some point, the remaining endpoint(s)
+in that zone will automatically resume the feature. This built-in failover
+mechanism ensures that events are written even if the cluster fails.
+
+The recommended way of running Elasticsearch in this scenario is a dedicated server
+where you either have the Elasticsearch HTTP API, or a TLS secured HTTP proxy,
+or Logstash for additional filtering.
+
+
 ### Graylog Integration
 
 #### GELF Writer
diff --git a/etc/icinga2/features-available/elasticsearchdatastream.conf b/etc/icinga2/features-available/elasticsearchdatastream.conf
new file mode 100644
index 00000000000..668507af31a
--- /dev/null
+++ b/etc/icinga2/features-available/elasticsearchdatastream.conf
@@ -0,0 +1,82 @@
+/*
+ * The ElasticsearchDatastreamWriter feature writes Icinga 2 events to an Elasticsearch datastream.
+ * This feature requires Elasticsearch 8.12 or later.
+ */
+
+object ElasticsearchDatastreamWriter "elasticsearch" {
+  host = "127.0.0.1"
+  port = 9200
+
+  /* To enable an HTTPS connection, set enable_tls to true. */
+  // enable_tls = false
+
+  /* The datastream namespace to use. This can be used to separate different
+   * Icinga instances or let multiple writers write to different
+   * datastreams in the same Elasticsearch cluster by using the filter option.
+   * The Elasticsearch datastream name will be
+   * "metrics-icinga2.{check}-{datastream_namespace}".
+   */
+  // datastream_namespace = "default"
+
+  /* You can authenticate Icinga 2 using three different methods:
+   * 1. Basic authentication with username and password.
+   * 2. API key authentication with api_token.
+   * 3. Client certificate authentication with cert_path and key_path.
+   */
+  // username = "icinga2"
+  // password = "changeme"
+
+  // api_token = ""
+
+  // cert_path = "/path/to/cert.pem"
+  // key_path = "/path/to/key.pem"
+  // ca_path = "/path/to/ca.pem"
+
+  /* Enable sending the threshold values as additional fields
+   * with the service check metrics. If set to true, it will
+   * send warn and crit for every performance data item.
+   */
+  // enable_send_thresholds = false
+
+  /* The flush settings control how often data is sent to Elasticsearch.
+   * You can either flush based on a time interval or the number of
+   * events in the buffer. Whichever comes first will trigger a flush.
+   */
+  // flush_threshold = 1024
+  // flush_interval = 10s
+
+  /* By default, all endpoints in a zone will activate the feature and start
+   * writing events to the Elasticsearch HTTP API. In HA enabled scenarios,
+   * it is possible to set `enable_ha = true` in all feature configuration
+   * files. This allows each endpoint to calculate the feature authority,
+   * and only one endpoint actively writes events while the other endpoints
+   * pause the feature.
+   */
+  // enable_ha = false
+
+  /* By default, the feature will create an index template in Elasticsearch
+   * for the datastreams. If you want to manage the index template yourself,
+   * set manage_index_template to false.
+   */
+  // manage_index_template = true
+
+  /* Additional tags and labels can be added to the host and service
+   * documents by using the host_tags_template, service_tags_template,
+   * host_labels_template and service_labels_template options.
+   * The templates may contain runtime macros and are added to every document.
+   */
+  // host_tags_template = [ "icinga", "$host.vars.os$" ]
+  // service_tags_template = [ "icinga", "$service.vars.id$" ]
+  // host_labels_template = { "env" = "production", "os" = "$host.vars.os$" }
+  // service_labels_template = { "env" = "production", "id" = "$host.vars.id$" }
+
+  /* The filter option can be used to filter which events are sent to
+   * Elasticsearch. The filter is a regular Icinga 2 filter expression.
+   * The filter is applied to both host and service events.
+   * If the filter evaluates to true, the event is sent to Elasticsearch.
+   * If the filter is not set, all events are sent to Elasticsearch.
+   * Only the host and service variables are available inside the filter
+   * expression; service is null for host check results.
+   */
+  // filter = {{ host.name == "myhost" || (service && service.name == "myservice") }}
+}
diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt
index 168938c4206..9ad8b4df97f 100644
--- a/lib/perfdata/CMakeLists.txt
+++ b/lib/perfdata/CMakeLists.txt
@@ -6,6 +6,7 @@ mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommonwriter-ti.hpp)
 mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp)
 mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp)
 mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp)
+mkclass_target(elasticsearchdatastreamwriter.ti elasticsearchdatastreamwriter-ti.cpp elasticsearchdatastreamwriter-ti.hpp)
 mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp)
 mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp)
 
@@ -18,6 +19,7 @@ set(perfdata_SOURCES
   influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
   opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
   perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
+  elasticsearchdatastreamwriter.cpp elasticsearchdatastreamwriter.hpp elasticsearchdatastreamwriter-ti.hpp
 )
 
 if(ICINGA2_UNITY_BUILD)
@@ -58,6 +60,15 @@ install_if_not_exists(
   ${ICINGA2_CONFIGDIR}/features-available
 )
 
+install_if_not_exists(
+  ${PROJECT_SOURCE_DIR}/usr/elasticsearch/index-template.json
+  ${ICINGA2_PKGDATADIR}/elasticsearch
+)
+install_if_not_exists(
+  ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearchdatastream.conf
+  ${ICINGA2_CONFIGDIR}/features-available
+)
+
 install_if_not_exists(
   ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
   ${ICINGA2_CONFIGDIR}/features-available
diff --git a/lib/perfdata/elasticsearchdatastreamwriter.cpp b/lib/perfdata/elasticsearchdatastreamwriter.cpp
new file mode 100644
index 00000000000..946adc3c183
--- /dev/null
+++ b/lib/perfdata/elasticsearchdatastreamwriter.cpp
@@ -0,0 +1,897 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include <atomic>
+#include <cctype>
+#include <cstdint>
+#include <fstream>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <arpa/inet.h>
+
+#include <boost/algorithm/string.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http.hpp>
+#include <boost/exception_ptr.hpp>
+
+#include "base/application.hpp"
+#include "base/base64.hpp"
+#include "base/defer.hpp"
+#include "base/dictionary.hpp"
+#include "base/exception.hpp"
+#include "base/io-engine.hpp"
+#include "base/json.hpp"
+#include "base/logger.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/statsfunction.hpp"
+#include "base/stream.hpp"
+#include "base/string.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/tlsstream.hpp"
+#include "base/utility.hpp"
+#include "icinga/compatutility.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/service.hpp"
+#include "remote/url.hpp"
+
+#include "perfdata/elasticsearchdatastreamwriter.hpp"
+#include "perfdata/elasticsearchdatastreamwriter-ti.cpp"
+
+using namespace icinga;
+
+namespace beast = boost::beast;
+namespace http = beast::http;
+
+static void ExceptionHandler(const boost::exception_ptr& exp);
+static Array::Ptr ExtractTemplateTags(const MacroProcessor::ResolverList& resolvers, const Array::Ptr& tagsTmpl,
+    const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+static Dictionary::Ptr ExtractTemplateLabels(const MacroProcessor::ResolverList& resolvers, const Dictionary::Ptr& labelsTmpl,
+    const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+static String NormalizeElasticsearchFieldName(const String& fieldName);
+static String NormalizeElasticsearchIndexPart(const String& fieldName);
+
+REGISTER_TYPE(ElasticsearchDatastreamWriter);
+
+REGISTER_STATSFUNCTION(ElasticsearchDatastreamWriter, &ElasticsearchDatastreamWriter::StatsFunc);
+
+void ElasticsearchDatastreamWriter::OnConfigLoaded()
+{
+    ObjectImpl<ElasticsearchDatastreamWriter>::OnConfigLoaded();
+
+    m_WorkQueue.SetName("ElasticsearchDatastreamWriter, " + GetName());
+
+    if (!GetEnableHa()) {
+        Log(LogDebug, "ElasticsearchDatastreamWriter")
+            << "HA functionality disabled. Won't pause connection: " << GetName();
+
+        SetHAMode(HARunEverywhere);
+    } else {
+        SetHAMode(HARunOnce);
+    }
+}
+
+void ElasticsearchDatastreamWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
+{
+    for (const ElasticsearchDatastreamWriter::Ptr& writer : ConfigType::GetObjectsByType<ElasticsearchDatastreamWriter>()) {
+        status->Set(
+            writer->GetName(),
+            new Dictionary({
+                { "work_queue_items", writer->m_WorkQueue.GetLength() },
+                { "work_queue_item_rate", writer->m_WorkQueue.GetTaskCount(60) / 60.0 },
+                { "documents_sent", writer->m_DocumentsSent },
+                { "documents_sent_error", writer->m_DocumentsFailed },
+                { "checkresults_filtered_out", writer->m_ItemsFilteredOut.load() }
+            })
+        );
+    }
+}
+
+void ElasticsearchDatastreamWriter::Resume()
+{
+    ObjectImpl<ElasticsearchDatastreamWriter>::Resume();
+
+    Log(LogInformation, "ElasticsearchDatastreamWriter")
+        << "'" << GetName() << "' resumed.";
+
+    m_Paused = false;
+    m_WorkQueue.SetExceptionCallback([](boost::exception_ptr exp) { ExceptionHandler(exp); });
+
+    if (GetManageIndexTemplate()) {
+        // Ensure the index template exists/is updated as the first item on the work queue.
+        m_WorkQueue.Enqueue([this]() { ManageIndexTemplate(); });
+    }
+
+    /* Set up the timer for periodically flushing m_DataBuffer. */
+    m_FlushTimer = Timer::Create();
+    m_FlushTimer->SetInterval(GetFlushInterval());
+    m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) {
+        m_WorkQueue.Enqueue([this]() { Flush(); });
+    });
+    m_FlushTimer->Start();
+
+    /* Register for new metrics. */
+    m_HandleCheckResults = Checkable::OnNewCheckResult.connect(
+        [this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+            CheckResultHandler(checkable, cr);
+        }
+    );
+}
+
+/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+void ElasticsearchDatastreamWriter::Pause()
+{
+    m_HandleCheckResults.disconnect();
+    m_FlushTimer->Stop(true);
+    m_Paused = true;
+    m_WorkQueue.Enqueue([this]() { Flush(); });
+    m_WorkQueue.Join();
+
+    Log(LogInformation, "ElasticsearchDatastreamWriter")
+        << "'" << GetName() << "' paused.";
+
+    ObjectImpl<ElasticsearchDatastreamWriter>::Pause();
+}
+
+void ElasticsearchDatastreamWriter::ManageIndexTemplate()
+{
+    AssertOnWorkQueue();
+
+    String templatePath = ICINGA_PKGDATADIR "/elasticsearch/index-template.json";
+    std::ifstream templateFile{ templatePath, std::ifstream::in };
+
+    if (!templateFile.is_open()) {
+        Log(LogCritical, "ElasticsearchDatastreamWriter")
+            << "Could not open index template file: " << templatePath;
+        return;
+    }
+
+    std::ostringstream templateStream;
+    templateStream << templateFile.rdbuf();
+    String templateJson = templateStream.str();
+    templateFile.close();
+
+    Log(LogDebug, "ElasticsearchDatastreamWriter")
+        << "Read index template from " << templatePath;
+    Log(LogDebug, "ElasticsearchDatastreamWriter") << templateJson;
+
+    Url::Ptr indexTemplateUrl = new Url();
+    indexTemplateUrl->SetScheme(GetEnableTls() ? "https" : "http");
+    indexTemplateUrl->SetHost(GetHost());
+    indexTemplateUrl->SetPort(GetPort());
+    indexTemplateUrl->SetPath({ "_index_template", "icinga2-metrics" });
+
+    Url::Ptr componentTemplateUrl = new Url();
+    componentTemplateUrl->SetScheme(GetEnableTls() ? "https" : "http");
+    componentTemplateUrl->SetHost(GetHost());
+    componentTemplateUrl->SetPort(GetPort());
+    componentTemplateUrl->SetPath({ "_component_template", "metrics-icinga2@custom" });
+    componentTemplateUrl->SetQuery({ {"create", "true"} });
+
+    while (true) {
+        if (m_Paused) {
+            Log(LogInformation, "ElasticsearchDatastreamWriter")
+                << "Shutdown in progress, aborting index template management.";
+            return;
+        }
+
+        try {
+            TrySend(componentTemplateUrl, "{\"template\":{}}");
+            Log(LogInformation, "ElasticsearchDatastreamWriter")
+                << "Successfully installed component template 'metrics-icinga2@custom'.";
+        } catch (const StatusCodeException& es) {
+            if (es.GetStatusCode() == http::status::bad_request) {
+                Log(LogInformation, "ElasticsearchDatastreamWriter")
+                    << "Component template 'metrics-icinga2@custom' already exists, skipping creation.";
+                // Continue to install/update the index template.
+            } else {
+                Log(LogWarning, "ElasticsearchDatastreamWriter")
+                    << "Failed to install component template 'metrics-icinga2@custom', retrying in 5 seconds: " << DiagnosticInformation(es, false);
+                Log(LogDebug, "ElasticsearchDatastreamWriter")
+                    << "Additional information:\n" << DiagnosticInformation(es, true);
+                Utility::Sleep(5);
+                continue;
+            }
+        } catch (const std::exception& ex) {
+            Log(LogWarning, "ElasticsearchDatastreamWriter")
+                << "Failed to install component template 'metrics-icinga2@custom', retrying in 5 seconds: " << DiagnosticInformation(ex, false);
+            Log(LogDebug, "ElasticsearchDatastreamWriter")
+                << "Additional information:\n" << DiagnosticInformation(ex, true);
+            Utility::Sleep(5);
+            continue;
+        }
+
+        try {
+            // Don't move templateJson here, as we retry in a loop if sending fails.
+            Dictionary::Ptr jsonResponse = TrySend(indexTemplateUrl, templateJson);
+            Log(LogInformation, "ElasticsearchDatastreamWriter")
+                << "Successfully installed/updated index template 'icinga2-metrics'.";
+            Log(LogDebug, "ElasticsearchDatastreamWriter")
+                << "Response: " << JsonEncode(jsonResponse);
+            break;
+        } catch (const std::exception& ex) {
+            Log(LogWarning, "ElasticsearchDatastreamWriter")
+                << "Failed to install/update index template 'icinga2-metrics', retrying in 5 seconds: " << DiagnosticInformation(ex, false);
+            Log(LogDebug, "ElasticsearchDatastreamWriter")
+                << "Additional information:\n" << DiagnosticInformation(ex, true);
+            Utility::Sleep(5);
+        }
+    }
+}
+
+Dictionary::Ptr ElasticsearchDatastreamWriter::ExtractPerfData(const Checkable::Ptr& checkable, const Array::Ptr& perfdata)
+{
+    Dictionary::Ptr pdFields = new Dictionary();
+
+    if (!perfdata)
+        return pdFields;
+
+    ObjectLock olock(perfdata);
+
+    for (const Value& val : perfdata) {
+        PerfdataValue::Ptr pdv;
+
+        if (val.IsObjectType<PerfdataValue>())
+            pdv = val;
+        else {
+            try {
+                pdv = PerfdataValue::Parse(val);
+            } catch (const std::exception&) {
+                Log(LogWarning, "ElasticsearchDatastreamWriter")
+                    << "Ignoring invalid perfdata for checkable '"
+                    << checkable->GetName() << "' with value: " << val;
+                continue;
+            }
+        }
+
+        Dictionary::Ptr metric = new Dictionary();
+        metric->Set("value", pdv->GetValue());
+
+        if (pdv->GetCounter())
+            metric->Set("counter", pdv->GetCounter());
+
+        if (!pdv->GetMin().IsEmpty())
+            metric->Set("min", pdv->GetMin());
+        if (!pdv->GetMax().IsEmpty())
+            metric->Set("max", pdv->GetMax());
+        if (!pdv->GetUnit().IsEmpty())
+            metric->Set("unit", pdv->GetUnit());
+
+        if (!pdv->GetWarn().IsEmpty() && GetEnableSendThresholds())
+            metric->Set("warn", pdv->GetWarn());
+        if (!pdv->GetCrit().IsEmpty() && GetEnableSendThresholds())
+            metric->Set("crit", pdv->GetCrit());
+
+        String label = NormalizeElasticsearchFieldName(pdv->GetLabel());
+        pdFields->Set(label, metric);
+    }
+
+    return pdFields;
+}
+
+// User-defined tags, as specified in the ECS specification: https://www.elastic.co/docs/reference/ecs/ecs-base#field-tags
+static Array::Ptr ExtractTemplateTags(const MacroProcessor::ResolverList& resolvers,
+    const Array::Ptr& tagsTmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+    if (!tagsTmpl)
+        return nullptr;
+
+    ObjectLock olock(tagsTmpl);
+    Array::Ptr tags = new Array();
+
+    for (const Value& tag : tagsTmpl) {
+        String missingMacro;
+        Value value = MacroProcessor::ResolveMacros(tag, resolvers, cr, &missingMacro);
+
+        if (!missingMacro.IsEmpty()) {
+            Log(LogDebug, "ElasticsearchDatastreamWriter")
+                << "Missing macro for tag: " << tag
+                << ". Missing: " << missingMacro
+                << " for checkable '" << checkable->GetName() << "'. Skipping.";
Skipping."; + continue; + } + tags->Add(value); + } + + return tags; +} + +// user-defined labels, as specified in the ECS specification: https://www.elastic.co/docs/reference/ecs/ecs-base#field-labels +static Dictionary::Ptr ExtractTemplateLabels(const MacroProcessor::ResolverList& resolvers, + const Dictionary::Ptr& labelsTmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (labelsTmpl == nullptr) { + return nullptr; + } + + ObjectLock olock(labelsTmpl); + Dictionary::Ptr labels = new Dictionary(); + for (const Dictionary::Pair& labelKv : labelsTmpl) { + String missingMacro; + Value value = MacroProcessor::ResolveMacros(labelKv.second, resolvers, cr, &missingMacro); + if (!missingMacro.IsEmpty()) { + Log(LogDebug, "ElasticsearchDatastreamWriter") + << "Missing macro for label: " << labelKv.first + << "Label: " << labelKv.second + << ". Missing: " << missingMacro + << " for checkable '" << checkable->GetName() << "'. Skipping."; + continue; + } + labels->Set(labelKv.first, value); + } + + return labels; +} + +String ElasticsearchDatastreamWriter::ExtractDatastreamNamespace(const MacroProcessor::ResolverList& resolvers, + const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + String namespaceTmpl = GetDatastreamNamespace(); + + String missingMacro; + Value value = MacroProcessor::ResolveMacros(namespaceTmpl, resolvers, cr, &missingMacro); + if (!missingMacro.IsEmpty() || value.IsEmpty()) { + Log(LogDebug, "ElasticsearchDatastreamWriter") + << "Missing value for namespace. Missing: " << missingMacro + << " for checkable '" << checkable->GetName() << "'. Skipping."; + return "default"; + } + + return NormalizeElasticsearchIndexPart(value); +} + +bool ElasticsearchDatastreamWriter::Filter(const Checkable::Ptr& checkable) +{ + if (!m_CompiledFilter) + return true; + + auto [host, service] = GetHostService(checkable); + + Namespace::Ptr frameNS = new Namespace(); + frameNS->Set("host", host); + frameNS->Set("service", service); + + ScriptFrame frame(true, frameNS); + return Convert::ToBool(m_CompiledFilter->Evaluate(frame)); +} + +void ElasticsearchDatastreamWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr) +{ + if (IsPaused()) + return; + + if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata()) + return; + + if (!Filter(checkable)) { + m_ItemsFilteredOut.fetch_add(1); + Log(LogDebug, "ElasticsearchDatastreamWriter") + << "Check result for checkable '" << checkable->GetName() << "' filtered out."; + return; + } + + auto [host, service] = GetHostService(checkable); + + /* ECS host object population + * References: + * https://www.elastic.co/guide/en/ecs/current/ecs-host.html + * The classic ECS host fields (name, hostname) are extended here with + * Icinga-specific state information (soft_state, hard_state) to aid consumers + * correlating monitoring state with metrics. + */ + Dictionary::Ptr ecsHost = new Dictionary({ + {"name", host->GetDisplayName()}, + {"hostname", host->GetName()}, + {"soft_state", host->GetState()}, + {"hard_state", host->GetLastHardState()} + }); + if (!host->GetZoneName().IsEmpty()) ecsHost->Set("zone", host->GetZoneName()); + + Dictionary::Ptr ecsService; + if (service) { + /* ECS service object population + * References: + * https://www.elastic.co/guide/en/ecs/current/ecs-service.html + * We include ECS service 'name' plus Icinga display/state details. + * The added state fields (soft_state, hard_state) are Icinga-centric + * extensions. 
+        ecsService = new Dictionary({
+            { "name", service->GetName() },
+            { "display_name", service->GetDisplayName() },
+            { "soft_state", service->GetState() },
+            { "hard_state", service->GetLastHardState() }
+        });
+
+        if (!service->GetZoneName().IsEmpty())
+            ecsService->Set("zone", service->GetZoneName());
+    }
+
+    Dictionary::Ptr checkResult = new Dictionary({
+        { "current_check_attempts", checkable->GetCheckAttempt() },
+        { "reachable", checkable->IsReachable() }
+    });
+
+    m_WorkQueue.Enqueue([this, cr, checkable, host = std::move(host), service = std::move(service), ecsHost = std::move(ecsHost),
+        ecsService = std::move(ecsService), checkResult = std::move(checkResult)]()
+    {
+        MacroProcessor::ResolverList resolvers;
+        resolvers.emplace_back("host", host);
+
+        if (service)
+            resolvers.emplace_back("service", service);
+
+        Dictionary::Ptr ecsMetadata = new Dictionary({
+            { "version", "8.0.0" }
+        });
+
+        String datastreamNamespace = ExtractDatastreamNamespace(resolvers, checkable, cr);
+        String datastreamDataset = "icinga2." + NormalizeElasticsearchIndexPart(checkable->GetCheckCommandRaw());
+        String datastreamName = "metrics-" + datastreamDataset + "-" + datastreamNamespace;
+
+        Dictionary::Ptr dataStream = new Dictionary({
+            { "type", "metrics" },
+            { "dataset", datastreamDataset },
+            { "namespace", datastreamNamespace }
+        });
+
+        char addrBuf[16];
+
+        if (!host->GetAddress().IsEmpty() && inet_pton(AF_INET, host->GetAddress().CStr(), addrBuf) == 1) {
+            ecsHost->Set("ip", host->GetAddress());
+        } else if (!host->GetAddress6().IsEmpty() && inet_pton(AF_INET6, host->GetAddress6().CStr(), addrBuf) == 1) {
+            ecsHost->Set("ip", host->GetAddress6());
+        } else if (!host->GetAddress().IsEmpty()) {
+            ecsHost->Set("fqdn", host->GetAddress());
+        }
+
+        Dictionary::Ptr ecsAgent = new Dictionary({
+            { "type", "icinga2" },
+            { "name", cr->GetCheckSource() }
+        });
+
+        Endpoint::Ptr endpoint = checkable->GetCommandEndpoint();
+
+        if (endpoint) {
+            ecsAgent->Set("id", endpoint->GetName());
+            ecsAgent->Set("version", endpoint->GetIcingaVersionString());
+        } else {
+            ecsAgent->Set("id", IcingaApplication::GetInstance()->GetName());
+            ecsAgent->Set("version", IcingaApplication::GetAppSpecVersion());
+        }
+
+        Dictionary::Ptr ecsEvent = new Dictionary({
+            { "created", FormatTimestamp(cr->GetScheduleEnd()) },
+            { "start", FormatTimestamp(cr->GetExecutionStart()) },
+            { "end", FormatTimestamp(cr->GetExecutionEnd()) },
+            { "kind", "metric" },
+            { "module", "icinga2" }
+        });
+
+        checkResult->Set("exit_status", cr->GetExitStatus());
+        checkResult->Set("execution_time", cr->CalculateExecutionTime());
+        checkResult->Set("latency", cr->CalculateLatency());
+        checkResult->Set("schedule_start", FormatTimestamp(cr->GetScheduleStart()));
+        checkResult->Set("schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
+        checkResult->Set("active", cr->GetActive());
+        checkResult->Set("max_attempts", checkable->GetMaxCheckAttempts());
+
+        Dictionary::Ptr document = new Dictionary({
+            { "@timestamp", FormatTimestamp(cr->GetExecutionEnd()) },
+            { "ecs", ecsMetadata },
+            { "data_stream", dataStream },
+            { "host", ecsHost },
+            { "agent", ecsAgent },
+            { "event", ecsEvent },
+            { "check", checkResult },
+            { "message", cr->GetOutput() },
+        });
+
+        Dictionary::Ptr perfdata = ExtractPerfData(checkable, cr->GetPerformanceData());
+
+        if (perfdata->GetLength() != 0)
+            document->Set("perfdata", perfdata);
+
+        Array::Ptr ecsTags = ExtractTemplateTags(resolvers,
+            service ? GetServiceTagsTemplate() : GetHostTagsTemplate(), checkable, cr);
+
+        if (ecsTags && ecsTags->GetLength() != 0)
+            document->Set("tags", ecsTags);
+
+        Dictionary::Ptr ecsLabels = ExtractTemplateLabels(resolvers,
+            service ? GetServiceLabelsTemplate() : GetHostLabelsTemplate(), checkable, cr);
+
+        if (ecsLabels && ecsLabels->GetLength() != 0)
+            document->Set("labels", ecsLabels);
+
+        if (ecsService && ecsService->GetLength() != 0)
+            document->Set("service", ecsService);
+
+        m_DataBuffer.emplace_back(new EcsDocument(datastreamName, document));
+
+        if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold())
+            Flush();
+    });
+}
+
+void ElasticsearchDatastreamWriter::Flush()
+{
+    AssertOnWorkQueue();
+
+    /* Flush can be called from 1) the flush timer, 2) the flush threshold and 3) shutdown/reload. */
+    if (m_DataBuffer.empty())
+        return;
+
+    Url::Ptr url = new Url();
+    url->SetScheme(GetEnableTls() ? "https" : "http");
+    url->SetHost(GetHost());
+    url->SetPort(GetPort());
+    url->SetPath({ "_bulk" });
+
+    String body;
+
+    for (const auto& document : m_DataBuffer) {
+        Dictionary::Ptr index = new Dictionary({
+            { "create", new Dictionary({ { "_index", document->GetIndex() } }) }
+        });
+
+        body += JsonEncode(index) + "\n";
+        body += JsonEncode(document->GetDocument()) + "\n";
+    }
+
+    Dictionary::Ptr jsonResponse;
+
+    while (true) {
+        try {
+            // Don't move the body here: we retry in a loop and need it intact if TrySend() throws.
+            jsonResponse = TrySend(url, body);
+            m_DocumentsSent += m_DataBuffer.size();
+            break;
+        } catch (const std::exception& ex) {
+            if (m_Paused) {
+                // We are shutting down, don't retry.
+                Log(LogWarning, "ElasticsearchDatastreamWriter")
+                    << "Flush failed during shutdown, dropping " << m_DataBuffer.size() << " documents: " << DiagnosticInformation(ex, false);
+                m_DataBuffer.clear();
+                return;
+            }
+
+            Log(LogWarning, "ElasticsearchDatastreamWriter")
+                << "Flush failed, retrying in 5 seconds: " << DiagnosticInformation(ex, false);
+            Log(LogDebug, "ElasticsearchDatastreamWriter")
+                << "Additional information:\n" << DiagnosticInformation(ex, true);
+            Utility::Sleep(5);
+        }
+    }
+
+    Log(LogInformation, "ElasticsearchDatastreamWriter")
+        << "Successfully sent " << m_DataBuffer.size()
+        << " documents to Elasticsearch. Took "
+        << jsonResponse->Get("took") << "ms.";
+
+    Value errors = jsonResponse->Get("errors");
+
+    if (!errors.ToBool()) {
+        Log(LogDebug, "ElasticsearchDatastreamWriter") << "No errors during write operation.";
+        m_DataBuffer.clear();
+        return;
+    }
+
+    Array::Ptr items = jsonResponse->Get("items");
+    std::size_t c = 0;
+    ObjectLock olock(items);
+
+    for (const Dictionary::Ptr value : items) {
+        Dictionary::Ptr itemResult = value->Get("create");
+        int itemResultStatus = itemResult->Get("status");
+
+        if (itemResultStatus > 299) {
+            m_DocumentsFailed += 1;
+            Dictionary::Ptr itemError = itemResult->Get("error");
+            String errorMessage = itemError->Get("type") + ": " + itemError->Get("reason");
+            Log(LogWarning, "ElasticsearchDatastreamWriter")
+                << "Error during document creation: " << errorMessage;
+
+            if (c >= m_DataBuffer.size()) {
+                Log(LogCritical, "ElasticsearchDatastreamWriter")
+                    << "Got an error response from Elasticsearch outside the sent messages";
+                break;
+            }
+
+            Log(LogDebug, "ElasticsearchDatastreamWriter")
+                << "Error response: " << JsonEncode(itemResult) << "\n"
+                << "Document: " << JsonEncode(m_DataBuffer[c]->GetDocument());
+        }
+
+        ++c;
+    }
+
+    m_DataBuffer.clear();
+}
+
+Value ElasticsearchDatastreamWriter::TrySend(const Url::Ptr& url, String body)
+{
+    // Make sure we are not randomly flushing from a timeout under load.
+    m_FlushTimer->Reschedule(-1);
+
+    http::request<http::string_body> request(http::verb::post, std::string(url->Format(true)), 10);
+
+    request.set(http::field::user_agent, "Icinga/" + Application::GetAppSpecVersion());
+    request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+
+    /* Specify the headers required by Elasticsearch. */
+    request.set(http::field::accept, "application/json");
+
+    /* Use application/x-ndjson for bulk streams. While ES
+     * is able to handle application/json, the newline separator
+     * causes problems with Logstash (#6609).
+     */
+    request.set(http::field::content_type, "application/x-ndjson");
+
+    /* Send authentication if configured. */
+    String username = GetUsername();
+    String password = GetPassword();
+    String apiToken = GetApiToken();
+
+    if (!username.IsEmpty() && !password.IsEmpty()) {
+        request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
+    } else if (!apiToken.IsEmpty()) {
+        request.set(http::field::authorization, "ApiKey " + apiToken);
+    }
+
+    request.body() = std::move(body);
+    request.content_length(request.body().size());
+
+    Log(LogDebug, "ElasticsearchDatastreamWriter")
+        << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "")
+        << " to '" << url->Format() << "'.";
+
+    http::parser<false, http::string_body> parser;
+    beast::flat_buffer buf;
+
+    OptionalTlsStream stream = Connect();
+
+    Defer closeStream([&stream]() {
+        if (stream.first) {
+            stream.first->lowest_layer().close();
+        } else if (stream.second) {
+            stream.second->lowest_layer().close();
+        }
+    });
+
+    try {
+        if (stream.first) {
+            http::write(*stream.first, request);
+            stream.first->flush();
+            http::read(*stream.first, buf, parser);
+        } else {
+            http::write(*stream.second, request);
+            stream.second->flush();
+            http::read(*stream.second, buf, parser);
+        }
+    } catch (const std::exception&) {
+        Log(LogWarning, "ElasticsearchDatastreamWriter")
+            << "Cannot perform HTTP request on host '" << GetHost() << "' port '" << GetPort() << "'.";
+        throw;
+    }
+
+    auto& response (parser.get());
+
+    if (response.result_int() > 299) {
+        Log(LogDebug, "ElasticsearchDatastreamWriter")
+            << "Unexpected response code " << response.result_int() << " from URL '" << url->Format() << "'. Error: " << response.body();
+
+        BOOST_THROW_EXCEPTION(StatusCodeException(
+            response.result(),
+            "Unexpected response code from Elasticsearch",
+            response.body()
+        ));
+    }
+
+    auto& contentType (response[http::field::content_type]);
+
+    /* Accept application/json, optionally with the UTF-8 charset parameter, case-insensitively. */
+    std::string ctLower = std::string(contentType.data(), contentType.size());
+    boost::trim(ctLower);
+    boost::to_lower(ctLower);
+
+    if (!(ctLower == "application/json" || ctLower == "application/json; charset=utf-8")) {
+        Log(LogCritical, "ElasticsearchDatastreamWriter") << "Unexpected Content-Type: '" << ctLower << "'";
+        BOOST_THROW_EXCEPTION(std::runtime_error("Unexpected Content-Type '" + ctLower + "'"));
+    }
+
+    auto& responseBody (response.body());
+
+    try {
+        return JsonDecode(responseBody);
+    } catch (...) {
+        Log(LogWarning, "ElasticsearchDatastreamWriter")
+            << "Unable to parse JSON response:\n" << responseBody;
+        throw;
+    }
+}
+
+OptionalTlsStream ElasticsearchDatastreamWriter::Connect()
+{
+    Log(LogNotice, "ElasticsearchDatastreamWriter")
+        << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+    OptionalTlsStream stream;
+    bool tls = GetEnableTls();
+
+    if (tls) {
+        Shared<boost::asio::ssl::context>::Ptr sslContext;
+
+        try {
+            sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
+        } catch (const std::exception&) {
+            Log(LogWarning, "ElasticsearchDatastreamWriter")
+                << "Unable to create SSL context.";
+            throw;
+        }
+
+        stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+    } else {
+        stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+    }
+
+    try {
+        icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+    } catch (const std::exception&) {
+        Log(LogWarning, "ElasticsearchDatastreamWriter")
+            << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+        throw;
+    }
+
+    if (tls) {
+        auto& tlsStream (stream.first->next_layer());
+
+        try {
+            tlsStream.handshake(boost::asio::ssl::stream_base::client);
+        } catch (const std::exception&) {
+            Log(LogWarning, "ElasticsearchDatastreamWriter")
+                << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
+            throw;
+        }
+
+        if (!GetInsecureNoverify()) {
+            if (!tlsStream.GetPeerCertificate()) {
+                BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
+            }
+
+            if (!tlsStream.IsVerifyOK()) {
+                BOOST_THROW_EXCEPTION(std::runtime_error(
+                    "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
+                ));
+            }
+        }
+    }
+
+    return stream;
+}
+
+void ElasticsearchDatastreamWriter::AssertOnWorkQueue()
+{
+    ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+static void ExceptionHandler(const boost::exception_ptr& exp)
+{
+    Log(LogCritical, "ElasticsearchDatastreamWriter", "Exception during Elasticsearch operation: Verify that your backend is operational!");
+
+    Log(LogDebug, "ElasticsearchDatastreamWriter")
+        << "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
+}
+
+String ElasticsearchDatastreamWriter::FormatTimestamp(double ts)
+{
+    /* The date format must match the default dynamic date detection
+     * pattern in indexes. This enables applications like Kibana to
+     * detect a qualified timestamp index for time-series data.
+     *
+     * Example: 2017-09-11T10:56:21.463+0200
+     *
+     * References:
+     * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
+     * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
+     * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
+     */
+    auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
+
+    return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + "." + Convert::ToString(milliSeconds) + Utility::FormatDateTime("%z", ts);
+}
+
+void ElasticsearchDatastreamWriter::ValidateTagsTemplate(const Array::Ptr& tags)
+{
+    ObjectLock olock(tags);
+
+    for (const Value& tag : tags) {
+        if (!MacroProcessor::ValidateMacroString(tag)) {
+            BOOST_THROW_EXCEPTION(ValidationError(this, { "host_tags_template" }, "Closing $ not found in macro format string '" + tag + "'."));
+        }
+    }
+}
+
+void ElasticsearchDatastreamWriter::ValidateHostTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
+{
+    ObjectImpl<ElasticsearchDatastreamWriter>::ValidateHostTagsTemplate(lvalue, utils);
+
+    auto& tags = lvalue();
+
+    if (tags)
+        ValidateTagsTemplate(tags);
+}
+
+void ElasticsearchDatastreamWriter::ValidateServiceTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
+{
+    ObjectImpl<ElasticsearchDatastreamWriter>::ValidateServiceTagsTemplate(lvalue, utils);
+
+    auto& tags = lvalue();
+
+    if (tags)
+        ValidateTagsTemplate(tags);
+}
+
+void ElasticsearchDatastreamWriter::ValidateLabelsTemplate(const Dictionary::Ptr& labels)
+{
+    ObjectLock olock(labels);
+
+    for (const Dictionary::Pair& kv : labels) {
+        if (!MacroProcessor::ValidateMacroString(kv.second)) {
+            BOOST_THROW_EXCEPTION(ValidationError(this, { "host_labels_template", kv.first }, "Closing $ not found in macro format string '" + kv.second + "'."));
+        }
+    }
+}
+
+void ElasticsearchDatastreamWriter::ValidateHostLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+    ObjectImpl<ElasticsearchDatastreamWriter>::ValidateHostLabelsTemplate(lvalue, utils);
+
+    auto& labels = lvalue();
+
+    if (labels)
+        ValidateLabelsTemplate(labels);
+}
+
+void ElasticsearchDatastreamWriter::ValidateServiceLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+    ObjectImpl<ElasticsearchDatastreamWriter>::ValidateServiceLabelsTemplate(lvalue, utils);
+
+    auto& labels = lvalue();
+
+    if (labels)
+        ValidateLabelsTemplate(labels);
+}
+
+void ElasticsearchDatastreamWriter::ValidateFilter(const Lazy<Value>& lvalue, const ValidationUtils&)
+{
+    const Value& filter = lvalue();
+
+    if (filter.IsEmpty())
+        return;
+
+    if (!filter.IsObjectType<Function>()) {
+        BOOST_THROW_EXCEPTION(ValidationError(this, { "filter" }, "Filter must be an expression."));
+    }
+
+    std::vector<std::unique_ptr<Expression>> args;
+    args.emplace_back(new GetScopeExpression(ScopeThis));
+
+    std::unique_ptr<Expression> indexer{new IndexerExpression(
+        std::unique_ptr<Expression>(MakeLiteral(filter)),
+        std::unique_ptr<Expression>(MakeLiteral("call"))
+    )};
+
+    FunctionCallExpression* fexpr = new FunctionCallExpression(std::move(indexer), std::move(args));
+
+    m_CompiledFilter = fexpr;
+}
+
+static String NormalizeElasticsearchIndexPart(const String& fieldName)
+{
+    String normalized;
+
+    bool first_char = true;
+    bool preceding_invalid = false;
+
+    for (char& c : boost::trim_copy(fieldName)) {
+        if (!std::isalnum(static_cast<unsigned char>(c))) {
+            if (first_char || preceding_invalid)
+                continue;
+
+            preceding_invalid = true;
+            c = '_';
+        } else {
+            first_char = false;
+            preceding_invalid = false;
+        }
+
+        normalized += tolower(c);
+    }
+
+    return normalized;
+}
+
+static String NormalizeElasticsearchFieldName(const String& fieldName)
+{
+    String normalized;
+
+    for (char& c : boost::trim_copy(fieldName)) {
+        // Elasticsearch uses dots for field separation, so we need to avoid them here.
+        if (c == '.')
+            c = '_';
+
+        normalized += tolower(c);
+    }
+
+    return normalized;
+}
+
+EcsDocument::EcsDocument(String index, Dictionary::Ptr document)
+    : m_Index(std::move(index)), m_Document(std::move(document))
+{}
diff --git a/lib/perfdata/elasticsearchdatastreamwriter.hpp b/lib/perfdata/elasticsearchdatastreamwriter.hpp
new file mode 100644
index 00000000000..da3234c0be9
--- /dev/null
+++ b/lib/perfdata/elasticsearchdatastreamwriter.hpp
@@ -0,0 +1,115 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef ELASTICSEARCHDATASTREAMWRITER_H
+#define ELASTICSEARCHDATASTREAMWRITER_H
+
+#include <atomic>
+#include <cstdint>
+
+#include "base/configobject.hpp"
+#include "base/dictionary.hpp"
+#include "base/shared-object.hpp"
+#include "base/workqueue.hpp"
+#include "base/timer.hpp"
+#include "base/tlsstream.hpp"
+#include "config/expression.hpp"
+#include "icinga/checkable.hpp"
+#include "icinga/checkresult.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "remote/url.hpp"
+
+#include "perfdata/elasticsearchdatastreamwriter-ti.hpp"
+
+namespace icinga
+{
+
+class EcsDocument final : public SharedObject
+{
+public:
+    DECLARE_PTR_TYPEDEFS(EcsDocument);
+
+    EcsDocument() = default;
+    EcsDocument(String index, Dictionary::Ptr document);
+
+    String GetIndex() const { return m_Index; }
+    void SetIndex(const String& index) { m_Index = index; }
+    Dictionary::Ptr GetDocument() const { return m_Document; }
+    void SetDocument(Dictionary::Ptr document) { m_Document = document; }
+
+private:
+    String m_Index;
+    Dictionary::Ptr m_Document;
+};
+
+class ElasticsearchDatastreamWriter final : public ObjectImpl<ElasticsearchDatastreamWriter>
+{
+public:
+    DECLARE_OBJECT(ElasticsearchDatastreamWriter);
+    DECLARE_OBJECTNAME(ElasticsearchDatastreamWriter);
+
+    static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+    static String FormatTimestamp(double ts);
+
+    void ValidateHostTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils) override;
+    void ValidateServiceTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils) override;
+    void ValidateHostLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+    void ValidateServiceLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+    void ValidateFilter(const Lazy<Value>& lvalue, const ValidationUtils& utils) override;
+
+protected:
+    void OnConfigLoaded() override;
+    void Resume() override;
+    void Pause() override;
+    Value TrySend(const Url::Ptr& url, String body);
+
+private:
+    WorkQueue m_WorkQueue{10000000, 1};
+    boost::signals2::connection m_HandleCheckResults;
+    Timer::Ptr m_FlushTimer;
+    bool m_Paused = false;
+
+    // This buffer should only be accessed from the worker thread.
+    // Every other access will lead to a race condition.
+    std::vector<EcsDocument::Ptr> m_DataBuffer;
+
+    std::uint64_t m_DocumentsSent = 0;
+    std::uint64_t m_DocumentsFailed = 0;
+    std::atomic_uint64_t m_ItemsFilteredOut{0};
+
+    Expression::Ptr m_CompiledFilter;
+
+    void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+    void ManageIndexTemplate();
+
+    Dictionary::Ptr ExtractPerfData(const Checkable::Ptr& checkable, const Array::Ptr& perfdata);
+    String ExtractDatastreamNamespace(const MacroProcessor::ResolverList& resolvers, const Checkable::Ptr& checkable,
+        const CheckResult::Ptr& cr);
+    bool Filter(const Checkable::Ptr& checkable);
+
+    OptionalTlsStream Connect();
+    void AssertOnWorkQueue();
+    void Flush();
+    void SendRequest(const String& body);
+
+    void ValidateTagsTemplate(const Array::Ptr& tags);
+    void ValidateLabelsTemplate(const Dictionary::Ptr& labels);
+};
+
+class StatusCodeException : public std::runtime_error
+{
+public:
+    StatusCodeException(boost::beast::http::status statusCode, String message, String body)
+        : std::runtime_error((message + " (HTTP " + std::to_string(static_cast<unsigned>(statusCode)) + ")").CStr()),
+        m_StatusCode(statusCode), m_Body(std::move(body)) {}
+
+    boost::beast::http::status GetStatusCode() const { return m_StatusCode; }
+    const String& GetBody() const { return m_Body; }
+
+private:
+    boost::beast::http::status m_StatusCode;
+    String m_Body;
+};
+
+}
+
+#endif /* ELASTICSEARCHDATASTREAMWRITER_H */
diff --git a/lib/perfdata/elasticsearchdatastreamwriter.ti b/lib/perfdata/elasticsearchdatastreamwriter.ti
new file mode 100644
index 00000000000..04bd3edde9a
--- /dev/null
+++ b/lib/perfdata/elasticsearchdatastreamwriter.ti
@@ -0,0 +1,82 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class ElasticsearchDatastreamWriter : ConfigObject
+{
+    activation_priority 100;
+
+    [config, required] String host {
+        default {{{ return "127.0.0.1"; }}}
+    };
+    [config, required] String port {
+        default {{{ return "9200"; }}}
+    };
+    [config] bool enable_send_perfdata {
+        default {{{ return false; }}}
+    };
+    [config] String username;
+    [config, no_user_view, no_user_modify] String password;
+
+    [config, no_user_view, no_user_modify] String api_token;
+
+    [config] bool enable_tls {
+        default {{{ return false; }}}
+    };
+    [config] String datastream_namespace {
+        default {{{ return "default"; }}}
+    };
+    [config] bool enable_send_thresholds {
+        default {{{ return false; }}}
+    };
+    [config] bool manage_index_template {
+        default {{{ return true; }}}
+    };
+    [config] bool insecure_noverify {
+        default {{{ return false; }}}
+    };
+    [config] String ca_path;
+    [config] String cert_path;
+    [config] String key_path;
+
+    [config] int flush_interval {
+        default {{{ return 10; }}}
+    };
+    [config] int flush_threshold {
+        default {{{ return 1024; }}}
+    };
+    [config] bool enable_ha {
+        default {{{ return false; }}}
+    };
+
+    [config] Array::Ptr host_tags_template;
+    [config] Array::Ptr service_tags_template;
+    [config] Dictionary::Ptr host_labels_template;
+    [config] Dictionary::Ptr service_labels_template;
+    [config] Value filter;
+};
+
+validator ElasticsearchDatastreamWriter {
+    Array host_tags_template {
+        String "*";
+    };
+    Array service_tags_template {
+        String "*";
+    };
+
+    Dictionary host_labels_template {
+        String "*";
+    };
+    Dictionary service_labels_template {
+        String "*";
+    };
+
+    Value filter;
+};
+
+}
diff --git a/lib/remote/endpoint.cpp b/lib/remote/endpoint.cpp
index 751f895396e..32b109f4028 100644
--- a/lib/remote/endpoint.cpp
+++ b/lib/remote/endpoint.cpp
@@ -6,10 +6,8 @@
 #include "remote/apilistener.hpp"
 #include "remote/jsonrpcconnection.hpp"
 #include "remote/zone.hpp"
-#include "base/configtype.hpp"
 #include "base/utility.hpp"
 #include "base/exception.hpp"
-#include "base/convert.hpp"
 
 using namespace icinga;
 
@@ -172,3 +170,12 @@ double Endpoint::GetSecondsProcessingMessages() const
 {
 	return m_InputProcessingTime;
 }
+
+String Endpoint::GetIcingaVersionString() const
+{
+	unsigned long version = GetIcingaVersion();
+	auto bugfix = version % 100;
+	version /= 100;
+	auto minor = version % 100;
+	auto major = version / 100;
+	return std::to_string(major) + "." + std::to_string(minor) + "." + std::to_string(bugfix);
+}
diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp
index 0cd015bf9eb..f59a03a464a 100644
--- a/lib/remote/endpoint.hpp
+++ b/lib/remote/endpoint.hpp
@@ -61,6 +61,8 @@ class Endpoint final : public ObjectImpl<Endpoint>
 
 	double GetSecondsProcessingMessages() const override;
 
+	String GetIcingaVersionString() const;
+
 protected:
 	void OnAllConfigLoaded() override;
diff --git a/tools/syntax/nano/icinga2.nanorc b/tools/syntax/nano/icinga2.nanorc
index cff78be3b64..4388ab05891 100644
--- a/tools/syntax/nano/icinga2.nanorc
+++ b/tools/syntax/nano/icinga2.nanorc
@@ -1,5 +1,5 @@
 ### Nano synteax file
-### Icinga2 object configuration file
+### Icinga2 object configuration file
 
 syntax "icinga2" "/etc/icinga2/.*\.conf$" "/usr/share/icinga2/include/(plugin|itl|.*\.conf$)"
 
@@ -7,6 +7,7 @@ syntax "icinga2" "/etc/icinga2/.*\.conf$" "/usr/share/icinga2/include/(plugin|itl|.*\.conf$)"
 icolor brightgreen "object[ \t]+(host|hostgroup|service|servicegroup|user|usergroup)"
 icolor brightgreen "object[ \t]+(checkcommand|notificationcommand|eventcommand|notification)"
 icolor brightgreen "object[ \t]+(timeperiod|scheduleddowntime|dependency|perfdatawriter)"
+icolor brightgreen "object[ \t]+(elasticsearchwriter|elasticsearchdatastreamwriter)"
 icolor brightgreen "object[ \t]+(graphitewriter|idomysqlconnection|idomysqlconnection)"
 icolor brightgreen "object[ \t]+(livestatuslistener|externalcommandlistener)"
 icolor brightgreen "object[ \t]+(compatlogger|checkcomponent|notificationcomponent)"
@@ -32,6 +33,9 @@ icolor red "(^|^\s+)(port|ranges|retry_interval|rotation_interval|rotation_method)"
 icolor red "(^|^\s+)(service_format_template|service_name|service_name_template|service_perfdata_path|service_temp_path)"
 icolor red "(^|^\s+)(severity|socket_path|socket_type|spool_dir|states|status_path|table_prefix)"
 icolor red "(^|^\s+)(timeout|times|types|update_interval|user|user_groups|users|volatile|zone)"
+icolor red "(^|^\s+)(insecure_noverify|flush_interval|flush_threshold|filter|api_token|username|enable_tls)"
+icolor red "(^|^\s+)(enable_send_perfdata|datastream_namespace|enable_send_thresholds|manage_index_template)"
+icolor red "(^|^\s+)(host_labels_template|service_labels_template|host_tags_template|service_tags_template)"
 
 icolor red "(^|^\s+)(vars\.\w+)"
diff --git a/tools/syntax/vim/syntax/icinga2.vim b/tools/syntax/vim/syntax/icinga2.vim
index 5201d6fc836..5f4a65a8088 100644
--- a/tools/syntax/vim/syntax/icinga2.vim
+++ b/tools/syntax/vim/syntax/icinga2.vim
@@ -54,6 +54,7 @@ syn match Lambda "{{}}"
 " Object types
 syn keyword icinga2ObjType ApiListener ApiUser CheckCommand CheckerComponent
 syn keyword icinga2ObjType Comment Dependency Downtime ElasticsearchWriter
+syn keyword icinga2ObjType ElasticsearchDatastreamWriter
 syn keyword icinga2ObjType Endpoint EventCommand ExternalCommandListener
 syn keyword icinga2ObjType FileLogger GelfWriter GraphiteWriter Host HostGroup
 syn keyword icinga2ObjType IcingaApplication IdoMysqlConnection IdoPgsqlConnection
@@ -95,6 +96,8 @@ syn keyword icinga2ObjAttr contained ssl_ca ssl_capath ssl_ca_cert ssl_cert ss
 syn keyword icinga2ObjAttr contained states status_path table_prefix ticket_salt
 syn keyword icinga2ObjAttr contained timeout times tls_handshake_timeout tls_protocolmin
 syn keyword icinga2ObjAttr contained types update_interval user user_groups username users volatile zone
+syn keyword icinga2ObjAttr contained api_token datastream_namespace manage_index_template insecure_noverify
+syn keyword icinga2ObjAttr contained host_tags_template service_tags_template host_labels_template service_labels_template
 syn match icinga2ObjAttr contained "\(vars.\w\+\)"
 
 " keywords: https://icinga.com/docs/icinga2/latest/doc/17-language-reference/#reserved-keywords
diff --git a/usr/elasticsearch/index-template.json b/usr/elasticsearch/index-template.json
new file mode 100644
index 00000000000..a99d2360f72
--- /dev/null
+++ b/usr/elasticsearch/index-template.json
@@ -0,0 +1,130 @@
+{
+  "index_patterns": ["metrics-icinga2.*-*"],
+  "_meta": {
+    "managed": true,
+    "managed_by": "icinga2"
+  },
+  "composed_of": ["ecs@mappings", "metrics-icinga2@custom"],
+  "priority": 500,
+  "data_stream": {},
+  "template": {
+    "settings": {
+      "index": {
+        "mode": "time_series",
+        "routing_path": ["host.name", "service.name"],
+        "mapping": {
+          "total_fields.limit": 2000
+        }
+      }
+    },
+    "mappings": {
+      "dynamic_templates": [
+        {
+          "perfdata_fields": {
+            "path_match": "perfdata.*",
+            "match_mapping_type": "object",
+            "mapping": {
+              "type": "object",
+              "properties": {
+                "value": {
+                  "type": "double"
+                },
+                "min": {
+                  "type": "double"
+                },
+                "max": {
+                  "type": "double"
+                },
+                "warn": {
+                  "type": "double"
+                },
+                "crit": {
+                  "type": "double"
+                },
+                "unit": {
+                  "type": "keyword"
+                }
+              }
+            }
+          }
+        }
+      ],
+      "properties": {
+        "perfdata": {
+          "type": "object"
+        },
+        "check": {
+          "properties": {
+            "exit_status": {
+              "type": "integer"
+            },
+            "execution_time": {
+              "type": "double"
+            },
+            "latency": {
+              "type": "double"
+            },
+            "schedule_start": {
+              "type": "date"
+            },
+            "schedule_end": {
+              "type": "date"
+            },
+            "active": {
+              "type": "boolean"
+            },
+            "current_check_attempts": {
+              "type": "integer"
+            },
+            "max_attempts": {
+              "type": "integer"
+            },
+            "reachable": {
+              "type": "boolean"
+            }
+          }
+        },
+        "host": {
+          "properties": {
+            "name": {
+              "time_series_dimension": true,
+              "type": "keyword"
+            },
+            "hostname": {
+              "type": "keyword"
+            },
+            "soft_state": {
+              "type": "keyword"
+            },
+            "hard_state": {
+              "type": "keyword"
+            },
+            "zone": {
+              "type": "keyword"
+            }
+          }
+        },
+        "service": {
+          "properties": {
+            "name": {
+              "time_series_dimension": true,
+              "type": "keyword"
+            },
+            "display_name": {
+              "type": "keyword"
+            },
+            "zone": {
+              "type": "keyword"
+            },
+            "hard_state": {
+              "type": "keyword"
+            },
+            "soft_state": {
+              "type": "keyword"
+            }
+          }
+        }
+      }
+    }
+  }
+}