Skip to content

Commit 76aeae5

Browse files
committed
fix issue where traps unnecessarily inherited sink meta-data
1 parent 2422a9d commit 76aeae5

File tree

1 file changed

+9
-7
lines changed
  • tessellate-main/src/main/java/io/clusterless/tessellate/pipeline

1 file changed

+9
-7
lines changed

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Pipeline.java

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -204,13 +204,13 @@ public void build() throws IOException {
204204
Map<String, Tap> traps = new HashMap<>();
205205

206206
if (primarySource.errorPath() != null) {
207-
Tap<Properties, ?, ?> errorTap = createTrap(primarySource.errorPath(), "errors-source", sinkFactory);
207+
Tap<Properties, ?, ?> errorTap = createTrap(primarySource.errorPath(), "errors-source");
208208
traps.put(HEAD, errorTap);
209209
LOG.info("trapping input errors at: {}", errorTap.getIdentifier());
210210
}
211211

212212
if (pipelineDef.sink().errorPath() != null) {
213-
Tap<Properties, ?, ?> errorTap = createTrap(pipelineDef.sink().errorPath(), "errors-sink", sinkFactory);
213+
Tap<Properties, ?, ?> errorTap = createTrap(pipelineDef.sink().errorPath(), "errors-sink");
214214
traps.put(TAIL, errorTap);
215215
LOG.info("trapping output errors at: {}", errorTap.getIdentifier());
216216
}
@@ -223,7 +223,7 @@ public void build() throws IOException {
223223
.buildProperties(commonProperties);
224224
}
225225

226-
Map<String, Tap> joinSources = createJoinSources(sinkFactory, traps);
226+
Map<String, Tap> joinSources = createJoinSources(traps);
227227

228228
FlowDef flowDef = flowDef()
229229
.setName("pipeline")
@@ -238,7 +238,7 @@ public void build() throws IOException {
238238
state = State.READY;
239239
}
240240

241-
private Map<String, Tap> createJoinSources(SinkFactory sinkFactory, Map<String, Tap> traps) throws IOException {
241+
private Map<String, Tap> createJoinSources(Map<String, Tap> traps) throws IOException {
242242
Map<String, Tap> results = new HashMap<>();
243243

244244
Map<String, Source> secondarySources = findSecondarySources();
@@ -251,7 +251,7 @@ private Map<String, Tap> createJoinSources(SinkFactory sinkFactory, Map<String,
251251
results.put(name, sourceFactory.getSource(pipelineOptions, source));
252252

253253
if (source.errorPath() != null) {
254-
Tap<Properties, ?, ?> errorTap = createTrap(source.errorPath(), "errors-source-" + name, sinkFactory);
254+
Tap<Properties, ?, ?> errorTap = createTrap(source.errorPath(), "errors-source-" + name);
255255
traps.put(HEAD, errorTap);
256256
LOG.info("trapping input errors for: {}, at: {}", name, errorTap.getIdentifier());
257257
}
@@ -323,7 +323,7 @@ private Map<String, Source> findSecondarySources() {
323323
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
324324
}
325325

326-
private Tap<Properties, ?, ?> createTrap(URI errorPath, String prefix, SinkFactory sinkFactory) throws IOException {
326+
private Tap<Properties, ?, ?> createTrap(URI errorPath, String prefix) throws IOException {
327327
Sink errorSink = Sink.builder()
328328
// for FileTap the final / gets removed
329329
.withOutput(URIs.copyAsDirectory(URIs.cleanFileUrls(errorPath)))
@@ -337,7 +337,9 @@ private Map<String, Source> findSecondarySources() {
337337
.build())
338338
.build();
339339

340-
return sinkFactory.getSink(pipelineOptions, errorSink, Fields.ALL);
340+
SinkFactory errorSinkFactory = TapFactories.findSinkFactory(errorSink);
341+
342+
return errorSinkFactory.getSink(pipelineOptions, errorSink, Fields.ALL);
341343
}
342344

343345
private static void logCurrentFields(Fields currentFields) {

0 commit comments

Comments
 (0)