
Commit 4bc5622
refactor: Add Lombok annotations to hudi-utilities (Part 3)
1 parent ac4bb3a

33 files changed: +280 −504 lines changed

hudi-utilities/pom.xml
Lines changed: 6 additions & 0 deletions

@@ -230,6 +230,12 @@
       <artifactId>log4j-1.2-api</artifactId>
     </dependency>
 
+    <!-- Lombok -->
+    <dependency>
+      <groupId>org.projectlombok</groupId>
+      <artifactId>lombok</artifactId>
+    </dependency>
+
     <!-- Fasterxml -->
     <dependency>
       <groupId>com.fasterxml.jackson.module</groupId>

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/BootstrapExecutor.java
Lines changed: 6 additions & 10 deletions

@@ -44,12 +44,12 @@
 import org.apache.hudi.utilities.UtilHelpers;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -71,10 +71,9 @@
 /**
  * Performs bootstrap from a non-hudi source.
  */
+@Slf4j
 public class BootstrapExecutor implements Serializable {
 
-  private static final Logger LOG = LoggerFactory.getLogger(BootstrapExecutor.class);
-
   /**
    * Config.
    */
@@ -103,6 +102,7 @@ public class BootstrapExecutor implements Serializable {
   /**
    * Bootstrap Configuration.
    */
+  @Getter
   private final HoodieWriteConfig bootstrapConfig;
 
   /**
@@ -146,7 +146,7 @@ public BootstrapExecutor(HoodieStreamer.Config cfg, JavaSparkContext jssc, FileS
       builder = builder.withSchema(schemaProvider.getTargetHoodieSchema().toString());
     }
     this.bootstrapConfig = builder.build();
-    LOG.info("Created bootstrap executor with configs : " + bootstrapConfig.getProps());
+    log.info("Created bootstrap executor with configs: {}", bootstrapConfig.getProps());
   }
 
   /**
@@ -189,7 +189,7 @@ private void initializeTable() throws IOException {
     Path basePath = new Path(cfg.targetBasePath);
     if (fs.exists(basePath)) {
       if (cfg.bootstrapOverwrite) {
-        LOG.info("Target base path already exists, overwrite it");
+        log.info("Target base path already exists, overwrite it");
         fs.delete(basePath, true);
       } else {
         throw new HoodieException("target base path already exists at " + cfg.targetBasePath
@@ -244,8 +244,4 @@ private void initializeTable() throws IOException {
 
     builder.initTable(HadoopFSUtils.getStorageConfWithCopy(jssc.hadoopConfiguration()), cfg.targetBasePath);
   }
-
-  public HoodieWriteConfig getBootstrapConfig() {
-    return bootstrapConfig;
-  }
 }
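
Note for reviewers less familiar with Lombok: @Slf4j generates a private static final SLF4J logger named "log", and @Getter on the bootstrapConfig field generates getBootstrapConfig(), which is why the handwritten logger and getter could be deleted above. A rough hand-expanded sketch of what the two annotations produce is below; HoodieWriteConfig is replaced by Object only to keep the snippet self-contained, and this is not the actual delombok output.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class BootstrapExecutorExpanded {
  // @Slf4j => a static logger named "log", so call sites change from LOG.info(...) to log.info(...)
  private static final Logger log = LoggerFactory.getLogger(BootstrapExecutorExpanded.class);

  // the field annotated with @Getter in the diff (type simplified for the sketch)
  private final Object bootstrapConfig;

  BootstrapExecutorExpanded(Object bootstrapConfig) {
    this.bootstrapConfig = bootstrapConfig;
  }

  // @Getter => a public getter with the same name the class exposed before
  public Object getBootstrapConfig() {
    return bootstrapConfig;
  }
}

The log calls were also switched from string concatenation to SLF4J "{}" placeholders, which defers message formatting until the log level is actually enabled.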

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/DefaultStreamContext.java
Lines changed: 5 additions & 15 deletions

@@ -21,28 +21,18 @@
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.utilities.schema.SchemaProvider;
 
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+
 /**
  * The default implementation for the StreamContext interface,
  * composes SchemaProvider and SourceProfileSupplier currently,
  * can be extended for other arguments in the future.
  */
+@AllArgsConstructor
+@Getter
 public class DefaultStreamContext implements StreamContext {
 
   private final SchemaProvider schemaProvider;
   private final Option<SourceProfileSupplier> sourceProfileSupplier;
-
-  public DefaultStreamContext(SchemaProvider schemaProvider, Option<SourceProfileSupplier> sourceProfileSupplier) {
-    this.schemaProvider = schemaProvider;
-    this.sourceProfileSupplier = sourceProfileSupplier;
-  }
-
-  @Override
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-
-  @Override
-  public Option<SourceProfileSupplier> getSourceProfileSupplier() {
-    return sourceProfileSupplier;
-  }
 }
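
Note: the class-level @AllArgsConstructor and @Getter regenerate the same constructor signature and getter names that were removed, and the generated getters still satisfy the StreamContext interface even though they carry no @Override annotation (Java does not require it). A hand-expanded sketch of the pattern follows; SimpleContext and its field are invented for illustration, not Hudi APIs.

interface SimpleContext {
  String getSchemaName();
}

class LombokStyleContext implements SimpleContext {
  private final String schemaName;

  // @AllArgsConstructor => one constructor taking every field in declaration order
  LombokStyleContext(String schemaName) {
    this.schemaName = schemaName;
  }

  // @Getter => one getter per field; it implements the interface method
  // even though Lombok does not emit an @Override annotation
  @Override
  public String getSchemaName() {
    return schemaName;
  }
}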

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/ErrorEvent.java
Lines changed: 4 additions & 33 deletions

@@ -19,45 +19,16 @@
 
 package org.apache.hudi.utilities.streamer;
 
-import java.util.Objects;
+import lombok.Value;
 
 /**
  * Error event is an event triggered during write or processing failure of a record.
  */
+@Value
 public class ErrorEvent<T> {
 
-  private final ErrorReason reason;
-  private final T payload;
-
-  public ErrorEvent(T payload, ErrorReason reason) {
-    this.payload = payload;
-    this.reason = reason;
-  }
-
-  public T getPayload() {
-    return payload;
-  }
-
-  public ErrorReason getReason() {
-    return reason;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (o == null || getClass() != o.getClass()) {
-      return false;
-    }
-    ErrorEvent<?> that = (ErrorEvent<?>) o;
-    return reason == that.reason && Objects.equals(payload, that.payload);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(reason, payload);
-  }
+  T payload;
+  ErrorReason reason;
 
   /**
   * The reason behind write or processing failure of a record
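
Note: @Value makes the fields private final, generates getters, value-based equals()/hashCode()/toString(), and an all-args constructor in field declaration order, and it marks the class final. Reordering the fields to (payload, reason) keeps the generated constructor compatible with the old ErrorEvent(payload, reason) signature. A minimal usage sketch, assuming Lombok annotation processing is enabled; Pair and its fields are invented for the example.

import lombok.Value;

@Value
class Pair {
  String left;
  String right;
}

class PairDemo {
  public static void main(String[] args) {
    Pair p = new Pair("a", "b");                        // generated all-args constructor
    System.out.println(p.getLeft());                    // generated getter => "a"
    System.out.println(p.equals(new Pair("a", "b")));   // generated value equality => true
  }
}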

hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieMultiTableStreamer.java
Lines changed: 12 additions & 22 deletions

@@ -39,12 +39,13 @@
 
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
+import lombok.AccessLevel;
+import lombok.Getter;
+import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.Path;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -69,11 +70,12 @@
  * Helps with ingesting incremental data into hoodie datasets for multiple tables.
  * Supports COPY_ON_WRITE and MERGE_ON_READ storage types.
  */
+@Getter
+@Slf4j
 public class HoodieMultiTableStreamer {
 
-  private static final Logger LOG = LoggerFactory.getLogger(HoodieMultiTableStreamer.class);
-
   private final List<TableExecutionContext> tableExecutionContexts;
+  @Getter(AccessLevel.NONE)
   private transient JavaSparkContext jssc;
   private final Set<String> successTables;
   private final Set<String> failedTables;
@@ -120,7 +122,7 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
   //commonProps are passed as parameter which contain table to config file mapping
   private void populateTableExecutionContextList(TypedProperties properties, String configFolder, FileSystem fs, Config config) throws IOException {
     List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    LOG.info("tables to be ingested via MultiTableDeltaStreamer : " + tablesToBeIngested);
+    log.info("tables to be ingested via MultiTableDeltaStreamer : {}", tablesToBeIngested);
     TableExecutionContext executionContext;
     for (String table : tablesToBeIngested) {
       String[] tableWithDatabase = table.split("\\.");
@@ -271,11 +273,11 @@ public static void main(String[] args) throws IOException {
     }
 
     if (config.enableHiveSync) {
-      LOG.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
+      log.warn("--enable-hive-sync will be deprecated in a future release; please use --enable-sync instead for Hive syncing");
     }
 
     if (config.targetTableName != null) {
-      LOG.warn("--target-table is deprecated and will be removed in a future release due to it's useless;"
+      log.warn("--target-table is deprecated and will be removed in a future release due to it's useless;"
           + " please use {} to configure multiple target tables", HoodieStreamerConfig.TABLES_TO_BE_INGESTED.key());
     }
 
@@ -466,7 +468,7 @@ public void sync() {
         successTables.add(Helpers.getTableWithDatabase(context));
         streamer.shutdownGracefully();
       } catch (Exception e) {
-        LOG.error("error while running MultiTableDeltaStreamer for table: " + context.getTableName(), e);
+        log.error("error while running MultiTableDeltaStreamer for table: {}", context.getTableName(), e);
        failedTables.add(Helpers.getTableWithDatabase(context));
      } finally {
        if (streamer != null) {
@@ -475,9 +477,9 @@ public void sync() {
       }
     }
 
-    LOG.info("Ingestion was successful for topics: " + successTables);
+    log.info("Ingestion was successful for topics: {}", successTables);
     if (!failedTables.isEmpty()) {
-      LOG.info("Ingestion failed for topics: " + failedTables);
+      log.info("Ingestion failed for topics: {}", failedTables);
     }
   }
 
@@ -493,16 +495,4 @@ public static class Constants {
     private static final String UNDERSCORE = "_";
     private static final String COMMA_SEPARATOR = ",";
   }
-
-  public Set<String> getSuccessTables() {
-    return successTables;
-  }
-
-  public Set<String> getFailedTables() {
-    return failedTables;
-  }
-
-  public List<TableExecutionContext> getTableExecutionContexts() {
-    return this.tableExecutionContexts;
-  }
 }
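
Note: the class-level @Getter replaces the three handwritten getters removed at the bottom of the file, and @Getter(AccessLevel.NONE) on the transient jssc field opts that single field out, so no getter is generated for it (none was exposed before either). A minimal sketch of the pattern, assuming Lombok annotation processing is enabled; the class and field names are invented for the example.

import lombok.AccessLevel;
import lombok.Getter;

@Getter
class GetterOptOutExample {
  private final String successTable = "t1";  // class-level @Getter => getSuccessTable()

  @Getter(AccessLevel.NONE)
  private transient Object sparkContext;     // per-field AccessLevel.NONE => no getter generated
}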
