4949 * Fragment to handle all file name extraction operations.
5050 */
5151public final class FileNameFragment extends ConfigFragment {
52+ /**
53+ * Flag to support Prefix Template as opposed to a prefix string. TODO To be removed when all implementations
54+ * support the prefix template.
55+ */
56+ public enum PrefixTemplateSupport {
57+ TRUE , FALSE
58+ }
5259 /**
5360 * The name of the group that this fragment places items in.
5461 */
@@ -67,6 +74,8 @@ public final class FileNameFragment extends ConfigFragment {
6774 static final String DEFAULT_FILENAME_TEMPLATE = "{{topic}}-{{partition}}-{{start_offset}}" ;
6875 @ VisibleForTesting
6976 public static final String FILE_PATH_PREFIX_TEMPLATE_CONFIG = "file.prefix.template" ;
77+ @ VisibleForTesting
78+ public static final String FILE_NAME_PREFIX_CONFIG = "file.name.prefix" ;
7079
7180 public static final ConfigDef .Validator COMPRESSION_TYPE_VALIDATOR = new PredicateGatedValidator (Objects ::nonNull ,
7281 ConfigDef .CaseInsensitiveValidString .in (CompressionType .names ().toArray (new String [0 ])));
@@ -77,7 +86,7 @@ public final class FileNameFragment extends ConfigFragment {
7786 .collect (Collectors .toList ())
7887 .toArray (new String [0 ]));
7988
80- public static final ConfigDef .Validator TEMPLATE_VALIDATOR = ConfigDef .LambdaValidator .with ((name , value ) -> {
89+ public static final ConfigDef .Validator PREFIX_VALIDATOR = ConfigDef .LambdaValidator .with ((name , value ) -> {
8190 if (value == null ) {
8291 return ;
8392 }
@@ -90,18 +99,24 @@ public final class FileNameFragment extends ConfigFragment {
9099 } catch (IllegalArgumentException e ) {
91100 throw new ConfigException (name , value , e .getMessage ());
92101 }
93- }, () -> "See documentation for proper template construction. " );
102+ }, () -> "may not start with '.well-known/acme-challenge' " );
94103
95104 /**
96105 * The flag that indicates this fragment is being used as for a sink connector.
97106 */
98107 private final boolean isSink ;
99108
109+ /** Map of template variable name to the template variable definition */
100110 private static final Map <String , FilenameTemplateVariable > FILENAME_VARIABLES = new TreeMap <>();
111+ private static final String TEMPLATE_GROUPINGS ;
101112
102113 static {
103114 Arrays .stream (FilenameTemplateVariable .values ())
104115 .forEach (variable -> FILENAME_VARIABLES .put (variable .name , variable ));
116+ TEMPLATE_GROUPINGS = "[" + RecordGrouperFactory .getSupportedVariableGroups ()
117+ .stream ()
118+ .map (strings -> FilePatternUtils .asPatterns (strings , ", " ))
119+ .collect (Collectors .joining ("] \n [" )) + "]" ;
105120 }
106121
107122 /**
@@ -253,51 +268,44 @@ private void validateSink(final Map<String, ConfigValue> configMap, final Templa
253268 }
254269 }
255270
256- /**
257- * Adds the FileName properties to the configuration definition.
258- *
259- * @param configDef
260- * the configuration definition to update.
261- * @return the updated configuration definition.
262- */
263- public static int update (final ConfigDef configDef ) {
264- return update (configDef , CompressionType .NONE );
265- }
266-
267271 /**
268272 * Adds the FileName properties to the configuration definition.
269273 *
270274 * @param configDef
271275 * the configuration definition to update.
272276 * @param defaultCompressionType
273277 * The default compression type. May be {@code null}.
274- * @return number of items in the file group..
278+ * @return number of items in the file group.
275279 */
276- public static int update (final ConfigDef configDef , final CompressionType defaultCompressionType ) {
280+ public static int update (final ConfigDef configDef , final CompressionType defaultCompressionType ,
281+ final PrefixTemplateSupport prefixTemplateSupport ) {
277282 int fileGroupCounter = 0 ;
278283
279- configDef .define (FILE_NAME_TEMPLATE_CONFIG , ConfigDef .Type .STRING , null , TEMPLATE_VALIDATOR ,
284+ configDef .define (FILE_NAME_TEMPLATE_CONFIG , ConfigDef .Type .STRING , null , PREFIX_VALIDATOR ,
280285 ConfigDef .Importance .MEDIUM ,
281286 "The template for file names on storage system. "
282287 + "Supports `{{ variable }}` placeholders for substituting variables. "
283- + "Currently supported variables are `topic`, `partition`, and `start_offset` "
284- + "(the offset of the first record in the file). "
285- + "Only some combinations of variables are valid, which currently are:\n "
286- + "- `topic`, `partition`, `start_offset`."
287- + "There is also `key` only variable {{key}} for grouping by keys" ,
288+ + "Currently supported variables are "
289+ + String .join (", " , FilePatternUtils .asPatterns (FILENAME_VARIABLES .keySet (), ", " ))
290+ + ". Only some combinations of variables are valid, which currently are: " + TEMPLATE_GROUPINGS ,
288291 GROUP_NAME , ++fileGroupCounter , ConfigDef .Width .LONG , FILE_NAME_TEMPLATE_CONFIG );
289292
290- configDef .define (FILE_PATH_PREFIX_TEMPLATE_CONFIG , ConfigDef .Type .STRING , null , TEMPLATE_VALIDATOR ,
291- ConfigDef .Importance .MEDIUM ,
292- "The template for file names prefixes on storage system. "
293- + "Supports `{{ variable }}` placeholders for substituting variables. "
294- + "Currently supported variables are `topic`, `partition`, and `start_offset` "
295- + "(the offset of the first record in the file). "
296- + "Only some combinations of variables are valid, which currently are:\n "
297- + "- `topic`, `partition`, `start_offset`."
298- + "There is also `key` only variable {{key}} for grouping by keys" ,
299- GROUP_NAME , ++fileGroupCounter , ConfigDef .Width .LONG , FILE_PATH_PREFIX_TEMPLATE_CONFIG );
300-
293+ if (prefixTemplateSupport .equals (PrefixTemplateSupport .TRUE )) {
294+ configDef .define (FILE_PATH_PREFIX_TEMPLATE_CONFIG , ConfigDef .Type .STRING , null , PREFIX_VALIDATOR ,
295+ ConfigDef .Importance .MEDIUM ,
296+ "The template for file names prefixes on storage system. "
297+ + "Supports `{{ variable }}` placeholders for substituting variables. "
298+ + "Currently supported variables are `topic`, `partition`, and `start_offset` "
299+ + "(the offset of the first record in the file). "
300+ + "Only some combinations of variables are valid, which currently are:\n "
301+ + "- `topic`, `partition`, `start_offset`."
302+ + "There is also `key` only variable {{key}} for grouping by keys" ,
303+ GROUP_NAME , ++fileGroupCounter , ConfigDef .Width .LONG , FILE_PATH_PREFIX_TEMPLATE_CONFIG );
304+ } else {
305+ configDef .define (FILE_NAME_PREFIX_CONFIG , ConfigDef .Type .STRING , "" , PREFIX_VALIDATOR ,
306+ ConfigDef .Importance .MEDIUM , "The prefix to be added to the name of each file." ,
307+ FileNameFragment .GROUP_NAME , ++fileGroupCounter , ConfigDef .Width .LONG , FILE_NAME_PREFIX_CONFIG );
308+ }
301309 configDef .define (FILE_COMPRESSION_TYPE_CONFIG , ConfigDef .Type .STRING , defaultCompressionType .name (),
302310 COMPRESSION_TYPE_VALIDATOR , ConfigDef .Importance .MEDIUM , "The compression type used for files." ,
303311 GROUP_NAME , ++fileGroupCounter , ConfigDef .Width .NONE , FILE_COMPRESSION_TYPE_CONFIG ,
@@ -394,6 +402,9 @@ public String getPrefixTemplate() {
394402 return getString (FILE_PATH_PREFIX_TEMPLATE_CONFIG );
395403 }
396404
405+ public String getPrefix () {
406+ return getString (FILE_NAME_PREFIX_CONFIG );
407+ }
397408 public static void replaceYyyyUppercase (final String name , final Map <String , String > properties ) {
398409 String template = properties .get (name );
399410 if (template != null ) {
@@ -522,5 +533,16 @@ public Setter template(final String template) {
522533 public Setter prefixTemplate (final String prefixTemplate ) {
523534 return setValue (FILE_PATH_PREFIX_TEMPLATE_CONFIG , prefixTemplate );
524535 }
536+
537+ /**
538+ * Sets the file name prefix template.
539+ *
540+ * @param prefix
541+ * the prefix to use.
542+ * @return this
543+ */
544+ public Setter prefix (final String prefix ) {
545+ return setValue (FILE_NAME_PREFIX_CONFIG , prefix );
546+ }
525547 }
526548}
0 commit comments