4 changes: 4 additions & 0 deletions paimon-api/src/main/java/org/apache/paimon/CoreOptions.java
@@ -3361,6 +3361,10 @@ public boolean overwriteUpgrade() {
        return options.get(OVERWRITE_UPGRADE);
    }

    public int readBatchSize() {
        return options.get(READ_BATCH_SIZE);
    }

    /** Specifies the merge engine for table with primary key. */
    public enum MergeEngine implements DescribedEnum {
        DEDUPLICATE("deduplicate", "De-duplicate and keep the last row."),
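
For context, a minimal sketch (not part of this PR) of how the value behind readBatchSize() is supplied. The option key "read.batch-size" and the Options.fromMap construction path are assumptions based on Paimon's existing API, not confirmed by this diff:

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class ReadBatchSizeExample {
    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("read.batch-size", "2048"); // assumed key; rows per outer readBatch() call
        CoreOptions options = new CoreOptions(Options.fromMap(conf));
        System.out.println(options.readBatchSize()); // prints 2048
    }
}
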
DataEvolutionFileReader.java
@@ -58,38 +58,140 @@ public class DataEvolutionFileReader implements RecordReader<InternalRow> {
    private final int[] fieldOffsets;
    private final RecordReader<InternalRow>[] readers;

    /**
     * Cached iterators for each inner reader. An entry is non-null if the corresponding reader
     * currently has an opened batch which is not fully consumed.
     */
    private final RecordIterator<InternalRow>[] pending;

    /** Marks whether the corresponding reader has reached end of input. */
    private final boolean[] finished;

    /** Maximum number of rows produced for each outer {@link #readBatch()} call. */
    private final int batchSize;

    /** Number of rows emitted by the current outer iterator. */
    private int rowsEmittedInCurrentBatch;

    /** Whether any inner reader has reached end of input. */
    private boolean endOfInput;

    @SuppressWarnings("unchecked")
    public DataEvolutionFileReader(
            int[] rowOffsets,
            int[] fieldOffsets,
            RecordReader<InternalRow>[] readers,
            int batchSize) {
        checkArgument(rowOffsets != null, "Row offsets must not be null");
        checkArgument(fieldOffsets != null, "Field offsets must not be null");
        checkArgument(
                rowOffsets.length == fieldOffsets.length,
                "Row offsets and field offsets must have the same length");
        checkArgument(rowOffsets.length > 0, "Row offsets must not be empty");
        checkArgument(readers != null && readers.length > 1, "Readers should be more than 1");
        checkArgument(batchSize > 0, "Batch size must be greater than 0");

        this.rowOffsets = rowOffsets;
        this.fieldOffsets = fieldOffsets;
        this.readers = readers;
        this.batchSize = batchSize;

        this.pending = new RecordIterator[readers.length];
        this.finished = new boolean[readers.length];
        this.rowsEmittedInCurrentBatch = 0;
        this.endOfInput = false;
    }
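
    // Hypothetical wiring, not shown in this diff: the call site that builds this
    // reader is expected to thread the configured size through from CoreOptions,
    // for example:
    //
    //     new DataEvolutionFileReader(
    //             rowOffsets, fieldOffsets, readers, coreOptions.readBatchSize());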

    @Override
    @Nullable
    public RecordIterator<InternalRow> readBatch() throws IOException {
        if (endOfInput) {
            return null;
        }

        rowsEmittedInCurrentBatch = 0;
        DataEvolutionRow row = new DataEvolutionRow(readers.length, rowOffsets, fieldOffsets);
        return new DataEvolutionAlignedIterator(this, row);
    }

    @Nullable
    InternalRow nextRow(DataEvolutionRow row) throws IOException {
        if (endOfInput) {
            return null;
        }

        // Fetch one row from each non-null reader.
        for (int i = 0; i < readers.length; i++) {
            if (readers[i] == null) {
                // This reader does not contribute any fields.
                continue;
            }

            InternalRow buffered = fetchNextFromReader(i);
            if (buffered == null) {
                // All readers are aligned: as soon as one returns no more rows, the
                // others have no data left either.
                markEndOfInput();
                return null;
            }

            row.setRow(i, buffered);
        }

        return row;
    }

    private InternalRow fetchNextFromReader(int readerIndex) throws IOException {
        while (true) {
            if (finished[readerIndex]) {
                return null;
            }

            RecordIterator<InternalRow> iterator = pending[readerIndex];
            if (iterator == null) {
                iterator = readers[readerIndex].readBatch();
                if (iterator == null) {
                    finished[readerIndex] = true;
                    return null;
                }
                pending[readerIndex] = iterator;
            }

            InternalRow next = iterator.next();
            if (next != null) {
                return next;
            }

            // The current batch is exhausted; release it and try the next one.
            iterator.releaseBatch();
            pending[readerIndex] = null;
        }
    }

    private void markEndOfInput() {
        endOfInput = true;
        // Release all pending batches.
        for (int i = 0; i < pending.length; i++) {
            RecordIterator<InternalRow> iterator = pending[i];
            if (iterator != null) {
                iterator.releaseBatch();
                pending[i] = null;
            }
        }
    }

    boolean isEndOfInput() {
        return endOfInput;
    }

    int getBatchSize() {
        return batchSize;
    }

    int getRowsEmittedInCurrentBatch() {
        return rowsEmittedInCurrentBatch;
    }

    void incrementRowsEmittedInCurrentBatch() {
        rowsEmittedInCurrentBatch++;
    }

    @Override
@@ -100,4 +202,40 @@ public void close() throws IOException {
            throw new IOException("Failed to close inner readers", e);
        }
    }

    /**
     * A {@link org.apache.paimon.reader.RecordReader.RecordIterator} which aligns rows from
     * multiple inner readers and assembles them into a {@link DataEvolutionRow}.
     */
    class DataEvolutionAlignedIterator implements RecordReader.RecordIterator<InternalRow> {

        private final DataEvolutionFileReader fileReader;
        private final DataEvolutionRow row;

        DataEvolutionAlignedIterator(DataEvolutionFileReader fileReader, DataEvolutionRow row) {
            this.fileReader = fileReader;
            this.row = row;
        }

        @Nullable
        @Override
        public InternalRow next() throws IOException {
            if (fileReader.isEndOfInput()
                    || fileReader.getRowsEmittedInCurrentBatch() >= fileReader.getBatchSize()) {
                return null;
            }

            InternalRow nextRow = fileReader.nextRow(row);
            if (nextRow != null) {
                fileReader.incrementRowsEmittedInCurrentBatch();
            }
            return nextRow;
        }

        @Override
        public void releaseBatch() {
            // Batches of inner readers are released when they are exhausted inside
            // {@link DataEvolutionFileReader}. Nothing to do here.
        }
    }
}
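
To make the batching contract concrete, here is a minimal consumption sketch. It assumes only what the diff shows: each outer iterator yields at most batchSize assembled rows, readBatch() returns null once any inner reader is exhausted, and the outer releaseBatch() is a no-op because inner batches are released internally. The createReader() factory and process() consumer are hypothetical:

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;

class ConsumeExample {
    void consume() throws Exception {
        // RecordReader is Closeable, as the overridden close() above shows.
        try (RecordReader<InternalRow> reader = createReader()) {
            RecordReader.RecordIterator<InternalRow> batch;
            while ((batch = reader.readBatch()) != null) {
                InternalRow row;
                while ((row = batch.next()) != null) {
                    process(row); // hypothetical downstream consumer
                }
                batch.releaseBatch(); // no-op for DataEvolutionAlignedIterator
            }
        }
    }
}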

This file was deleted.
