Skip to content

Commit 7809c6b

Browse files
authored
Add option to unwrap messages replayed from DLQ to avoid nesting loops. (#2888)
* Draft * Add option to unwrap replayed messages. * Comments.
1 parent 87c6fe9 commit 7809c6b

File tree

2 files changed

+148
-2
lines changed

2 files changed

+148
-2
lines changed

v1/src/main/java/com/google/cloud/teleport/templates/PubSubToSplunk.java

Lines changed: 77 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import com.google.cloud.teleport.coders.FailsafeElementCoder;
1919
import com.google.cloud.teleport.metadata.Template;
2020
import com.google.cloud.teleport.metadata.TemplateCategory;
21+
import com.google.cloud.teleport.metadata.TemplateParameter;
2122
import com.google.cloud.teleport.splunk.SplunkEvent;
2223
import com.google.cloud.teleport.splunk.SplunkEventCoder;
2324
import com.google.cloud.teleport.splunk.SplunkIO;
@@ -60,6 +61,8 @@
6061
import org.apache.beam.sdk.values.TupleTag;
6162
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
6263
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
64+
import org.json.JSONException;
65+
import org.json.JSONObject;
6366
import org.slf4j.Logger;
6467
import org.slf4j.LoggerFactory;
6568

@@ -254,18 +257,38 @@ public static PipelineResult run(PubSubToSplunkOptions options) {
254257
.withEnableGzipHttpCompression(options.getEnableGzipHttpCompression())
255258
.build());
256259

260+
final ValueProvider<Boolean> unwrapHecProvider = options.getUnwrapHecForDeadletter();
261+
257262
// 5a) Wrap write failures into a FailsafeElement.
258263
PCollection<FailsafeElement<String, String>> wrappedSplunkWriteErrors =
259264
writeErrors.apply(
260265
"WrapSplunkWriteErrors",
261266
ParDo.of(
262267
new DoFn<SplunkWriteError, FailsafeElement<String, String>>() {
263268

269+
private boolean unwrapHecForDeadletter = false;
270+
271+
@Setup
272+
public void setup() {
273+
if (unwrapHecProvider != null) {
274+
unwrapHecForDeadletter =
275+
MoreObjects.firstNonNull(unwrapHecProvider.get(), false);
276+
}
277+
LOG.info("unwrapHecForDeadletter set to: {}", unwrapHecForDeadletter);
278+
}
279+
264280
@ProcessElement
265281
public void processElement(ProcessContext context) {
266282
SplunkWriteError error = context.element();
283+
284+
// Extract original payload if unwrap is enabled
285+
String payload = error.payload();
286+
if (unwrapHecForDeadletter) {
287+
payload = extractOriginalPayloadFromHec(payload);
288+
}
289+
267290
FailsafeElement<String, String> failsafeElement =
268-
FailsafeElement.of(error.payload(), error.payload());
291+
FailsafeElement.of(payload, payload);
269292

270293
if (error.statusMessage() != null) {
271294
failsafeElement.setErrorMessage(error.statusMessage());
@@ -304,7 +327,20 @@ public interface PubSubToSplunkOptions
304327
extends SplunkOptions,
305328
PubsubReadSubscriptionOptions,
306329
PubsubWriteDeadletterTopicOptions,
307-
JavascriptTextTransformerOptions {}
330+
JavascriptTextTransformerOptions {
331+
332+
@TemplateParameter.Boolean(
333+
optional = true,
334+
description = "Unwrap Splunk HEC format from deadletter messages.",
335+
helpText =
336+
"When enabled, if a message fails to write to Splunk and is sent to the deadletter "
337+
+ "queue, the original payload will be extracted from the Splunk HEC format before "
338+
+ "writing to the deadletter topic. This prevents event nesting when messages are "
339+
+ "replayed. Default: `false`.")
340+
ValueProvider<Boolean> getUnwrapHecForDeadletter();
341+
342+
void setUnwrapHecForDeadletter(ValueProvider<Boolean> unwrapHecForDeadletter);
343+
}
308344

309345
/**
310346
* A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
@@ -411,4 +447,43 @@ private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
411447

412448
return attributesJson;
413449
}
450+
451+
/**
452+
* Extracts the original payload from a Splunk HEC-formatted JSON string.
453+
*
454+
* <p>When a SplunkEvent fails to write to Splunk, it's stored in HEC format like: {"event":
455+
* "original payload", "time": ..., "host": ..., etc.}
456+
*
457+
* <p>This method extracts just the "event" field to prevent nesting when replaying messages from
458+
* the deadletter queue.
459+
*
460+
* @param hecPayload The HEC-formatted JSON string
461+
* @return The original event payload, or the input if extraction fails
462+
*/
463+
@VisibleForTesting
464+
protected static String extractOriginalPayloadFromHec(String hecPayload) {
465+
try {
466+
JSONObject json = new JSONObject(hecPayload);
467+
468+
// Check if this looks like HEC format (has "event" field)
469+
if (json.has("event")) {
470+
Object eventObj = json.get("event");
471+
472+
// The event field could be a JSON object/array or a string
473+
if (eventObj instanceof String) {
474+
return (String) eventObj;
475+
} else {
476+
// If it's a JSONObject or JSONArray, return its string
477+
// representation
478+
return eventObj.toString();
479+
}
480+
}
481+
} catch (JSONException e) {
482+
// If parsing fails, log and return the original
483+
LOG.debug("Failed to parse payload as HEC JSON, returning as-is. Error: {}", e.getMessage());
484+
}
485+
486+
// If no "event" field found or parsing failed, return original
487+
return hecPayload;
488+
}
414489
}

v1/src/test/java/com/google/cloud/teleport/templates/PubsubToSplunkTest.java

Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.teleport.templates;
1717

1818
import static com.google.common.truth.Truth.assertThat;
19+
import static org.junit.Assert.assertEquals;
1920

2021
import com.google.gson.Gson;
2122
import com.google.gson.JsonObject;
@@ -88,4 +89,74 @@ public void testPubsubMessageWithComplexAttributes() {
8889
assertThat(attributesJson.get(key).getAsString()).isEqualTo(attributes.get(key));
8990
}
9091
}
92+
93+
/** Tests extraction of original payload from HEC-formatted JSON. */
94+
@Test
95+
public void testExtractOriginalPayloadFromHec_withStringEvent() {
96+
String hecPayload = "{\"event\":\"simple text message\",\"time\":1634567890}";
97+
98+
String extracted = PubSubToSplunk.extractOriginalPayloadFromHec(hecPayload);
99+
100+
assertEquals("simple text message", extracted);
101+
}
102+
103+
/** Tests extraction when event is a JSON object. */
104+
@Test
105+
public void testExtractOriginalPayloadFromHec_withJsonEvent() {
106+
String hecPayload =
107+
"{\"event\":{\"userId\":123,\"action\":\"login\"},\"time\":1634567890,\"host\":\"web-01\"}";
108+
109+
String extracted = PubSubToSplunk.extractOriginalPayloadFromHec(hecPayload);
110+
111+
// The extracted event should be the JSON object as a string
112+
JsonObject extractedJson = GSON.fromJson(extracted, JsonObject.class);
113+
assertThat(extractedJson.get("userId").getAsInt()).isEqualTo(123);
114+
assertThat(extractedJson.get("action").getAsString()).isEqualTo("login");
115+
}
116+
117+
/** Tests extraction when payload is already unwrapped (not HEC format). */
118+
@Test
119+
public void testExtractOriginalPayloadFromHec_alreadyUnwrapped() {
120+
String originalPayload = "{\"userId\":123,\"action\":\"login\"}";
121+
122+
String extracted = PubSubToSplunk.extractOriginalPayloadFromHec(originalPayload);
123+
124+
// Should return as-is since no "event" field
125+
assertEquals(originalPayload, extracted);
126+
}
127+
128+
/** Tests extraction with plain text (not JSON). */
129+
@Test
130+
public void testExtractOriginalPayloadFromHec_plainText() {
131+
String plainText = "This is just plain text, not JSON";
132+
133+
String extracted = PubSubToSplunk.extractOriginalPayloadFromHec(plainText);
134+
135+
// Should return as-is when parsing fails
136+
assertEquals(plainText, extracted);
137+
}
138+
139+
/** Tests extraction with complex HEC format including all metadata fields. */
140+
@Test
141+
public void testExtractOriginalPayloadFromHec_fullHecFormat() {
142+
String fullHecPayload =
143+
"{\"event\":{\"log\":\"Application started\",\"level\":\"INFO\"},"
144+
+ "\"time\":1634567890,"
145+
+ "\"host\":\"app-server-01\","
146+
+ "\"source\":\"application\","
147+
+ "\"sourcetype\":\"json\","
148+
+ "\"index\":\"main\","
149+
+ "\"fields\":{\"environment\":\"production\"}}";
150+
151+
String extracted = PubSubToSplunk.extractOriginalPayloadFromHec(fullHecPayload);
152+
153+
// Should extract just the event field
154+
JsonObject extractedJson = GSON.fromJson(extracted, JsonObject.class);
155+
assertThat(extractedJson.get("log").getAsString()).isEqualTo("Application started");
156+
assertThat(extractedJson.get("level").getAsString()).isEqualTo("INFO");
157+
158+
// Verify other HEC fields are NOT in the extracted payload
159+
assertThat(extractedJson.has("time")).isFalse();
160+
assertThat(extractedJson.has("host")).isFalse();
161+
}
91162
}

0 commit comments

Comments
 (0)