Skip to content

Commit 2422a9d

Browse files
committed
add formatFields intrinsic transform function
Apache Parquet doesn't like spaces and some characters in column names, this makes renaming fields a bit easier.
1 parent 471c799 commit 2422a9d

File tree

14 files changed

+291
-79
lines changed

14 files changed

+291
-79
lines changed

tessellate-main/src/main/antora/modules/reference/pages/transforms.adoc

Lines changed: 30 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,14 @@
22

33
== Fields
44

5-
Input and output files/objects (also referred to as sources and sinks) are made of both rows and columns. Or tuples and fields.
5+
Input and output files/objects (also referred to as sources and sinks) are made of both rows and columns.
6+
Or tuples and fields.
67

78
A tuple has a set of fields, and a field has an optional xref:types.adoc[type] (and any associated metadata).
89

9-
Data files, or objects, have paths and names. Field values can be parsed from the paths and embedded in the tuple stream
10-
as fields. This is common when data has been partitioned into files where common values (like month and/or day) can
11-
be embedded in the path name to help select relevant files (push down predicates are applied to path values by many
12-
query engines).
10+
Data files, or objects, have paths and names.
11+
Field values can be parsed from the paths and embedded in the tuple stream as fields.
12+
This is common when data has been partitioned into files where common values (like month and/or day) can be embedded in the path name to help select relevant files (push down predicates are applied to path values by many query engines).
1313

1414
Declared fields in a pipeline have the following format: `<field_name>|<field_type>`, where `<field_name>` is a string, or an ordinal (number representing the position).
1515
The `<field_name>` may be quoted by a single quote (`'`) .
@@ -20,7 +20,8 @@ The actual supported types and associated metadata are described in xref:types.a
2020

2121
== Transforms
2222

23-
Transforms manipulate the tuple stream. They are applied to every tuple in the tuple stream.
23+
Transforms manipulate the tuple stream.
24+
They are applied to every tuple in the tuple stream.
2425

2526
Insert literal:: Insert a literal value into a field.
2627
Coerce field:: Transform a field, in every tuple.
@@ -49,14 +50,16 @@ For example:
4950
- `1689820455 pass:[=>] time|DateTime|yyyyMMdd` - convert the long value to a date time using the format `yyyyMMdd` and assign the result to the field `time`.
5051
- `ratio +> ratio|Double` - Coerces the string field "ratio" to a double, `null` ok.
5152
- `ratio|Double` - Same as above, coerces the string field "ratio" to a double, `null` ok.
52-
- `name +> firstName|String` - assigns the value of the field "name" to the field "firstName" as a string. The field `name` is retained.
53-
- `name pass:[->] firstName|String` - assigns the value of the field "name" to the field "firstName" as a string. The field `name` is discarded (dropped from the tuple stream).
53+
- `name +> firstName|String` - assigns the value of the field "name" to the field "firstName" as a string.
54+
The field `name` is retained.
55+
- `name pass:[->] firstName|String` - assigns the value of the field "name" to the field "firstName" as a string.
56+
The field `name` is discarded (dropped from the tuple stream).
5457
- `password pass:[->]` - discards the field `password` from the tuple stream.
5558

5659
==== Expressions
5760

58-
Expressions are applied to incoming fields and the results are assigned to a new field. Expressions can have zero or
59-
more field arguments.
61+
Expressions are applied to incoming fields and the results are assigned to a new field.
62+
Expressions can have zero or more field arguments.
6063

6164
There are two types of expression:
6265

@@ -65,8 +68,7 @@ There are two types of expression:
6568

6669
NOTE: Many more expression types are planned, including native support for regular expressions and JSON paths.
6770

68-
Current only `intrinsic` functions are supported. `intrinsic` functions are built-in functions, with optional
69-
parameters
71+
Current only `intrinsic` functions are supported. `intrinsic` functions are built-in functions, with optional parameters
7072

7173
No arguments:: `^intrinsic{} +> new_field|type`
7274
No arguments, with parameters:: `^intrinsic{param1:value1, param2:value2} +> new_field|type`
@@ -84,14 +86,16 @@ Built-in functions on fields can be applied to one or more fields in every tuple
8486
`tsid`:: create a unique id as a long or string (using https://github.com/f4b6a3/tsid-creator)
8587
Def:::
8688
`^tsid{node:...,nodeCount:...,epoch:...,format:...,counterToZero:...} +> intoField|type`
87-
`type`:::: must be `string` or `long`, defaults to `long`. When `string`, the `format` is honored.
89+
`type`:::: must be `string` or `long`, defaults to `long`.
90+
When `string`, the `format` is honored.
8891
Params:::
8992
`node`:::: The node id, defaults to a random int.
9093
* If a string is provided, it is hashed to an int.
9194
* `SIP_HASHER.hashString(s, StandardCharsets.UTF_8).asInt() % nodeCount;`
9295
`nodeCount`:::: The number of nodes, defaults to `1024`
9396
`epoch`:::: - The epoch, defaults to `Instant.parse("2020-01-01T00:00:00.000Z").toEpochMilli()`
94-
`format`:::: The format, defaults to `null`. Example: `K%S` where `%S` is a placeholder.
97+
`format`:::: The format, defaults to `null`.
98+
Example: `K%S` where `%S` is a placeholder.
9599
Placeholders:::::
96100
- `%S`: canonical string in upper case
97101
- `%s`: canonical string in lower case
@@ -118,3 +122,15 @@ Def:::
118122
`fromJson`:: converts a json string to a row/tuple
119123
Def:::
120124
- `^fromJson{} -> to_field1 + to_field2` - the fields names must match the json properties at the root node
125+
126+
`formatFields`:: reformats all the argument fields to a new format.
127+
This is especially useful for remaining compatible with Apache Parquet.
128+
Def:::
129+
- `^formatFields{format:...} ->` - replace ALL fields with the formatted result
130+
- `^formatFields{format:...} +>` - append ALL fields with the formatted result
131+
Params:::
132+
`format`:::: The format string.
133+
- `lowerUnderscore` - converts to lower case and replaces `/\ .-` with underscores.
134+
The default.
135+
- `upperUnderscore` - converts to upper case and replaces `/\ .-` with underscores
136+
- `camelCase` - converts to camel case

tessellate-main/src/main/java/io/clusterless/tessellate/parser/ast/IntrinsicParams.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,46 +25,46 @@ public Map<String, String> params() {
2525
return params;
2626
}
2727

28-
public String getString(String param) {
29-
return params.get(param);
28+
public Optional<String> getString(String param) {
29+
return Optional.ofNullable(params.get(param));
3030
}
3131

32-
public Boolean getBoolean(String param) {
32+
public Optional<Boolean> getBoolean(String param) {
3333
String value = params.get(param);
3434

3535
if (value == null) {
36-
return null;
36+
return Optional.empty();
3737
}
3838

39-
return Boolean.parseBoolean(value);
39+
return Optional.of(Boolean.parseBoolean(value));
4040
}
4141

4242
public Integer getInteger(String param, Function<String, Integer> otherwise) {
4343
try {
44-
return getInteger(param);
44+
return getInteger(param).orElse(null);
4545
} catch (NumberFormatException e) {
46-
return otherwise.apply(getString(param));
46+
return otherwise.apply(getString(param).orElse(null));
4747
}
4848
}
4949

50-
public Integer getInteger(String param) {
50+
public Optional<Integer> getInteger(String param) {
5151
String value = params.get(param);
5252

5353
if (value == null) {
54-
return null;
54+
return Optional.empty();
5555
}
5656

57-
return Integer.parseInt(value);
57+
return Optional.of(Integer.parseInt(value));
5858
}
5959

60-
public Long getLong(String param) {
60+
public Optional<Long> getLong(String param) {
6161
String value = params.get(param);
6262

6363
if (value == null) {
64-
return null;
64+
return Optional.empty();
6565
}
6666

67-
return Long.parseLong(value);
67+
return Optional.of(Long.parseLong(value));
6868
}
6969

7070
@Override

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Intrinsics.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ private static void add(IntrinsicBuilder intrinsicBuilder) {
2626
add(new FixedWidthIntrinsic());
2727
add(new ToJsonIntrinsic());
2828
add(new FromJsonIntrinsic());
29+
add(new FormatFieldsIntrinsic());
2930
}
3031

3132
public static Map<String, IntrinsicBuilder> builders() {

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/Transformer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -128,7 +128,7 @@ private IntrinsicBuilder.Result create(PipelineContext context, Operation operat
128128
throw new IllegalArgumentException("unknown intrinsic function: " + intrinsic.name());
129129
}
130130

131-
IntrinsicBuilder.Result result = intrinsicBuilder.create(operation);
131+
IntrinsicBuilder.Result result = intrinsicBuilder.create(context.currentFields, operation);
132132

133133
context.log.info("transform {}: from: {}, to: {}, having: {}", intrinsicBuilder.name(), result.arguments(), result.results(), ((Intrinsic) operation.exp()).params());
134134

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/intrinsic/FixedWidthIntrinsic.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,12 +23,12 @@ public FixedWidthIntrinsic() {
2323
}
2424

2525
@Override
26-
public Result create(Operation operation) {
26+
public Result create(Fields currentFields, Operation operation) {
2727
Fields toFields = fieldsParser().asFields(operation.results());
2828
Intrinsic intrinsic = operation.exp();
2929

30-
Integer width = intrinsic.params().getInteger(WIDTH);
31-
Integer insertAt = intrinsic.params().getInteger(INSERT_AT);
30+
Integer width = intrinsic.params().getInteger(WIDTH).orElse(null);
31+
Integer insertAt = intrinsic.params().getInteger(INSERT_AT).orElse(null);
3232

3333
// if not given, we will insert at the end
3434
if (insertAt == null) {
Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,89 @@
1+
/*
2+
* Copyright (c) 2023-2025 Chris K Wensel <[email protected]>. All Rights Reserved.
3+
*
4+
* This Source Code Form is subject to the terms of the Mozilla Public
5+
* License, v. 2.0. If a copy of the MPL was not distributed with this
6+
* file, You can obtain one at http://mozilla.org/MPL/2.0/.
7+
*/
8+
9+
package io.clusterless.tessellate.pipeline.intrinsic;
10+
11+
import cascading.operation.Identity;
12+
import cascading.tuple.Fields;
13+
import clusterless.commons.util.Strings;
14+
import io.clusterless.tessellate.parser.ast.Intrinsic;
15+
import io.clusterless.tessellate.parser.ast.Operation;
16+
import io.clusterless.tessellate.util.StringHelper;
17+
18+
import java.lang.reflect.Type;
19+
import java.util.function.BiFunction;
20+
21+
public class FormatFieldsIntrinsic extends IntrinsicBuilder {
22+
23+
public static final String FORMAT = "format";
24+
public static final String REGEX = "[/\\\\ .-]";
25+
26+
public FormatFieldsIntrinsic() {
27+
super("formatFields", FORMAT);
28+
}
29+
30+
@Override
31+
public Result create(Fields currentFields, Operation operation) {
32+
Fields fromFields = fieldsParser().asFields(operation.arguments());
33+
34+
// doesn't make sense to make an empty json object
35+
if (fromFields.isNone()) {
36+
fromFields = currentFields;
37+
}
38+
39+
Intrinsic intrinsic = operation.exp();
40+
String format = intrinsic.params().getString(FORMAT).orElse("lowerUnderscore");
41+
42+
BiFunction<Comparable, Type, Comparable> function;
43+
44+
switch (format) {
45+
case "camelCase":
46+
function = FormatFieldsIntrinsic::camelCase;
47+
break;
48+
case "upperUnderscore":
49+
function = FormatFieldsIntrinsic::upperUnderscore;
50+
break;
51+
case "lowerUnderscore":
52+
function = FormatFieldsIntrinsic::lowerUnderscore;
53+
break;
54+
default:
55+
throw new IllegalStateException("Unexpected value: " + format);
56+
}
57+
58+
Fields toFields = fromFields.rename(function);
59+
60+
Identity identity = new Identity(toFields);
61+
62+
return new Result(fromFields, identity, toFields);
63+
}
64+
65+
public static Comparable<?> lowerUnderscore(Comparable<?> comparable, Type type) {
66+
String string = comparable.toString().replaceAll(REGEX, "_");
67+
68+
if (!string.contains("_")) {
69+
return Strings.camelToLowerUnderscore(StringHelper.convertConsecutiveUpperCase(string));
70+
}
71+
72+
return string.toLowerCase();
73+
}
74+
75+
public static Comparable<?> upperUnderscore(Comparable<?> comparable, Type type) {
76+
String string = comparable.toString().replaceAll(REGEX, "_");
77+
78+
if (!string.contains("_")) {
79+
return Strings.camelToUpperUnderscore(StringHelper.convertConsecutiveUpperCase(string));
80+
}
81+
82+
return string.toUpperCase();
83+
}
84+
85+
public static Comparable<?> camelCase(Comparable<?> comparable, Type type) {
86+
String string = comparable.toString().replaceAll(REGEX, "");
87+
return Strings.lowerUnderscoreToCamelCase(lowerUnderscore(string, type).toString());
88+
}
89+
}

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/intrinsic/FromJsonIntrinsic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public FromJsonIntrinsic() {
2222
}
2323

2424
@Override
25-
public Result create(Operation operation) {
25+
public Result create(Fields currentFields, Operation operation) {
2626
Fields fromFields = fieldsParser().asFields(operation.arguments());
2727

2828
// doesn't make sense to make an empty json object

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/intrinsic/IntrinsicBuilder.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@ public FieldsParser fieldsParser() {
7070
return FieldsParser.INSTANCE;
7171
}
7272

73-
public abstract Result create(Operation operation);
73+
public abstract Result create(Fields currentFields, Operation operation);
7474

7575
protected static void requireParam(Object value, String message) {
7676
if (value == null) {

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/intrinsic/ToJsonIntrinsic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ public ToJsonIntrinsic() {
1919
}
2020

2121
@Override
22-
public Result create(Operation operation) {
22+
public Result create(Fields currentFields, Operation operation) {
2323
Fields fromFields = fieldsParser().asFields(operation.arguments());
2424

2525
// doesn't make sense to make an empty json object

tessellate-main/src/main/java/io/clusterless/tessellate/pipeline/intrinsic/TsidIntrinsic.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,20 +32,20 @@ public TsidIntrinsic() {
3232
}
3333

3434
@Override
35-
public Result create(Operation operation) {
35+
public Result create(Fields currentFields, Operation operation) {
3636
// default to long if not provided
3737
Fields toFields = fieldsParser().asFields(operation.results(), Long.TYPE);
3838
Intrinsic intrinsic = operation.exp();
3939

40-
Integer nodeCount = intrinsic.params().getInteger(NODE_COUNT);
40+
Integer nodeCount = intrinsic.params().getInteger(NODE_COUNT).orElse(null);
4141
Integer node = intrinsic.params().getInteger(NODE, s -> {
4242
requireParam(nodeCount, "nodeCount param required when providing a string node value");
4343
return Math.abs(SIP.hashString(s, StandardCharsets.UTF_8).asInt()) % nodeCount;
4444
});
4545

46-
Long epoch = intrinsic.params().getLong(EPOCH);
47-
String format = intrinsic.params().getString(FORMAT);
48-
Boolean counterZero = intrinsic.params().getBoolean(COUNTER_TO_ZERO);
46+
Long epoch = intrinsic.params().getLong(EPOCH).orElse(null);
47+
String format = intrinsic.params().getString(FORMAT).orElse(null);
48+
Boolean counterZero = intrinsic.params().getBoolean(COUNTER_TO_ZERO).orElse(null);
4949

5050
TsidFunction function = new TsidFunction(toFields, node, nodeCount, epoch, format, counterZero);
5151

0 commit comments

Comments
 (0)