diff --git a/doc/09-object-types.md b/doc/09-object-types.md
index 17583f871c3..9bd3e66a697 100644
--- a/doc/09-object-types.md
+++ b/doc/09-object-types.md
@@ -1241,6 +1241,148 @@ for an example.
TLS for the HTTP proxy can be enabled with `enable_tls`. In addition to that
you can specify the certificates with the `ca_path`, `cert_path` and `cert_key` attributes.
+### ElasticsearchDatastreamWriter <a id="objecttype-elasticsearchdatastreamwriter"></a>
+
+Writes check result metrics and performance data to an Elasticsearch time series data stream.
+This configuration object is available as the [elasticsearch datastream feature](14-features.md#elasticsearchdatastream-writer).
+
+
+Example:
+
+```
+object ElasticsearchDatastreamWriter "datastreamwriter" {
+ host = "127.0.0.1"
+ port = 9200
+ datastream_namespace = "production"
+
+ enable_send_perfdata = true
+
+ host_tags_template = ["icinga-production"]
+ filter = {{ "datastream" in host.groups }}
+
+ flush_threshold = 1024
+ flush_interval = 10
+}
+```
+
+Configuration Attributes:
+
+ Name | Type | Description
+ --------------------------|-----------------------|----------------------------------
+ host | String | **Required.** Elasticsearch host address. Defaults to `127.0.0.1`.
+ port | Number | **Required.** Elasticsearch port. Defaults to `9200`.
+ enable\_tls | Boolean | **Optional.** Whether to use a TLS stream. Defaults to `false`.
+ insecure\_noverify | Boolean | **Optional.** Disable TLS peer verification. Defaults to `false`.
+ ca\_path | String | **Optional.** Path to CA certificate to validate the remote host. Requires `enable_tls` set to `true`.
+ enable\_ha | Boolean | **Optional.** Enable the high availability functionality. Only valid in a [cluster setup](06-distributed-monitoring.md#distributed-monitoring-high-availability-features). Defaults to `false`.
+ flush\_interval | Duration | **Optional.** How long to buffer data points before transferring to Elasticsearch. Defaults to `10s`.
+ flush\_threshold | Number | **Optional.** How many data points to buffer before forcing a transfer to Elasticsearch. Defaults to `1024`.
+
+Auth:
+
+ Name | Type | Description
+ --------------------------|-----------------------|----------------------------------
+ username | String | **Optional.** Basic auth username for Elasticsearch.
+ password | String | **Optional.** Basic auth password for Elasticsearch.
+ api\_token | String | **Optional.** API key used for the `Authorization: ApiKey` header.
+ cert\_path | String | **Optional.** Path to host certificate to present to the remote host for mutual verification. Requires `enable_tls` set to `true`.
+ key\_path | String | **Optional.** Path to host key to accompany the cert\_path. Requires `enable_tls` set to `true`.
+
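+For example, basic auth over TLS might be configured like this (host, path and credentials are
+illustrative):
+
+```
+object ElasticsearchDatastreamWriter "es-auth" {
+  host = "elasticsearch.example.com"
+  port = 9200
+
+  enable_tls = true
+  ca_path = "/var/lib/icinga2/certs/es-ca.crt"
+
+  username = "icinga2"
+  password = "changeme"
+}
+```
+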
+Changing the behavior of the writer:
+
+ Name | Type | Description
+ --------------------------|-----------------------|----------------------------------
+ datastream\_namespace | String | **Optional.** Suffix for the data stream names. May contain macros. Defaults to `default`.
+ manage\_index\_template | Boolean | **Optional.** Whether to create and manage the index template in Elasticsearch. This requires the user to have `manage_index_templates` permission in Elasticsearch. Defaults to `true`.
+ enable\_send\_perfdata | Boolean | **Optional.** Send parsed performance data metrics for check results. Defaults to `false`.
+ enable\_send\_thresholds | Boolean | **Optional.** Whether to additionally send the warn and crit thresholds for each performance data value. Defaults to `false`.
+ host\_tags\_template | Array | **Optional.** Allows adding [tags](https://www.elastic.co/docs/reference/ecs/ecs-base#field-tags) to the document for a Host check result.
+ service\_tags\_template | Array | **Optional.** Allows adding [tags](https://www.elastic.co/docs/reference/ecs/ecs-base#field-tags) to the document for a Service check result.
+ host\_labels\_template | Dictionary | **Optional.** Allows adding [labels](https://www.elastic.co/docs/reference/ecs/ecs-base#field-labels) to the document for a Host check result.
+ service\_labels\_template | Dictionary | **Optional.** Allows adding [labels](https://www.elastic.co/docs/reference/ecs/ecs-base#field-labels) to the document for a Service check result.
+ filter | Function | **Optional.** An expression to filter which check results should be sent to Elasticsearch. Defaults to sending all check results.
+
+#### Macro Usage (Tags, Labels & Namespace)
+
+Macros can be used inside the following template attributes:
+
+- `host_tags_template` (array of strings)
+- `service_tags_template` (array of strings)
+- `host_labels_template` (dictionary of key -> string value)
+- `service_labels_template` (dictionary of key -> string value)
+- `datastream_namespace` (string)
+
+Behavior:
+
+- Tags: Each array element may contain zero or more macros. If at least one macro is missing/unresolvable,
+ the entire tag element is skipped and a debug log entry is written.
+- Labels: Each dictionary value may contain macros. If at least one macro inside the value is missing, that
+ label key/value pair is skipped and a debug log entry is written.
+- Namespace: The datastream_namespace string may contain macros. If a macro is missing or resolves to an
+ empty value, the writer falls back to the default namespace "default".
+- Validation: A template string with an unterminated '$' (e.g. "$host.name") raises a configuration
+ validation error referencing the original string.
+- Macros never partially substitute: either all macros in the string resolve and the rendered value
+  is used, or (for tags/labels) the entry is skipped.
+
+
+#### Normalization
+
+The resolved datastream namespace and the check name included in the datastream dataset name undergo
+normalization: leading whitespace and leading special characters are trimmed; all remaining special
+(non-alphanumeric) characters are replaced with an underscore; consecutive underscores are collapsed; and the
+result is lowercased. This ensures stable index names that are
+[accepted by Elasticsearch](https://www.elastic.co/docs/api/doc/elasticsearch/operation/operation-indices-create#operation-indices-create-index).
+
+Performance data field names are only normalized in one respect: they cannot contain dots (`.`), because
+dots are used for field separation and would break the index template's field mapping. Any dot in a
+performance data label is therefore replaced with an underscore.
+
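+For instance, applying these rules (inputs illustrative):
+
+```
+"!!Prod--Env"    -> "prod_env"         (namespace / index part)
+"Check HTTP"     -> "check_http"       (check name in the dataset)
+"load.shortterm" -> "load_shortterm"   (perfdata field name: only dots are replaced)
+```
+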
+Examples:
+
+```
+object ElasticsearchDatastreamWriter "example-datastream" {
+ datastream_namespace = "$host.vars.env$" // Falls back to "default" if $host.vars.env$ is missing
+
+ host_tags_template = [
+ "env-$host.vars.env$",
+ "$host.name$"
+ ]
+
+ service_tags_template = [
+ "svc-$service.name$",
+ "$service.display_name$"
+ ]
+
+ host_labels_template = {
+ os = "$host.vars.os$"
+ fqdn = "$host.name$"
+ }
+
+ service_labels_template = {
+ check_cmd = "$service.check_command$"
+ attempted_env = "$host.vars.missing_env$" // Skipped if missing_env not set
+ }
+
+ filter = {{ service && "production" in host.groups }}
+}
+```
+
+A missing macro example for a host check result:
+- service_tags_template element "svc-$service.name$" is skipped (service not in scope).
+- service_labels_template value "$service.check_command$" is skipped for host check results.
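+
+For a host check result with `host.vars.env = "prod"` and `host.vars.os = "Linux"` (illustrative values),
+the host templates above would render as:
+
+```
+"tags": ["env-prod", "example-host"],
+"labels": { "os": "Linux", "fqdn": "example-host" }
+```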
+
+#### Filter Expression
+
+The filter accepts a function expression (lambda). Only the variables `host` and `service` are available
+inside it; `service` is `null` for host check results.
+
+Examples:
+```
+filter = {{ "production" in host.groups }}
+filter = {{ service && "linux" in host.groups }}
+```
+If the filter returns true, the check result is sent; otherwise it is skipped.
+
### ExternalCommandListener
Implements the Icinga 1.x command pipe which can be used to send commands to Icinga.
diff --git a/doc/14-features.md b/doc/14-features.md
index 5aa775c4251..ef8a4652d6d 100644
--- a/doc/14-features.md
+++ b/doc/14-features.md
@@ -439,6 +439,130 @@ The recommended way of running Elasticsearch in this scenario is a dedicated ser
where you either have the Elasticsearch HTTP API, or a TLS secured HTTP proxy,
or Logstash for additional filtering.
+
+#### Elasticsearch Datastream Writer <a id="elasticsearchdatastream-writer"></a>
+
+> **Note**
+>
+> This is a newer alternative to the Elasticsearch Writer above. The Elasticsearch Datastream Writer uses
+> Elasticsearch's data stream feature and follows the Elastic Common Schema (ECS), providing better performance
+> and data organization. Use this writer for new installations. The original Elasticsearch Writer is still
+> available for backward compatibility.
+>
+> OpenSearch: The data stream mode and ECS component template usage differ slightly in OpenSearch. The
+> ElasticsearchDatastreamWriter focuses on Elasticsearch compatibility first. OpenSearch can ingest the data,
+> but you may need to adapt the installed index/component templates manually (e.g. remove time_series mode if
+> unsupported, adjust mappings). The option `manage_index_template` will not work with OpenSearch.
+
+
+This feature sends check results with performance data to an [Elasticsearch](https://www.elastic.co/products/elasticsearch) instance or cluster.
+
+> **Note**
+>
+> This feature requires Elasticsearch to support time series data streams (Elasticsearch 8.x+), and to have the ECS
+> component template installed. It was tested successfully with Elasticsearch 8.12 and 9.0.8.
+
+
+Enable the feature and restart Icinga 2.
+
+```bash
+icinga2 feature enable elasticsearchdatastream
+```
+
+The default configuration expects an Elasticsearch instance running on `localhost` on port `9200`
+and writes to data streams with the pattern `metrics-icinga2.<check_command>-<datastream_namespace>`.
+
+More configuration details can be found [here](09-object-types.md#objecttype-elasticsearchdatastreamwriter).
+
+#### Current Elasticsearch Schema <a id="elasticsearchdatastream-writer-schema"></a>
+
+The documents for the ElasticsearchDatastreamWriter follow the [Elastic Common Schema (ECS)](https://www.elastic.co/guide/en/ecs/current/index.html)
+version `8.0` as closely as possible, with some additions to fit the Icinga 2 data model.
+All documents are written to a data stream of the format `metrics-icinga2.<check_command>-<datastream_namespace>`,
+where `<check_command>` is the name of the check command being executed. This keeps the number of fields per index low
+and groups documents with the same performance data together. `<datastream_namespace>` is an optional
+configuration parameter to further separate documents, e.g. by environment like `production` or `development`.
+The `datastream_namespace` can also be used to separate documents e.g. by hostgroups or zones: use the
+`filter` function to filter the check results and run several writers with different namespaces.
+Time-series dimensions are applied to `host.name` and (when present) `service.name`, aligning with the ECS host and service
+definitions: [ECS host fields](https://www.elastic.co/guide/en/ecs/current/ecs-host.html),
+[ECS service fields](https://www.elastic.co/guide/en/ecs/current/ecs-service.html).
+
+Icinga 2 automatically adds the following threshold metrics
+if present:
+
+```
+perfdata.<label>.min
+perfdata.<label>.max
+perfdata.<label>.warn
+perfdata.<label>.crit
+```
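+
+A trimmed example of a resulting document (field values illustrative):
+
+```
+{
+  "@timestamp": "2024-05-06T10:56:21.463+0200",
+  "ecs": { "version": "8.0.0" },
+  "data_stream": { "type": "metrics", "dataset": "icinga2.ping4", "namespace": "production" },
+  "host": { "name": "web01", "hostname": "web01", "soft_state": 0, "hard_state": 0 },
+  "check": { "exit_status": 0, "execution_time": 0.051, "active": true, "max_attempts": 3 },
+  "message": "PING OK - Packet loss = 0%, RTA = 0.08 ms",
+  "perfdata": { "rta": { "value": 0.081, "unit": "ms", "min": 0 } }
+}
+```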
+
+#### Adding additional tags and labels
+
+Additionally, it is possible to configure custom tags and labels that are applied to the metrics via
+`host_tags_template`/`service_tags_template` and `host_labels_template`/`service_labels_template`
+respectively. Depending on whether the write event was triggered by a service or a host object,
+the corresponding templates are used to add tags and labels to the Elasticsearch documents.
+
+A host metrics entry configured with the following templates:
+
+```
+host_tags_template = ["production", "$host.groups$"]
+host_labels_template = {
+ os = "$host.vars.os$"
+}
+```
+
+The resulting document will then, in addition to the fields described above, also contain:
+
+```
+"tags": ["production", "linux-servers;group-A"],
+"labels": { "os": "Linux" }
+```
+
+#### Filtering check results
+
+You can filter which check results are sent to Elasticsearch by using the `filter` parameter.
+It takes a function (expression) evaluated for every check result and must return a boolean.
+If the function returns `true`, the check result is sent; otherwise it is skipped.
+
+Only the variables `host` and `service` are available inside this expression.
+For host check results `service` is not set (null/undefined). No other variables (such as
+the raw check result object) are exposed.
+
+Example configuration that only sends service check results for hosts in the `linux-server` hostgroup:
+
+
+```
+object ElasticsearchDatastreamWriter "elasticsearchdatastream" {
+ ...
+ datastream_namespace = "production"
+ filter = {{ service && "linux-server" in host.groups }}
+}
+```
+
+#### Elasticsearch Datastream Writer in Cluster HA Zones
+
+The Elasticsearch Datastream Writer feature supports [high availability](06-distributed-monitoring.md#distributed-monitoring-high-availability-features)
+in cluster zones.
+
+By default, all endpoints in a zone will activate the feature and start
+writing events to the Elasticsearch HTTP API. In HA enabled scenarios,
+it is possible to set `enable_ha = true` in all feature configuration
+files. This allows each endpoint to calculate the feature authority,
+and only one endpoint actively writes events, the other endpoints
+pause the feature.
+
+When the cluster connection breaks at some point, the remaining endpoint(s)
+in that zone will automatically resume the feature. This built-in failover
+mechanism ensures that events are written even if the cluster fails.
+
+The recommended way of running Elasticsearch in this scenario is a dedicated server
+where you either have the Elasticsearch HTTP API, or a TLS secured HTTP proxy,
+or Logstash for additional filtering.
+
+
### Graylog Integration
#### GELF Writer
diff --git a/etc/icinga2/features-available/elasticsearchdatastream.conf b/etc/icinga2/features-available/elasticsearchdatastream.conf
new file mode 100644
index 00000000000..668507af31a
--- /dev/null
+++ b/etc/icinga2/features-available/elasticsearchdatastream.conf
@@ -0,0 +1,82 @@
+/*
+ * The ElasticsearchDatastreamWriter feature writes Icinga 2 events to an Elasticsearch datastream.
+ * This feature requires Elasticsearch 8.12 or later.
+ */
+
+object ElasticsearchDatastreamWriter "elasticsearch" {
+ host = "127.0.0.1"
+ port = 9200
+
+ /* To enable a https connection, set enable_tls to true. */
+ // enable_tls = false
+
+ /* The datastream namespace to use. This can be used to separate different
+ * Icinga instances or let multiple Writers write to different
+ * datastreams in the same Elasticsearch cluster by using the filter option.
+ * The Elasticsearch datastream name will be
+ * "metrics-icinga2.{check}-{datastream_namespace}".
+ */
+ // datastream_namespace = "default"
+
+ /* You can authorize icinga2 through three different methods.
+ * 1. Basic authentication with username and password.
+ * 2. Bearer token authentication with api_token.
+ * 3. Client certificate authentication with cert_path and key_path.
+ */
+ // username = "icinga2"
+ // password = "changeme"
+
+ // api_token = ""
+
+ // cert_path = "/path/to/cert.pem"
+ // key_path = "/path/to/key.pem"
+ // ca_path = "/path/to/ca.pem"
+
+ /* Enable sending the threshold values as additional fields
+ * with the service check metrics. If set to true, it will
+ * send warn and crit for every performance data item.
+ */
+ // enable_send_thresholds = false
+
+ /* The flush settings control how often data is sent to Elasticsearch.
+ * You can either flush based on a time interval or the number of
+ * events in the buffer. Whichever comes first will trigger a flush.
+ */
+ // flush_threshold = 1024
+ // flush_interval = 10s
+
+ /* By default, all endpoints in a zone will activate the feature and start
+ * writing events to the Elasticsearch HTTP API. In HA enabled scenarios,
+ * it is possible to set `enable_ha = true` in all feature configuration
+ * files. This allows each endpoint to calculate the feature authority,
+ * and only one endpoint actively writes events, the other endpoints
+ * pause the feature.
+ */
+ // enable_ha = false
+
+ /* By default, the feature will create an index template in Elasticsearch
+ * for the datastreams. If you want to manage the index template yourself,
+ * set manage_index_template to false.
+ */
+ // manage_index_template = true
+
+ /* Additional tags and labels can be added to the host and service
+ * documents by using the host_tags_template, service_tags_template,
+ * host_labels_template and service_labels_template options.
+ * Tags and labels may contain runtime macros, which are resolved for every document.
+ */
+ // host_tags_template = [ "icinga", "$host.vars.os$" ]
+ // service_tags_template = [ "icinga", "$service.vars.id$" ]
+ // host_labels_template = { "env" = "production", "os" = "$host.vars.os$" }
+ // service_labels_template = { "env" = "production", "id" = "$host.vars.id$" }
+
+ /* The filter option can be used to filter which events are sent to
+ * Elasticsearch. The filter is a regular Icinga 2 filter expression.
+ * The filter is applied to both host and service events.
+ * If the filter evaluates to true, the event is sent to Elasticsearch.
+ * If the filter is not set, all events are sent to Elasticsearch.
+ * Only the host and service variables are available in the filter
+ * expression; service is not set for host check results.
+ */
+ // filter = {{ host.name == "myhost" || (service && service.name == "myservice") }}
+}
diff --git a/lib/perfdata/CMakeLists.txt b/lib/perfdata/CMakeLists.txt
index 168938c4206..9ad8b4df97f 100644
--- a/lib/perfdata/CMakeLists.txt
+++ b/lib/perfdata/CMakeLists.txt
@@ -6,6 +6,7 @@ mkclass_target(influxdbcommonwriter.ti influxdbcommonwriter-ti.cpp influxdbcommo
mkclass_target(influxdbwriter.ti influxdbwriter-ti.cpp influxdbwriter-ti.hpp)
mkclass_target(influxdb2writer.ti influxdb2writer-ti.cpp influxdb2writer-ti.hpp)
mkclass_target(elasticsearchwriter.ti elasticsearchwriter-ti.cpp elasticsearchwriter-ti.hpp)
+mkclass_target(elasticsearchdatastreamwriter.ti elasticsearchdatastreamwriter-ti.cpp elasticsearchdatastreamwriter-ti.hpp)
mkclass_target(opentsdbwriter.ti opentsdbwriter-ti.cpp opentsdbwriter-ti.hpp)
mkclass_target(perfdatawriter.ti perfdatawriter-ti.cpp perfdatawriter-ti.hpp)
@@ -18,6 +19,7 @@ set(perfdata_SOURCES
influxdb2writer.cpp influxdb2writer.hpp influxdb2writer-ti.hpp
opentsdbwriter.cpp opentsdbwriter.hpp opentsdbwriter-ti.hpp
perfdatawriter.cpp perfdatawriter.hpp perfdatawriter-ti.hpp
+ elasticsearchdatastreamwriter.cpp elasticsearchdatastreamwriter.hpp elasticsearchdatastreamwriter-ti.hpp
)
if(ICINGA2_UNITY_BUILD)
@@ -58,6 +60,15 @@ install_if_not_exists(
${ICINGA2_CONFIGDIR}/features-available
)
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/usr/elasticsearch/index-template.json
+ ${ICINGA2_PKGDATADIR}/elasticsearch
+)
+install_if_not_exists(
+ ${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/elasticsearchdatastream.conf
+ ${ICINGA2_CONFIGDIR}/features-available
+)
+
install_if_not_exists(
${PROJECT_SOURCE_DIR}/etc/icinga2/features-available/opentsdb.conf
${ICINGA2_CONFIGDIR}/features-available
diff --git a/lib/perfdata/elasticsearchdatastreamwriter.cpp b/lib/perfdata/elasticsearchdatastreamwriter.cpp
new file mode 100644
index 00000000000..946adc3c183
--- /dev/null
+++ b/lib/perfdata/elasticsearchdatastreamwriter.cpp
@@ -0,0 +1,897 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include <atomic>
+#include <cctype>
+#include <cstdint>
+#include <cstdio>
+#include <fstream>
+#include <sstream>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include <arpa/inet.h>
+
+#include <boost/algorithm/string.hpp>
+#include <boost/asio/ssl/context.hpp>
+#include <boost/beast/core/flat_buffer.hpp>
+#include <boost/beast/http.hpp>
+#include <boost/exception_ptr.hpp>
+
+#include "base/application.hpp"
+#include "base/base64.hpp"
+#include "base/defer.hpp"
+#include "base/dictionary.hpp"
+#include "base/exception.hpp"
+#include "base/io-engine.hpp"
+#include "base/json.hpp"
+#include "base/logger.hpp"
+#include "base/perfdatavalue.hpp"
+#include "base/statsfunction.hpp"
+#include "base/stream.hpp"
+#include "base/string.hpp"
+#include "base/tcpsocket.hpp"
+#include "base/tlsstream.hpp"
+#include "base/utility.hpp"
+#include "icinga/compatutility.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "icinga/service.hpp"
+#include "remote/url.hpp"
+
+#include "perfdata/elasticsearchdatastreamwriter.hpp"
+#include "perfdata/elasticsearchdatastreamwriter-ti.cpp"
+
+using namespace icinga;
+namespace beast = boost::beast;
+namespace http = beast::http;
+
+static void ExceptionHandler(const boost::exception_ptr& exp);
+static Array::Ptr ExtractTemplateTags(const MacroProcessor::ResolverList& resolvers, const Array::Ptr& tagsTmpl,
+ const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+static Dictionary::Ptr ExtractTemplateLabels(const MacroProcessor::ResolverList& resolvers, const Dictionary::Ptr& labelsTmpl,
+ const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+static String NormalizeElasticsearchFieldName(const String& fieldName);
+static String NormalizeElasticsearchIndexPart(const String& fieldName);
+
+REGISTER_TYPE(ElasticsearchDatastreamWriter);
+
+REGISTER_STATSFUNCTION(ElasticsearchDatastreamWriter, &ElasticsearchDatastreamWriter::StatsFunc);
+
+void ElasticsearchDatastreamWriter::OnConfigLoaded()
+{
+	ObjectImpl<ElasticsearchDatastreamWriter>::OnConfigLoaded();
+
+ m_WorkQueue.SetName("ElasticsearchDatastreamWriter, " + GetName());
+
+ if (!GetEnableHa()) {
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "HA functionality disabled. Won't pause connection: " << GetName();
+
+ SetHAMode(HARunEverywhere);
+ } else {
+ SetHAMode(HARunOnce);
+ }
+}
+
+void ElasticsearchDatastreamWriter::StatsFunc(const Dictionary::Ptr& status, const Array::Ptr&)
+{
+	for (const ElasticsearchDatastreamWriter::Ptr& writer : ConfigType::GetObjectsByType<ElasticsearchDatastreamWriter>()) {
+ status->Set(
+ writer->GetName(),
+ new Dictionary({
+				{ "work_queue_items", writer->m_WorkQueue.GetLength() },
+				{ "work_queue_item_rate", writer->m_WorkQueue.GetTaskCount(60) / 60.0 },
+ { "documents_sent", writer->m_DocumentsSent },
+ { "documents_sent_error", writer->m_DocumentsFailed },
+ { "checkresults_filtered_out", writer->m_ItemsFilteredOut.load() }
+ })
+ );
+ }
+}
+
+void ElasticsearchDatastreamWriter::Resume()
+{
+	ObjectImpl<ElasticsearchDatastreamWriter>::Resume();
+
+ Log(LogInformation, "ElasticsearchDatastreamWriter")
+ << "'" << GetName() << "' resumed.";
+ m_Paused = false;
+ m_WorkQueue.SetExceptionCallback([](boost::exception_ptr exp) { ExceptionHandler(exp); });
+
+ if (GetManageIndexTemplate()) {
+ // Ensure index template exists/is updated as the first item on the work queue.
+ m_WorkQueue.Enqueue([this]() { ManageIndexTemplate(); });
+ }
+
+ /* Setup timer for periodically flushing m_DataBuffer */
+ m_FlushTimer = Timer::Create();
+ m_FlushTimer->SetInterval(GetFlushInterval());
+ m_FlushTimer->OnTimerExpired.connect([this](const Timer * const&) {
+ m_WorkQueue.Enqueue([this]() { Flush(); });
+ });
+ m_FlushTimer->Start();
+
+ /* Register for new metrics. */
+ m_HandleCheckResults = Checkable::OnNewCheckResult.connect(
+ [this](const Checkable::Ptr& checkable, const CheckResult::Ptr& cr, const MessageOrigin::Ptr&) {
+ CheckResultHandler(checkable, cr);
+ }
+ );
+}
+
+/* Pause is equivalent to Stop, but with HA capabilities to resume at runtime. */
+void ElasticsearchDatastreamWriter::Pause()
+{
+ m_HandleCheckResults.disconnect();
+ m_FlushTimer->Stop(true);
+ m_Paused = true;
+ m_WorkQueue.Enqueue([this]() { Flush(); });
+ m_WorkQueue.Join();
+
+ Log(LogInformation, "ElasticsearchDatastreamWriter")
+ << "'" << GetName() << "' paused.";
+
+	ObjectImpl<ElasticsearchDatastreamWriter>::Pause();
+}
+
+void ElasticsearchDatastreamWriter::ManageIndexTemplate()
+{
+ AssertOnWorkQueue();
+
+ String templatePath = ICINGA_PKGDATADIR "/elasticsearch/index-template.json";
+ std::ifstream templateFile{ templatePath, std::ifstream::in };
+ if (!templateFile.is_open()) {
+ Log(LogCritical, "ElasticsearchDatastreamWriter")
+ << "Could not open index template file: " << templatePath;
+ return;
+ }
+ std::ostringstream templateStream;
+ templateStream << templateFile.rdbuf();
+ String templateJson = templateStream.str();
+ templateFile.close();
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Read index template from " << templatePath;
+ Log(LogDebug, "ElasticsearchDatastreamWriter") << templateJson;
+
+ Url::Ptr indexTemplateUrl = new Url();
+ indexTemplateUrl->SetScheme(GetEnableTls() ? "https" : "http");
+ indexTemplateUrl->SetHost(GetHost());
+ indexTemplateUrl->SetPort(GetPort());
+ indexTemplateUrl->SetPath({ "_index_template", "icinga2-metrics" });
+
+ Url::Ptr componentTemplateUrl = new Url();
+ componentTemplateUrl->SetScheme(GetEnableTls() ? "https" : "http");
+ componentTemplateUrl->SetHost(GetHost());
+ componentTemplateUrl->SetPort(GetPort());
+ componentTemplateUrl->SetPath({ "_component_template", "metrics-icinga2@custom" });
+ componentTemplateUrl->SetQuery({ {"create", "true"} });
+
+ while (true) {
+ if (m_Paused) {
+ Log(LogInformation, "ElasticsearchDatastreamWriter")
+ << "Shutdown in progress, aborting index template management.";
+ return;
+ }
+
+ try {
+			TrySend(componentTemplateUrl, "{\"template\":{}}");
+			Log(LogInformation, "ElasticsearchDatastreamWriter")
+				<< "Successfully installed component template 'metrics-icinga2@custom'.";
+ } catch (const StatusCodeException& es) {
+ if (es.GetStatusCode() == http::status::bad_request) {
+ Log(LogInformation, "ElasticsearchDatastreamWriter")
+ << "Component template 'metrics-icinga2@custom' already exists, skipping creation.";
+ // Continue to install/update index template
+ } else {
+				Log(LogWarning, "ElasticsearchDatastreamWriter")
+					<< "Failed to install component template 'metrics-icinga2@custom', retrying in 5 seconds: " << DiagnosticInformation(es, false);
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Additional information:\n" << DiagnosticInformation(es, true);
+ Utility::Sleep(5);
+ continue;
+ }
+ } catch (const std::exception& ex) {
+			Log(LogWarning, "ElasticsearchDatastreamWriter")
+				<< "Failed to install component template 'metrics-icinga2@custom', retrying in 5 seconds: " << DiagnosticInformation(ex, false);
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Additional information:\n" << DiagnosticInformation(ex, true);
+ Utility::Sleep(5);
+ continue;
+ }
+
+ try {
+ // don't move, as we retry in a loop if it fails.
+ Dictionary::Ptr jsonResponse = TrySend(indexTemplateUrl, templateJson);
+ Log(LogInformation, "ElasticsearchDatastreamWriter")
+ << "Successfully installed/updated index template 'icinga2-metrics'.";
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Response: " << JsonEncode(jsonResponse);
+ break;
+ } catch (const std::exception& ex) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Failed to install/update index template 'icinga2-metrics', retrying in 5 seconds: " << DiagnosticInformation(ex, false);
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Additional information:\n" << DiagnosticInformation(ex, true);
+ Utility::Sleep(5);
+ }
+ }
+}
+
+Dictionary::Ptr ElasticsearchDatastreamWriter::ExtractPerfData(const Checkable::Ptr& checkable, const Array::Ptr& perfdata)
+{
+ Dictionary::Ptr pdFields = new Dictionary();
+ if (!perfdata)
+ return pdFields;
+
+ ObjectLock olock(perfdata);
+ for (const Value& val : perfdata) {
+ PerfdataValue::Ptr pdv;
+
+		if (val.IsObjectType<PerfdataValue>())
+ pdv = val;
+ else {
+ try {
+ pdv = PerfdataValue::Parse(val);
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Ignoring invalid perfdata for checkable '"
+ << checkable->GetName() << "' with value: " << val;
+ continue;
+ }
+ }
+
+ Dictionary::Ptr metric = new Dictionary();
+ metric->Set("value", pdv->GetValue());
+ if (pdv->GetCounter()) metric->Set("counter", pdv->GetCounter());
+
+ if (!pdv->GetMin().IsEmpty()) metric->Set("min", pdv->GetMin());
+ if (!pdv->GetMax().IsEmpty()) metric->Set("max", pdv->GetMax());
+ if (!pdv->GetUnit().IsEmpty()) metric->Set("unit", pdv->GetUnit());
+
+ if (!pdv->GetWarn().IsEmpty() && GetEnableSendThresholds())
+ metric->Set("warn", pdv->GetWarn());
+ if (!pdv->GetCrit().IsEmpty() && GetEnableSendThresholds())
+ metric->Set("crit", pdv->GetCrit());
+
+ String label = NormalizeElasticsearchFieldName(pdv->GetLabel());
+ pdFields->Set(label, metric);
+ }
+
+ return pdFields;
+}
+
+// user-defined tags, as specified in the ECS specification: https://www.elastic.co/docs/reference/ecs/ecs-base#field-tags
+static Array::Ptr ExtractTemplateTags(const MacroProcessor::ResolverList& resolvers,
+ const Array::Ptr& tagsTmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (tagsTmpl == nullptr) {
+ return nullptr;
+ }
+
+ ObjectLock olock(tagsTmpl);
+ Array::Ptr tags = new Array();
+ for (const String tag : tagsTmpl) {
+ String missingMacro;
+ Value value = MacroProcessor::ResolveMacros(tag, resolvers, cr, &missingMacro);
+ if (!missingMacro.IsEmpty()) {
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Missing macro for tag: " << tag
+ << ". Missing: " << missingMacro
+ << " for checkable '" << checkable->GetName() << "'. Skipping.";
+ continue;
+ }
+ tags->Add(value);
+ }
+
+ return tags;
+}
+
+// user-defined labels, as specified in the ECS specification: https://www.elastic.co/docs/reference/ecs/ecs-base#field-labels
+static Dictionary::Ptr ExtractTemplateLabels(const MacroProcessor::ResolverList& resolvers,
+ const Dictionary::Ptr& labelsTmpl, const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (labelsTmpl == nullptr) {
+ return nullptr;
+ }
+
+ ObjectLock olock(labelsTmpl);
+ Dictionary::Ptr labels = new Dictionary();
+ for (const Dictionary::Pair& labelKv : labelsTmpl) {
+ String missingMacro;
+ Value value = MacroProcessor::ResolveMacros(labelKv.second, resolvers, cr, &missingMacro);
+ if (!missingMacro.IsEmpty()) {
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+				<< "Missing macro for label '" << labelKv.first
+				<< "' with template '" << labelKv.second
+				<< "'. Missing: " << missingMacro
+ << " for checkable '" << checkable->GetName() << "'. Skipping.";
+ continue;
+ }
+ labels->Set(labelKv.first, value);
+ }
+
+ return labels;
+}
+
+String ElasticsearchDatastreamWriter::ExtractDatastreamNamespace(const MacroProcessor::ResolverList& resolvers,
+ const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ String namespaceTmpl = GetDatastreamNamespace();
+
+ String missingMacro;
+ Value value = MacroProcessor::ResolveMacros(namespaceTmpl, resolvers, cr, &missingMacro);
+ if (!missingMacro.IsEmpty() || value.IsEmpty()) {
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Missing value for namespace. Missing: " << missingMacro
+ << " for checkable '" << checkable->GetName() << "'. Skipping.";
+ return "default";
+ }
+
+ return NormalizeElasticsearchIndexPart(value);
+}
+
+bool ElasticsearchDatastreamWriter::Filter(const Checkable::Ptr& checkable)
+{
+ if (!m_CompiledFilter)
+ return true;
+
+ auto [host, service] = GetHostService(checkable);
+
+ Namespace::Ptr frameNS = new Namespace();
+ frameNS->Set("host", host);
+ frameNS->Set("service", service);
+
+ ScriptFrame frame(true, frameNS);
+ return Convert::ToBool(m_CompiledFilter->Evaluate(frame));
+}
+
+void ElasticsearchDatastreamWriter::CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr)
+{
+ if (IsPaused())
+ return;
+
+ if (!IcingaApplication::GetInstance()->GetEnablePerfdata() || !checkable->GetEnablePerfdata())
+ return;
+
+ if (!Filter(checkable)) {
+ m_ItemsFilteredOut.fetch_add(1);
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Check result for checkable '" << checkable->GetName() << "' filtered out.";
+ return;
+ }
+
+ auto [host, service] = GetHostService(checkable);
+
+ /* ECS host object population
+ * References:
+ * https://www.elastic.co/guide/en/ecs/current/ecs-host.html
+ * The classic ECS host fields (name, hostname) are extended here with
+ * Icinga-specific state information (soft_state, hard_state) to aid consumers
+ * correlating monitoring state with metrics.
+ */
+ Dictionary::Ptr ecsHost = new Dictionary({
+ {"name", host->GetDisplayName()},
+ {"hostname", host->GetName()},
+ {"soft_state", host->GetState()},
+ {"hard_state", host->GetLastHardState()}
+ });
+ if (!host->GetZoneName().IsEmpty()) ecsHost->Set("zone", host->GetZoneName());
+
+ Dictionary::Ptr ecsService;
+ if (service) {
+ /* ECS service object population
+ * References:
+ * https://www.elastic.co/guide/en/ecs/current/ecs-service.html
+ * We include ECS service 'name' plus Icinga display/state details.
+ * The added state fields (soft_state, hard_state) are Icinga-centric
+ * extensions.
+ */
+ ecsService = new Dictionary({
+ {"name", service->GetName()},
+ {"display_name", service->GetDisplayName()},
+ {"soft_state", service->GetState()},
+ {"hard_state", service->GetLastHardState()}
+ });
+ if (!service->GetZoneName().IsEmpty()) ecsService->Set("zone", service->GetZoneName());
+ }
+
+
+ Dictionary::Ptr checkResult = new Dictionary({
+		{"current_attempt", checkable->GetCheckAttempt()},
+ {"reachable", checkable->IsReachable()}
+ });
+
+ m_WorkQueue.Enqueue([this, cr, checkable, host = std::move(host), service = std::move(service), ecsHost = std::move(ecsHost),
+ ecsService = std::move(ecsService), checkResult = std::move(checkResult)]()
+ {
+ MacroProcessor::ResolverList resolvers;
+ resolvers.emplace_back("host", host);
+ if (service) {
+ resolvers.emplace_back("service", service);
+ }
+
+ Dictionary::Ptr ecs_metadata = new Dictionary({
+ { "version", "8.0.0" }
+ });
+
+ String datastreamNamespace = ExtractDatastreamNamespace(resolvers, checkable, cr);
+ String datastreamDataset = "icinga2." + NormalizeElasticsearchIndexPart(checkable->GetCheckCommandRaw());
+ String datastreamName = "metrics-" + datastreamDataset + "-" + datastreamNamespace;
+ Dictionary::Ptr data_stream = new Dictionary({
+ {"type", "metrics"},
+ {"dataset", datastreamDataset},
+ {"namespace", datastreamNamespace}
+ });
+
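+		// Only report a syntactically valid IPv4/IPv6 address as ECS host.ip;
+		// otherwise fall back to exposing the configured address as host.fqdn.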
+ char _addr[16];
+ if (!host->GetAddress().IsEmpty() && inet_pton(AF_INET, host->GetAddress().CStr(), _addr) == 1) {
+ ecsHost->Set("ip", host->GetAddress());
+ } else if (!host->GetAddress6().IsEmpty() && inet_pton(AF_INET6, host->GetAddress6().CStr(), _addr) == 1) {
+ ecsHost->Set("ip", host->GetAddress6());
+ } else if (!host->GetAddress().IsEmpty()) {
+ ecsHost->Set("fqdn", host->GetAddress());
+ }
+
+ Dictionary::Ptr ecsAgent = new Dictionary({
+ {"type", "icinga2"},
+ {"name", cr->GetCheckSource()}
+ });
+ Endpoint::Ptr endpoint = checkable->GetCommandEndpoint();
+ if (endpoint) {
+ ecsAgent->Set("id", endpoint->GetName());
+ ecsAgent->Set("version", endpoint->GetIcingaVersionString());
+ } else {
+ ecsAgent->Set("id", IcingaApplication::GetInstance()->GetName());
+ ecsAgent->Set("version", IcingaApplication::GetAppSpecVersion());
+ }
+
+ Dictionary::Ptr ecsEvent = new Dictionary({
+ {"created", FormatTimestamp(cr->GetScheduleEnd())},
+ {"start", FormatTimestamp(cr->GetExecutionStart())},
+ {"end", FormatTimestamp(cr->GetExecutionEnd())},
+ {"kind", "metric"},
+ {"module", "icinga2"}
+ });
+
+ checkResult->Set("exit_status", cr->GetExitStatus());
+ checkResult->Set("execution_time", cr->CalculateExecutionTime());
+ checkResult->Set("latency", cr->CalculateLatency());
+ checkResult->Set("schedule_start", FormatTimestamp(cr->GetScheduleStart()));
+ checkResult->Set("schedule_end", FormatTimestamp(cr->GetScheduleEnd()));
+ checkResult->Set("active", cr->GetActive());
+ checkResult->Set("max_attempts", checkable->GetMaxCheckAttempts());
+
+ Dictionary::Ptr document = new Dictionary({
+ {"@timestamp", FormatTimestamp(cr->GetExecutionEnd())},
+ { "ecs", ecs_metadata },
+ { "data_stream", data_stream },
+ { "host", ecsHost },
+ { "agent", ecsAgent },
+ { "event", ecsEvent },
+ { "check", checkResult },
+ { "message", cr->GetOutput() },
+ });
+
+		if (GetEnableSendPerfdata()) {
+			Dictionary::Ptr perfdata = ExtractPerfData(checkable, cr->GetPerformanceData());
+			if (perfdata->GetLength() != 0) {
+				document->Set("perfdata", perfdata);
+			}
+		}
+
+ Array::Ptr ecsTags = ExtractTemplateTags(
+ resolvers, service ? GetServiceTagsTemplate() : GetHostTagsTemplate(), checkable, cr);
+ if (ecsTags && ecsTags->GetLength() != 0 ) {
+ document->Set("tags", ecsTags);
+ }
+
+ Dictionary::Ptr ecsLabels = ExtractTemplateLabels(
+ resolvers, service ? GetServiceLabelsTemplate() : GetHostLabelsTemplate(), checkable, cr);
+ if (ecsLabels && ecsLabels->GetLength() != 0 ) {
+ document->Set("labels", ecsLabels);
+ }
+
+ if (ecsService && ecsService->GetLength() != 0 ) {
+ document->Set("service", ecsService);
+ }
+
+		m_DataBuffer.emplace_back(new EcsDocument(datastreamName, document));
+		if (static_cast<int>(m_DataBuffer.size()) >= GetFlushThreshold()) {
+ Flush();
+ }
+ });
+}
+
+void ElasticsearchDatastreamWriter::Flush()
+{
+ AssertOnWorkQueue();
+
+ /* Flush can be called from 1) Timeout 2) Threshold 3) on shutdown/reload. */
+ if (m_DataBuffer.empty())
+ return;
+
+ Url::Ptr url = new Url();
+ url->SetScheme(GetEnableTls() ? "https" : "http");
+ url->SetHost(GetHost());
+ url->SetPort(GetPort());
+ url->SetPath({ "_bulk" });
+
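+	// Build the Bulk API body as NDJSON: one "create" action line followed by
+	// the document source line for every buffered document.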
+	String body;
+ for (const auto &document : m_DataBuffer) {
+ Dictionary::Ptr index = new Dictionary({
+ { "create", new Dictionary({ { "_index", document->GetIndex() } }) }
+ });
+
+ body += JsonEncode(index) + "\n";
+ body += JsonEncode(document->GetDocument()) + "\n";
+ }
+
+ Dictionary::Ptr jsonResponse;
+ while (true) {
+ try {
+			// Pass a copy: a std::move() would leave 'body' empty for the retry case.
+			jsonResponse = TrySend(url, body);
+ m_DocumentsSent += m_DataBuffer.size();
+ break;
+ } catch (const std::exception& ex) {
+ if (m_Paused) {
+ // We are shutting down, don't retry.
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Flush failed during shutdown, dropping " << m_DataBuffer.size() << " documents: " << DiagnosticInformation(ex, false);
+				m_DataBuffer.clear();
+				// Return here: jsonResponse is still null and must not be dereferenced below.
+				return;
+ }
+
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Flush failed, retrying in 5 seconds: " << DiagnosticInformation(ex, false);
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Additional information:\n" << DiagnosticInformation(ex, true);
+ Utility::Sleep(5);
+ }
+ }
+
+ Log(LogInformation, "ElasticsearchDatastreamWriter")
+ << "Successfully sent " << m_DataBuffer.size()
+ << " documents to Elasticsearch. Took "
+ << jsonResponse->Get("took") << "ms.";
+
+ Value errors = jsonResponse->Get("errors");
+ if (!errors.ToBool()) {
+ Log(LogDebug, "ElasticsearchDatastreamWriter") << "No errors during write operation.";
+ m_DataBuffer.clear();
+ return;
+ }
+
+ Array::Ptr items = jsonResponse->Get("items");
+ int c = 0;
+ ObjectLock olock(items);
+ for (const Dictionary::Ptr value : items) {
+ Dictionary::Ptr itemResult = value->Get("create");
+ int itemResultStatus = itemResult->Get("status");
+ if (itemResultStatus > 299) {
+ m_DocumentsFailed += 1;
+ Dictionary::Ptr itemError = itemResult->Get("error");
+			String errorInfo = itemError->Get("type") + ": " + itemError->Get("reason");
+			Log(LogWarning, "ElasticsearchDatastreamWriter")
+				<< "Error during document creation: " << errorInfo;
+
+			if (static_cast<size_t>(c) >= m_DataBuffer.size()) {
+				Log(LogCritical, "ElasticsearchDatastreamWriter")
+					<< "Got an error response from Elasticsearch outside the sent messages";
+ break;
+ }
+
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Error response: " << JsonEncode(itemResult) << "\n"
+ << "Document: " << JsonEncode(m_DataBuffer[c]->GetDocument());
+ }
+ ++c;
+ }
+ m_DataBuffer.clear();
+}
+
+Value ElasticsearchDatastreamWriter::TrySend(const Url::Ptr& url, String body)
+{
+ // Make sure we are not randomly flushing from a timeout under load.
+ m_FlushTimer->Reschedule(-1);
+
+	http::request<http::string_body> request(http::verb::post, std::string(url->Format(true)), 10);
+ request.set(http::field::user_agent, "Icinga/" + Application::GetAppSpecVersion());
+ request.set(http::field::host, url->GetHost() + ":" + url->GetPort());
+
+ /* Specify required headers by Elasticsearch. */
+ request.set(http::field::accept, "application/json");
+
+ /* Use application/x-ndjson for bulk streams. While ES
+ * is able to handle application/json, the newline separator
+ * causes problems with Logstash (#6609).
+ */
+ request.set(http::field::content_type, "application/x-ndjson");
+
+ /* Send authentication if configured. */
+ String username = GetUsername();
+ String password = GetPassword();
+ String apiToken = GetApiToken();
+
+ if (!username.IsEmpty() && !password.IsEmpty()) {
+ request.set(http::field::authorization, "Basic " + Base64::Encode(username + ":" + password));
+ } else if (!apiToken.IsEmpty()) {
+ request.set(http::field::authorization, "ApiKey " + apiToken);
+ }
+
+ request.body() = std::move(body);
+ request.content_length(request.body().size());
+
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Sending " << request.method_string() << " request" << ((!username.IsEmpty() && !password.IsEmpty()) ? " with basic auth" : "" )
+ << " to '" << url->Format() << "'.";
+
+	http::parser<false, http::string_body> parser;
+ beast::flat_buffer buf;
+
+ OptionalTlsStream stream = Connect();
+ Defer closeStream([&stream]() {
+ if (stream.first) {
+ stream.first->lowest_layer().close();
+ } else if (stream.second) {
+ stream.second->lowest_layer().close();
+ }
+ });
+
+ try {
+ if (stream.first) {
+ http::write(*stream.first, request);
+ stream.first->flush();
+ http::read(*stream.first, buf, parser);
+ } else {
+ http::write(*stream.second, request);
+ stream.second->flush();
+ http::read(*stream.second, buf, parser);
+ }
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Cannot perform http request API on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ auto& response (parser.get());
+ if (response.result_int() > 299) {
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+			<< "Unexpected response code " << static_cast<int>(response.result()) << " from URL '" << url->Format() << "'. Error: " << response.body();
+ BOOST_THROW_EXCEPTION(StatusCodeException(
+ response.result(),
+ "Unexpected response code from Elasticsearch",
+ response.body()
+ ));
+ }
+
+ auto& contentType (response[http::field::content_type]);
+	/* Accept application/json, optionally with the UTF-8 charset parameter (case-insensitive). */
+ std::string ctLower = std::string(contentType.data(), contentType.size());
+ boost::trim(ctLower);
+ boost::to_lower(ctLower);
+ if (!(ctLower == "application/json" || ctLower == "application/json; charset=utf-8")) {
+ Log(LogCritical, "ElasticsearchDatastreamWriter") << "Unexpected Content-Type: '" << ctLower << "'";
+ BOOST_THROW_EXCEPTION(std::runtime_error(String("Unexpected Content-Type '" + ctLower + "'")));
+ }
+
+ auto& responseBody (response.body());
+
+	try {
+ return JsonDecode(responseBody);
+ } catch (...) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Unable to parse JSON response:\n" << responseBody;
+ throw;
+ }
+}
+
+OptionalTlsStream ElasticsearchDatastreamWriter::Connect()
+{
+ Log(LogNotice, "ElasticsearchDatastreamWriter")
+ << "Connecting to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+
+ OptionalTlsStream stream;
+ bool tls = GetEnableTls();
+
+ if (tls) {
+		Shared<boost::asio::ssl::context>::Ptr sslContext;
+
+ try {
+ sslContext = MakeAsioSslContext(GetCertPath(), GetKeyPath(), GetCaPath());
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Unable to create SSL context.";
+ throw;
+ }
+
+		stream.first = Shared<AsioTlsStream>::Make(IoEngine::Get().GetIoContext(), *sslContext, GetHost());
+
+ } else {
+		stream.second = Shared<AsioTcpStream>::Make(IoEngine::Get().GetIoContext());
+ }
+
+ try {
+ icinga::Connect(tls ? stream.first->lowest_layer() : stream.second->lowest_layer(), GetHost(), GetPort());
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "Can't connect to Elasticsearch on host '" << GetHost() << "' port '" << GetPort() << "'.";
+ throw;
+ }
+
+ if (tls) {
+ auto& tlsStream (stream.first->next_layer());
+
+ try {
+ tlsStream.handshake(boost::asio::ssl::stream_base::client);
+ } catch (const std::exception&) {
+ Log(LogWarning, "ElasticsearchDatastreamWriter")
+ << "TLS handshake with host '" << GetHost() << "' on port " << GetPort() << " failed.";
+ throw;
+ }
+
+ if (!GetInsecureNoverify()) {
+ if (!tlsStream.GetPeerCertificate()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error("Elasticsearch didn't present any TLS certificate."));
+ }
+
+ if (!tlsStream.IsVerifyOK()) {
+ BOOST_THROW_EXCEPTION(std::runtime_error(
+ "TLS certificate validation failed: " + std::string(tlsStream.GetVerifyError())
+ ));
+ }
+ }
+ }
+
+ return stream;
+}
+
+void ElasticsearchDatastreamWriter::AssertOnWorkQueue()
+{
+ ASSERT(m_WorkQueue.IsWorkerThread());
+}
+
+static void ExceptionHandler(const boost::exception_ptr& exp)
+{
+	Log(LogCritical, "ElasticsearchDatastreamWriter", "Exception during Elasticsearch operation: Verify that your backend is operational!");
+
+ Log(LogDebug, "ElasticsearchDatastreamWriter")
+ << "Exception during Elasticsearch operation: " << DiagnosticInformation(exp);
+}
+
+String ElasticsearchDatastreamWriter::FormatTimestamp(double ts)
+{
+ /* The date format must match the default dynamic date detection
+ * pattern in indexes. This enables applications like Kibana to
+ * detect a qualified timestamp index for time-series data.
+ *
+ * Example: 2017-09-11T10:56:21.463+0200
+ *
+ * References:
+	 * https://www.elastic.co/guide/en/elasticsearch/reference/current/dynamic-field-mapping.html#date-detection
+	 * https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-date-format.html
+	 * https://www.elastic.co/guide/en/elasticsearch/reference/current/date.html
+ */
+	auto milliSeconds = static_cast<int>((ts - static_cast<int>(ts)) * 1000);
+	char ms[6];
+	// Zero-pad the fraction: ".5" would otherwise read as 500ms instead of 5ms.
+	snprintf(ms, sizeof(ms), ".%03d", milliSeconds);
+
+	return Utility::FormatDateTime("%Y-%m-%dT%H:%M:%S", ts) + ms + Utility::FormatDateTime("%z", ts);
+}
+
+void ElasticsearchDatastreamWriter::ValidateTagsTemplate(const Array::Ptr& tags, const String& attribute)
+{
+	ObjectLock olock(tags);
+	for (const Value& tag : tags) {
+		if (!MacroProcessor::ValidateMacroString(tag)) {
+			BOOST_THROW_EXCEPTION(ValidationError(this, { attribute }, "Closing $ not found in macro format string '" + tag + "'."));
+		}
+	}
+}
+
+void ElasticsearchDatastreamWriter::ValidateHostTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
+{
+	ObjectImpl<ElasticsearchDatastreamWriter>::ValidateHostTagsTemplate(lvalue, utils);
+	auto& tags = lvalue();
+	if (tags) {
+		ValidateTagsTemplate(tags, "host_tags_template");
+	}
+}
+
+void ElasticsearchDatastreamWriter::ValidateServiceTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils)
+{
+	ObjectImpl<ElasticsearchDatastreamWriter>::ValidateServiceTagsTemplate(lvalue, utils);
+	auto& tags = lvalue();
+	if (tags) {
+		ValidateTagsTemplate(tags, "service_tags_template");
+	}
+}
+
+void ElasticsearchDatastreamWriter::ValidateLabelsTemplate(const Dictionary::Ptr& labels, const String& attribute)
+{
+	ObjectLock olock(labels);
+	for (const Dictionary::Pair& kv : labels) {
+		if (!MacroProcessor::ValidateMacroString(kv.second)) {
+			BOOST_THROW_EXCEPTION(ValidationError(this, { attribute, kv.first }, "Closing $ not found in macro format string '" + kv.second + "'."));
+		}
+	}
+}
+
+void ElasticsearchDatastreamWriter::ValidateHostLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+	ObjectImpl<ElasticsearchDatastreamWriter>::ValidateHostLabelsTemplate(lvalue, utils);
+	auto& labels = lvalue();
+	if (labels) {
+		ValidateLabelsTemplate(labels, "host_labels_template");
+	}
+}
+
+void ElasticsearchDatastreamWriter::ValidateServiceLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils)
+{
+	ObjectImpl<ElasticsearchDatastreamWriter>::ValidateServiceLabelsTemplate(lvalue, utils);
+	auto& labels = lvalue();
+	if (labels) {
+		ValidateLabelsTemplate(labels, "service_labels_template");
+	}
+}
+
+void ElasticsearchDatastreamWriter::ValidateFilter(const Lazy<Value>& lvalue, const ValidationUtils&)
+{
+	const Value& filter = lvalue();
+	if (filter.IsEmpty()) {
+		return;
+	}
+
+	if (!filter.IsObjectType<Function>()) {
+		BOOST_THROW_EXCEPTION(ValidationError(this, { "filter" }, "Filter must be an expression."));
+	}
+
+	std::vector<std::unique_ptr<Expression>> args;
+	args.emplace_back(new GetScopeExpression(ScopeThis));
+	std::unique_ptr<Expression> indexer{new IndexerExpression(
+		std::unique_ptr<Expression>(MakeLiteral(filter)),
+		std::unique_ptr<Expression>(MakeLiteral("call"))
+	)};
+	FunctionCallExpression* fexpr = new FunctionCallExpression(std::move(indexer), std::move(args));
+
+	m_CompiledFilter = fexpr;
+}
+
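+// Normalize a string for use in a data stream name: trim leading whitespace and
+// leading special characters, replace remaining non-alphanumeric characters with
+// an underscore, collapse consecutive underscores and lowercase the result.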
+static String NormalizeElasticsearchIndexPart(const String& fieldName)
+{
+ String normalized("");
+
+ bool first_char = true;
+ bool preceding_invalid = false;
+ for (char& c : boost::trim_copy(fieldName)) {
+ if (!std::isalnum(static_cast(c))) {
+ if (first_char || preceding_invalid) continue;
+ preceding_invalid = true;
+ c = '_';
+ } else {
+ first_char = false;
+ preceding_invalid = false;
+ }
+
+ normalized += tolower(c);
+ }
+
+ return normalized;
+}
+
+static String NormalizeElasticsearchFieldName(const String& fieldName)
+{
+ String normalized("");
+
+ for (char& c : boost::trim_copy(fieldName)) {
+ // Elasticsearch uses dots for field separation, so we need to avoid them here.
+ if (c == '.') {
+ c = '_';
+ }
+
+ normalized += tolower(c);
+ }
+
+ return normalized;
+}
+
+EcsDocument::EcsDocument(String index, Dictionary::Ptr document)
+ : m_Index(std::move(index)), m_Document(std::move(document))
+{}
diff --git a/lib/perfdata/elasticsearchdatastreamwriter.hpp b/lib/perfdata/elasticsearchdatastreamwriter.hpp
new file mode 100644
index 00000000000..da3234c0be9
--- /dev/null
+++ b/lib/perfdata/elasticsearchdatastreamwriter.hpp
@@ -0,0 +1,115 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#ifndef ELASTICSEARCHDATASTREAMWRITER_H
+#define ELASTICSEARCHDATASTREAMWRITER_H
+
+#include <atomic>
+#include <cstdint>
+#include <stdexcept>
+#include <vector>
+
+#include <boost/beast/http.hpp>
+#include <boost/signals2.hpp>
+
+#include "base/configobject.hpp"
+#include "base/dictionary.hpp"
+#include "base/shared-object.hpp"
+#include "base/workqueue.hpp"
+#include "base/timer.hpp"
+#include "base/tlsstream.hpp"
+#include "config/expression.hpp"
+#include "icinga/checkable.hpp"
+#include "icinga/checkresult.hpp"
+#include "icinga/macroprocessor.hpp"
+#include "remote/url.hpp"
+
+#include "perfdata/elasticsearchdatastreamwriter-ti.hpp"
+
+namespace icinga
+{
+
+class EcsDocument final : public SharedObject {
+ public:
+ DECLARE_PTR_TYPEDEFS(EcsDocument);
+
+ EcsDocument() = default;
+ EcsDocument(String index, Dictionary::Ptr document);
+
+ String GetIndex() const { return m_Index; }
+ void SetIndex(const String& index) { m_Index = index; }
+ Dictionary::Ptr GetDocument() const { return m_Document; }
+ void SetDocument(Dictionary::Ptr document) { m_Document = document; }
+
+ private:
+ String m_Index;
+ Dictionary::Ptr m_Document;
+};
+
+class ElasticsearchDatastreamWriter final : public ObjectImpl<ElasticsearchDatastreamWriter>
+{
+public:
+ DECLARE_OBJECT(ElasticsearchDatastreamWriter);
+ DECLARE_OBJECTNAME(ElasticsearchDatastreamWriter);
+
+ static void StatsFunc(const Dictionary::Ptr& status, const Array::Ptr& perfdata);
+
+ static String FormatTimestamp(double ts);
+
+	void ValidateHostTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils) override;
+	void ValidateServiceTagsTemplate(const Lazy<Array::Ptr>& lvalue, const ValidationUtils& utils) override;
+	void ValidateHostLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+	void ValidateServiceLabelsTemplate(const Lazy<Dictionary::Ptr>& lvalue, const ValidationUtils& utils) override;
+	void ValidateFilter(const Lazy<Value>& lvalue, const ValidationUtils& utils) override;
+
+protected:
+ void OnConfigLoaded() override;
+ void Resume() override;
+ void Pause() override;
+ Value TrySend(const Url::Ptr& url, String body);
+
+private:
+ WorkQueue m_WorkQueue{10000000, 1};
+ boost::signals2::connection m_HandleCheckResults;
+ Timer::Ptr m_FlushTimer;
+ bool m_Paused = false;
+
+ // This buffer should only be accessed from the worker thread.
+ // Every other access will lead to a race-condition.
+	std::vector<EcsDocument::Ptr> m_DataBuffer;
+
+ std::uint64_t m_DocumentsSent = 0;
+ std::uint64_t m_DocumentsFailed = 0;
+ std::atomic_uint64_t m_ItemsFilteredOut = 0;
+
+ Expression::Ptr m_CompiledFilter;
+
+ void CheckResultHandler(const Checkable::Ptr& checkable, const CheckResult::Ptr& cr);
+ void ManageIndexTemplate();
+
+ Dictionary::Ptr ExtractPerfData(const Checkable::Ptr& checkable, const Array::Ptr& perfdata);
+ String ExtractDatastreamNamespace(const MacroProcessor::ResolverList& resolvers, const Checkable::Ptr& checkable,
+ const CheckResult::Ptr& cr);
+ bool Filter(const Checkable::Ptr& checkable);
+
+ OptionalTlsStream Connect();
+ void AssertOnWorkQueue();
+ void Flush();
+ void SendRequest(const String& body);
+
+	void ValidateTagsTemplate(const Array::Ptr& tags, const String& attribute);
+	void ValidateLabelsTemplate(const Dictionary::Ptr& labels, const String& attribute);
+};
+
+class StatusCodeException : public std::runtime_error
+{
+public:
+ StatusCodeException(boost::beast::http::status statusCode, String message, String body)
+		: std::runtime_error((message + " (HTTP " + std::to_string(static_cast<int>(statusCode)) + ")").CStr()),
+ m_StatusCode(statusCode), m_Body(std::move(body)) {}
+
+ boost::beast::http::status GetStatusCode() const { return m_StatusCode; }
+ const String& GetBody() const { return m_Body; }
+
+private:
+ boost::beast::http::status m_StatusCode;
+ String m_Body;
+};
+
+}
+
+#endif /* ELASTICSEARCHDATASTREAMWRITER_H */
diff --git a/lib/perfdata/elasticsearchdatastreamwriter.ti b/lib/perfdata/elasticsearchdatastreamwriter.ti
new file mode 100644
index 00000000000..04bd3edde9a
--- /dev/null
+++ b/lib/perfdata/elasticsearchdatastreamwriter.ti
@@ -0,0 +1,82 @@
+/* Icinga 2 | (c) 2012 Icinga GmbH | GPLv2+ */
+
+#include "base/configobject.hpp"
+
+library perfdata;
+
+namespace icinga
+{
+
+class ElasticsearchDatastreamWriter : ConfigObject
+{
+ activation_priority 100;
+
+ [config, required] String host {
+ default {{{ return "127.0.0.1"; }}}
+ };
+ [config, required] String port {
+ default {{{ return "9200"; }}}
+ };
+ [config] bool enable_send_perfdata {
+ default {{{ return false; }}}
+ };
+ [config] String username;
+ [config, no_user_view, no_user_modify] String password;
+
+ [config, no_user_view, no_user_modify] String api_token;
+
+ [config] bool enable_tls {
+ default {{{ return false; }}}
+ };
+ [config] String datastream_namespace {
+ default {{{ return "default"; }}}
+ };
+ [config] bool enable_send_thresholds {
+ default {{{ return false; }}}
+ };
+ [config] bool manage_index_template {
+ default {{{ return true; }}}
+ };
+ [config] bool insecure_noverify {
+ default {{{ return false; }}}
+ };
+ [config] String ca_path;
+ [config] String cert_path;
+ [config] String key_path;
+
+ [config] int flush_interval {
+ default {{{ return 10; }}}
+ };
+ [config] int flush_threshold {
+ default {{{ return 1024; }}}
+ };
+ [config] bool enable_ha {
+ default {{{ return false; }}}
+ };
+
+ [config] Array::Ptr host_tags_template;
+ [config] Array::Ptr service_tags_template;
+ [config] Dictionary::Ptr host_labels_template;
+ [config] Dictionary::Ptr service_labels_template;
+ [config] Value filter;
+};
+
+validator ElasticsearchDatastreamWriter {
+ Array host_tags_template {
+ String "*";
+ };
+ Array service_tags_template {
+ String "*";
+ };
+
+ Dictionary host_labels_template {
+ String "*";
+ };
+ Dictionary service_labels_template {
+ String "*";
+ };
+
+ Value filter;
+};
+
+}
diff --git a/lib/remote/endpoint.cpp b/lib/remote/endpoint.cpp
index 751f895396e..32b109f4028 100644
--- a/lib/remote/endpoint.cpp
+++ b/lib/remote/endpoint.cpp
@@ -6,10 +6,8 @@
#include "remote/apilistener.hpp"
#include "remote/jsonrpcconnection.hpp"
#include "remote/zone.hpp"
-#include "base/configtype.hpp"
#include "base/utility.hpp"
#include "base/exception.hpp"
-#include "base/convert.hpp"
using namespace icinga;
@@ -172,3 +170,12 @@ double Endpoint::GetSecondsProcessingMessages() const
{
return m_InputProcessingTime;
}
+
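+// GetIcingaVersion() encodes the endpoint's version as a number,
+// e.g. 21402 for v2.14.2 (major * 10000 + minor * 100 + bugfix).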
+String Endpoint::GetIcingaVersionString() const
+{
+	unsigned long version = GetIcingaVersion();
+	auto bugfix = version % 100;
+	version /= 100;
+	auto minor = version % 100;
+	auto major = version / 100;
+	return std::to_string(major) + "." + std::to_string(minor) + "." + std::to_string(bugfix);
+}
diff --git a/lib/remote/endpoint.hpp b/lib/remote/endpoint.hpp
index 0cd015bf9eb..f59a03a464a 100644
--- a/lib/remote/endpoint.hpp
+++ b/lib/remote/endpoint.hpp
@@ -61,6 +61,8 @@ class Endpoint final : public ObjectImpl
double GetSecondsProcessingMessages() const override;
+ String GetIcingaVersionString() const;
+
protected:
void OnAllConfigLoaded() override;
diff --git a/tools/syntax/nano/icinga2.nanorc b/tools/syntax/nano/icinga2.nanorc
index cff78be3b64..4388ab05891 100644
--- a/tools/syntax/nano/icinga2.nanorc
+++ b/tools/syntax/nano/icinga2.nanorc
@@ -1,5 +1,5 @@
### Nano synteax file
-### Icinga2 object configuration file
+### Icinga2 object configuration file
syntax "icinga2" "/etc/icinga2/.*\.conf$" "/usr/share/icinga2/include/(plugin|itl|.*\.conf$)"
@@ -7,6 +7,7 @@ syntax "icinga2" "/etc/icinga2/.*\.conf$" "/usr/share/icinga2/include/(plugin|i
icolor brightgreen "object[ \t]+(host|hostgroup|service|servicegroup|user|usergroup)"
icolor brightgreen "object[ \t]+(checkcommand|notificationcommand|eventcommand|notification)"
icolor brightgreen "object[ \t]+(timeperiod|scheduleddowntime|dependency|perfdatawriter)"
+icolor brightgreen "object[ \t]+(elasticsearchwriter|elasticsearchdatastreamwriter)"
icolor brightgreen "object[ \t]+(graphitewriter|idomysqlconnection|idomysqlconnection)"
icolor brightgreen "object[ \t]+(livestatuslistener|externalcommandlistener)"
icolor brightgreen "object[ \t]+(compatlogger|checkcomponent|notificationcomponent)"
@@ -32,6 +33,9 @@ icolor red "(^|^\s+)(port|ranges|retry_interval|rotation_interval|rotation_met
icolor red "(^|^\s+)(service_format_template|service_name|service_name_template|service_perfdata_path|service_temp_path)"
icolor red "(^|^\s+)(severity|socket_path|socket_type|spool_dir|states|status_path|table_prefix)"
icolor red "(^|^\s+)(timeout|times|types|update_interval|user|user_groups|users|volatile|zone)"
+icolor red "(^|^\s+)(insecure_noverify|flush_interval|flush_threshold|filter|api_token|username|enable_tls)"
+icolor red "(^|^\s+)(enable_send_perfdata|datastream_namespace|enable_send_thresholds|manage_index_template)"
+icolor red "(^|^\s+)(host_labels_template|service_labels_template|host_tags_template|service_tags_template)"
icolor red "(^|^\s+)(vars\.\w+)"
diff --git a/tools/syntax/vim/syntax/icinga2.vim b/tools/syntax/vim/syntax/icinga2.vim
index 5201d6fc836..5f4a65a8088 100644
--- a/tools/syntax/vim/syntax/icinga2.vim
+++ b/tools/syntax/vim/syntax/icinga2.vim
@@ -54,6 +54,7 @@ syn match Lambda "{{}}"
" Object types
syn keyword icinga2ObjType ApiListener ApiUser CheckCommand CheckerComponent
syn keyword icinga2ObjType Comment Dependency Downtime ElasticsearchWriter
+syn keyword icinga2ObjType ElasticsearchDatastreamWriter
syn keyword icinga2ObjType Endpoint EventCommand ExternalCommandListener
syn keyword icinga2ObjType FileLogger GelfWriter GraphiteWriter Host HostGroup
syn keyword icinga2ObjType IcingaApplication IdoMysqlConnection IdoPgsqlConnection
@@ -95,6 +96,8 @@ syn keyword icinga2ObjAttr contained ssl_ca ssl_capath ssl_ca_cert ssl_cert ss
syn keyword icinga2ObjAttr contained states status_path table_prefix ticket_salt
syn keyword icinga2ObjAttr contained timeout times tls_handshake_timeout tls_protocolmin
syn keyword icinga2ObjAttr contained types update_interval user user_groups username users volatile zone
+syn keyword icinga2ObjAttr contained api_token datastream_namespace manage_index_template insecure_noverify
+syn keyword icinga2ObjAttr contained host_tags_template service_tags_template host_labels_template service_labels_template
syn match icinga2ObjAttr contained "\(vars.\w\+\)"
" keywords: https://icinga.com/docs/icinga2/latest/doc/17-language-reference/#reserved-keywords
diff --git a/usr/elasticsearch/index-template.json b/usr/elasticsearch/index-template.json
new file mode 100644
index 00000000000..a99d2360f72
--- /dev/null
+++ b/usr/elasticsearch/index-template.json
@@ -0,0 +1,130 @@
+{
+ "index_patterns": ["metrics-icinga2.*-*"],
+ "_meta": {
+ "managed": true,
+ "managed_by": "icinga2"
+ },
+ "composed_of": ["ecs@mappings", "metrics-icinga2@custom"],
+ "priority": 500,
+ "data_stream": {},
+ "template": {
+ "settings": {
+ "index": {
+ "mode": "time_series",
+ "routing_path": ["host.name", "service.name"],
+ "mapping": {
+ "total_fields.limit": 2000
+ }
+ }
+ },
+ "mappings": {
+ "dynamic_templates": [
+ {
+ "perfdata_fields": {
+ "path_match": "perfdata.*",
+ "match_mapping_type": "object",
+ "mapping": {
+ "type": "object",
+ "properties": {
+ "value": {
+ "type": "double"
+ },
+ "min": {
+ "type": "double"
+ },
+ "max": {
+ "type": "double"
+ },
+ "warn": {
+ "type": "double"
+ },
+ "crit": {
+ "type": "double"
+ },
+ "unit": {
+ "type": "keyword"
+ }
+ }
+ }
+ }
+ }
+ ],
+ "properties": {
+ "perfdata": {
+ "type": "object"
+ },
+ "check": {
+ "properties": {
+ "exit_status": {
+ "type": "integer"
+ },
+ "execution_time": {
+ "type": "double"
+ },
+ "latency": {
+ "type": "double"
+ },
+ "schedule_start": {
+ "type": "date"
+ },
+ "schedule_end": {
+ "type": "date"
+ },
+ "active": {
+ "type": "boolean"
+ },
+ "current_attempt": {
+ "type": "integer"
+ },
+ "max_attempts": {
+ "type": "integer"
+ },
+ "reachable": {
+ "type": "boolean"
+ }
+ }
+ },
+ "host": {
+ "properties": {
+ "name": {
+ "time_series_dimension": true,
+ "type": "keyword"
+ },
+ "hostname": {
+ "type": "keyword"
+ },
+ "soft_state": {
+ "type": "keyword"
+ },
+ "hard_state": {
+ "type": "keyword"
+ },
+ "zone": {
+ "type": "keyword"
+ }
+ }
+ },
+ "service": {
+ "properties": {
+ "name": {
+ "time_series_dimension": true,
+ "type": "keyword"
+ },
+ "display_name": {
+ "type": "keyword"
+ },
+ "zone": {
+ "type": "keyword"
+ },
+ "hard_state": {
+ "type": "keyword"
+ },
+ "soft_state": {
+ "type": "keyword"
+ }
+ }
+ }
+ }
+ }
+ }
+}