Skip to content

Commit e0b5e9c

Browse files
committed
support for writing directly to stdout
this allows reading parquet to stdout directly from the command line via the --output option --output-fields may also be provided to project only the declared fields
1 parent 5262030 commit e0b5e9c

File tree

9 files changed

+190
-3
lines changed

9 files changed

+190
-3
lines changed

tessellate-main/src/main/java/io/clusterless/tessellate/factory/TapFactories.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.clusterless.tessellate.factory.hdfs.ParquetFactory;
1414
import io.clusterless.tessellate.factory.hdfs.TextFSFactory;
1515
import io.clusterless.tessellate.factory.local.LocalDirectoryFactory;
16+
import io.clusterless.tessellate.factory.local.StdOutFactory;
1617
import io.clusterless.tessellate.model.Sink;
1718
import io.clusterless.tessellate.model.Source;
1819
import io.clusterless.tessellate.options.PipelineOptions;
@@ -101,7 +102,7 @@ public static SinkFactory findSinkFactory(Sink sinkModel) {
101102
List<URI> inputUris = sinkModel.uris();
102103

103104
if (inputUris.isEmpty()) {
104-
return null;
105+
return new StdOutFactory();
105106
}
106107

107108
Format format = sinkModel.schema().format();
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
/*
2+
* Copyright (c) 2023-2025 Chris K Wensel <[email protected]>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.factory.local;
10+
11+
import cascading.nested.json.local.JSONTextLine;
12+
import cascading.scheme.Scheme;
13+
import cascading.scheme.local.CompressorScheme;
14+
import cascading.scheme.local.TextDelimited;
15+
import cascading.scheme.local.TextLine;
16+
import cascading.scheme.util.DelimitedParser;
17+
import cascading.tap.Tap;
18+
import cascading.tap.local.StdOutTap;
19+
import cascading.tuple.Fields;
20+
import io.clusterless.tessellate.factory.SinkFactory;
21+
import io.clusterless.tessellate.model.Dataset;
22+
import io.clusterless.tessellate.model.Schema;
23+
import io.clusterless.tessellate.model.Sink;
24+
import io.clusterless.tessellate.options.PipelineOptions;
25+
import io.clusterless.tessellate.util.Compression;
26+
import io.clusterless.tessellate.util.Format;
27+
import io.clusterless.tessellate.util.Models;
28+
import io.clusterless.tessellate.util.Protocol;
29+
import io.clusterless.tessellate.util.json.JSONUtil;
30+
import org.jetbrains.annotations.NotNull;
31+
32+
import java.io.IOException;
33+
import java.io.InputStream;
34+
import java.io.OutputStream;
35+
import java.util.Properties;
36+
import java.util.Set;
37+
38+
public class StdOutFactory implements SinkFactory {
39+
@Override
40+
public Set<Protocol> getSinkProtocols() {
41+
return Set.of(Protocol.stdout);
42+
}
43+
44+
@Override
45+
public Set<Format> getFormats() {
46+
return Set.of(Format.tsv, Format.csv, Format.json);
47+
}
48+
49+
@NotNull
50+
protected Scheme<Properties, InputStream, OutputStream, ?, ?> createScheme(Dataset dataset, Fields declaredFields) {
51+
CompressorScheme.Compressor compressor = null;
52+
53+
Scheme<Properties, InputStream, OutputStream, ?, ?> scheme;
54+
55+
Schema schema = dataset.schema();
56+
Format format = schema.format() == null ? Format.csv : schema.format();
57+
58+
switch (format) {
59+
default:
60+
case text:
61+
Fields sinkFields = new Fields("line");
62+
Fields sourceFields = dataset.schema().embedsSchema() ? new Fields("num", "line").applyTypes(Long.TYPE, String.class) : sinkFields;
63+
scheme = new TextLine(sourceFields, sinkFields, compressor);
64+
break;
65+
case delimited:
66+
DelimitedParser delimited = new DelimitedParser(schema.delimiterChar(), schema.quoteChar(), null, schema.strictParsing(), true);
67+
return new TextDelimited(declaredFields, compressor, schema.embedsSchema(), schema.embedsSchema(), delimited);
68+
case csv:
69+
DelimitedParser csv = new DelimitedParser(",", schema.quoteChar(), null, schema.strictParsing(), true);
70+
scheme = new TextDelimited(declaredFields, compressor, schema.embedsSchema(), csv);
71+
break;
72+
case tsv:
73+
DelimitedParser tsv = new DelimitedParser("\t", schema.quoteChar(), null, schema.strictParsing(), true);
74+
scheme = new TextDelimited(declaredFields, compressor, schema.embedsSchema(), tsv);
75+
break;
76+
case json:
77+
scheme = new JSONTextLine(JSONUtil.DATA_MAPPER, declaredFields, compressor) {
78+
@Override
79+
public String getExtension() {
80+
return format.extension();
81+
}
82+
};
83+
break;
84+
}
85+
86+
return scheme;
87+
}
88+
89+
@Override
90+
public Tap<Properties, ?, ?> getSink(PipelineOptions pipelineOptions, Sink sinkModel, Fields currentFields) throws IOException {
91+
Fields declaredFields = Models.fieldAsFields(sinkModel.schema().declared(), String.class, Fields.ALL);
92+
93+
Fields resultFields = declaredFields.isAll() ? currentFields : declaredFields;
94+
95+
return new StdOutTap(createScheme(sinkModel, resultFields));
96+
}
97+
98+
@Override
99+
public Set<Compression> getCompressions() {
100+
return Set.of(Compression.none);
101+
}
102+
}

tessellate-main/src/main/java/io/clusterless/tessellate/model/Field.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import cascading.tuple.Fields;
1212
import com.fasterxml.jackson.annotation.JsonCreator;
1313
import com.fasterxml.jackson.annotation.JsonIgnore;
14+
import com.fasterxml.jackson.annotation.JsonProperty;
1415
import io.clusterless.tessellate.parser.FieldsParser;
1516

1617
import java.util.Arrays;
@@ -27,8 +28,15 @@ public static List<Field> asField(String... declarations) {
2728
return Arrays.stream(declarations).map(Field::new).collect(Collectors.toList());
2829
}
2930

31+
// Constructor for deserialization from JSON string
3032
@JsonCreator
31-
public Field(String declaration) {
33+
public static Field fromString(String declaration) {
34+
Objects.requireNonNull(declaration, "field may not be null");
35+
return new Field(declaration);
36+
}
37+
38+
@JsonCreator
39+
public Field(@JsonProperty("declaration") String declaration) {
3240
Objects.requireNonNull(declaration, "field may not be null");
3341

3442
this.declaration = declaration;
@@ -39,8 +47,24 @@ public Fields fields() {
3947
return fields;
4048
}
4149

50+
public String declaration() {
51+
return declaration;
52+
}
53+
4254
@Override
4355
public String toString() {
4456
return declaration;
4557
}
58+
59+
@Override
60+
public boolean equals(Object o) {
61+
if (o == null || getClass() != o.getClass()) return false;
62+
Field field = (Field) o;
63+
return Objects.equals(fields, field.fields) && Objects.equals(declaration, field.declaration);
64+
}
65+
66+
@Override
67+
public int hashCode() {
68+
return Objects.hash(declaration);
69+
}
4670
}

tessellate-main/src/main/java/io/clusterless/tessellate/model/Sink.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import java.net.URI;
1414
import java.util.ArrayList;
15+
import java.util.Collections;
1516
import java.util.List;
1617

1718
public class Sink implements Dataset, Model {
@@ -55,6 +56,10 @@ public boolean hasManifest() {
5556

5657
@Override
5758
public List<URI> uris() {
59+
if (output() == null) {
60+
return Collections.emptyList();
61+
}
62+
5863
return List.of(output());
5964
}
6065

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
* Copyright (c) 2023-2025 Chris K Wensel <[email protected]>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.options;
10+
11+
import io.clusterless.tessellate.model.Field;
12+
import picocli.CommandLine;
13+
14+
public class FieldConverter implements CommandLine.ITypeConverter<Field> {
15+
@Override
16+
public Field convert(String value) throws Exception {
17+
return new Field(value);
18+
}
19+
}

tessellate-main/src/main/java/io/clusterless/tessellate/options/OutputOptions.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,20 @@
88

99
package io.clusterless.tessellate.options;
1010

11+
import io.clusterless.tessellate.model.Field;
12+
import io.clusterless.tessellate.util.Format;
1113
import picocli.CommandLine;
1214

1315
import java.net.URI;
16+
import java.util.List;
1417

1518
public class OutputOptions implements AWSOptions {
1619
@CommandLine.Option(names = {"-o", "--output"}, description = "output uris")
1720
private URI output;
21+
@CommandLine.Option(names = {"--output-fields"}, description = "output fields", converter = FieldConverter.class, split = "[\\+,]")
22+
private List<Field> outputFields;
23+
@CommandLine.Option(names = {"--output-format"}, description = "output format")
24+
private Format outputFormat;
1825
@CommandLine.Option(names = {"-t", "--output-manifest-template"}, description = "output manifest uri template")
1926
private String outputManifestTemplate;
2027
@CommandLine.Option(names = {"-l", "--output-manifest-lot"}, description = "output lot")
@@ -37,6 +44,22 @@ public OutputOptions setOutput(URI output) {
3744
return this;
3845
}
3946

47+
public List<Field> outputFields() {
48+
return outputFields;
49+
}
50+
51+
public void setOutputFields(List<Field> outputFields) {
52+
this.outputFields = outputFields;
53+
}
54+
55+
public Format outputFormat() {
56+
return outputFormat;
57+
}
58+
59+
public void setOutputFormat(Format outputFormat) {
60+
this.outputFormat = outputFormat;
61+
}
62+
4063
public String outputManifestTemplate() {
4164
return outputManifestTemplate;
4265
}

tessellate-main/src/main/java/io/clusterless/tessellate/options/PipelineOptionsMerge.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,8 @@ public class PipelineOptionsMerge {
4848
.putInto("inputManifestLot", "/source/manifestLot")
4949
.putInto("inputErrors", "/source/errorPath")
5050
.putInto("output", "/sink/output")
51+
.putInto("outputFields", "/sink/schema/declared")
52+
.putInto("outputFormat", "/sink/schema/format")
5153
.putInto("outputManifestTemplate", "/sink/manifestTemplate")
5254
.putInto("outputManifestLot", "/sink/manifestLot")
5355
.putInto("outputErrors", "/sink/errorPath");
@@ -68,6 +70,8 @@ public class PipelineOptionsMerge {
6870
argumentLookups.put("inputManifestLot", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputLot()));
6971
argumentLookups.put("inputErrors", pipelineOptions -> nullOrNode(pipelineOptions.inputOptions().inputErrors()));
7072
argumentLookups.put("output", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().output()));
73+
argumentLookups.put("outputFields", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputFields()));
74+
argumentLookups.put("outputFormat", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputFormat()));
7175
argumentLookups.put("outputManifestTemplate", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputManifestTemplate()));
7276
argumentLookups.put("outputManifestLot", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputLot()));
7377
argumentLookups.put("outputErrors", pipelineOptions -> nullOrNode(pipelineOptions.outputOptions().outputErrors()));

tessellate-main/src/main/java/io/clusterless/tessellate/util/Protocol.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
package io.clusterless.tessellate.util;
1010

1111
public enum Protocol {
12-
file, hdfs, s3, http, https;
12+
file, hdfs, s3, http, https, stdout;
1313

1414
public static Protocol fromString(String protocol) {
1515
switch (protocol) {
@@ -23,6 +23,8 @@ public static Protocol fromString(String protocol) {
2323
return http;
2424
case "https":
2525
return https;
26+
case "-":
27+
return stdout;
2628
default:
2729
throw new IllegalArgumentException("Unknown protocol: " + protocol);
2830
}

tessellate-main/src/test/java/io/clusterless/tessellate/pipeline/PipelineOptionsMergerTest.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010

1111
import com.adelean.inject.resources.junit.jupiter.GivenTextResource;
1212
import com.adelean.inject.resources.junit.jupiter.TestWithResources;
13+
import io.clusterless.tessellate.model.Field;
1314
import io.clusterless.tessellate.model.PipelineDef;
1415
import io.clusterless.tessellate.options.PipelineOptions;
1516
import io.clusterless.tessellate.options.PipelineOptionsMerge;
1617
import io.clusterless.tessellate.parser.ast.AssignmentStatement;
1718
import io.clusterless.tessellate.parser.ast.UnaryOperation;
19+
import io.clusterless.tessellate.util.Format;
1820
import io.clusterless.tessellate.util.json.JSONUtil;
1921
import org.junit.jupiter.api.Test;
2022

@@ -50,17 +52,22 @@ void usingOptions(@GivenTextResource("/config/pipeline-mvel.json") String pipeli
5052
void usingOptionsMissing() throws IOException {
5153
List<URI> inputs = List.of(URI.create("s3://foo/input"));
5254
URI output = URI.create("s3://foo/output");
55+
List<Field> declared = List.of(new Field("json"));
5356

5457
PipelineOptions pipelineOptions = new PipelineOptions();
5558
pipelineOptions.inputOptions().setInputs(inputs);
5659
pipelineOptions.outputOptions().setOutput(output);
60+
pipelineOptions.outputOptions().setOutputFields(declared);
61+
pipelineOptions.outputOptions().setOutputFormat(Format.json);
5762

5863
PipelineOptionsMerge merger = new PipelineOptionsMerge(pipelineOptions);
5964

6065
PipelineDef merged = merger.merge();
6166

6267
assertEquals(inputs, merged.source().inputs());
6368
assertEquals(output, merged.sink().output());
69+
assertEquals(declared, merged.sink().schema().declared());
70+
assertEquals(Format.json, merged.sink().schema().format());
6471
}
6572

6673
@Test

0 commit comments

Comments
 (0)