Skip to content

Commit 415d76b

Browse files
committed
fix(lakehouse): prevent ClassCastException on mixed numeric types in comparisons
Normalize Number types to double before comparison in TopKMerger.compareValues() and AggregationReducer min/max. Integer.compareTo(Long) throws ClassCastException because compareTo expects the same type. DataFusion can return different numeric widths across workers for the same column.
1 parent c9f58c7 commit 415d76b

4 files changed

Lines changed: 56 additions & 10 deletions

File tree

sandbox/plugins/lakehouse-iceberg/src/main/java/org/opensearch/lakehouse/distributed/AggregationReducer.java

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -84,13 +84,12 @@ static Object sumColumn(List<WorkerQueryResponse> responses, int colIdx) {
8484
* @param colIdx the column index to minimize
8585
* @return the minimum value, or null if all values are null
8686
*/
87-
@SuppressWarnings("unchecked")
8887
static Object minColumn(List<WorkerQueryResponse> responses, int colIdx) {
89-
Comparable<Object> min = null;
88+
Object min = null;
9089
for (WorkerQueryResponse r : responses) {
9190
if (hasValue(r, colIdx)) {
92-
Comparable<Object> val = (Comparable<Object>) r.getColumnData()[colIdx][0];
93-
if (min == null || val.compareTo((Object) min) < 0) {
91+
Object val = r.getColumnData()[colIdx][0];
92+
if (min == null || compareForAgg(val, min) < 0) {
9493
min = val;
9594
}
9695
}
@@ -106,20 +105,35 @@ static Object minColumn(List<WorkerQueryResponse> responses, int colIdx) {
106105
* @param colIdx the column index to maximize
107106
* @return the maximum value, or null if all values are null
108107
*/
109-
@SuppressWarnings("unchecked")
110108
static Object maxColumn(List<WorkerQueryResponse> responses, int colIdx) {
111-
Comparable<Object> max = null;
109+
Object max = null;
112110
for (WorkerQueryResponse r : responses) {
113111
if (hasValue(r, colIdx)) {
114-
Comparable<Object> val = (Comparable<Object>) r.getColumnData()[colIdx][0];
115-
if (max == null || val.compareTo((Object) max) > 0) {
112+
Object val = r.getColumnData()[colIdx][0];
113+
if (max == null || compareForAgg(val, max) > 0) {
116114
max = val;
117115
}
118116
}
119117
}
120118
return max;
121119
}
122120

121+
/**
122+
* Compares two values for aggregation, handling mixed numeric types safely.
123+
* Normalizes Number types to double to avoid ClassCastException when comparing
124+
* different widths (e.g., Integer vs Long from different workers).
125+
*/
126+
@SuppressWarnings("unchecked")
127+
private static int compareForAgg(Object v1, Object v2) {
128+
if (v1 instanceof Number && v2 instanceof Number) {
129+
return Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
130+
}
131+
if (v1 instanceof Comparable && v2 instanceof Comparable) {
132+
return ((Comparable<Object>) v1).compareTo(v2);
133+
}
134+
return v1.toString().compareTo(v2.toString());
135+
}
136+
123137
/**
124138
* Finds the first non-null value in a column across all worker responses.
125139
*/

sandbox/plugins/lakehouse-iceberg/src/main/java/org/opensearch/lakehouse/distributed/TopKMerger.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -99,13 +99,19 @@ static Comparator<Object[]> buildComparator(int[] sortColumns, boolean[] sortAsc
9999
* @param v2 second value
100100
* @return negative if v1 &lt; v2, positive if v1 &gt; v2, zero if equal
101101
*/
102-
@SuppressWarnings("unchecked")
103102
static int compareValues(Object v1, Object v2) {
104103
if (v1 == null && v2 == null) return 0;
105104
if (v1 == null) return 1;
106105
if (v2 == null) return -1;
106+
// Normalize numeric types to avoid ClassCastException when comparing
107+
// different widths (e.g., Integer vs Long from different workers)
108+
if (v1 instanceof Number && v2 instanceof Number) {
109+
return Double.compare(((Number) v1).doubleValue(), ((Number) v2).doubleValue());
110+
}
107111
if (v1 instanceof Comparable && v2 instanceof Comparable) {
108-
return ((Comparable<Object>) v1).compareTo(v2);
112+
@SuppressWarnings("unchecked")
113+
int cmp = ((Comparable<Object>) v1).compareTo(v2);
114+
return cmp;
109115
}
110116
return v1.toString().compareTo(v2.toString());
111117
}

sandbox/plugins/lakehouse-iceberg/src/test/java/org/opensearch/lakehouse/distributed/AggregationReducerTests.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,21 @@ public void testMaxColumnSkipsEmptyResponses() {
220220
assertEquals(42, result);
221221
}
222222

223+
public void testMinColumnMixedNumericTypes() {
224+
// Integer vs Long — previously threw ClassCastException
225+
WorkerQueryResponse r1 = makeResponse(new Object[][]{{Integer.valueOf(10)}}, 1);
226+
WorkerQueryResponse r2 = makeResponse(new Object[][]{{Long.valueOf(5)}}, 1);
227+
Object result = AggregationReducer.minColumn(List.of(r1, r2), 0);
228+
assertEquals(Long.valueOf(5), result);
229+
}
230+
231+
public void testMaxColumnMixedNumericTypes() {
232+
WorkerQueryResponse r1 = makeResponse(new Object[][]{{Integer.valueOf(10)}}, 1);
233+
WorkerQueryResponse r2 = makeResponse(new Object[][]{{Long.valueOf(5)}}, 1);
234+
Object result = AggregationReducer.maxColumn(List.of(r1, r2), 0);
235+
assertEquals(Integer.valueOf(10), result);
236+
}
237+
223238
// --- Helper ---
224239

225240
private WorkerQueryResponse makeResponse(Object[][] columnData, int rowCount) {

sandbox/plugins/lakehouse-iceberg/src/test/java/org/opensearch/lakehouse/distributed/TopKMergerTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,17 @@ public String toString() {
149149
assertTrue(TopKMerger.compareValues(a, b) < 0);
150150
}
151151

152+
public void testCompareValuesMixedNumericTypes() {
153+
// Integer vs Long — previously threw ClassCastException
154+
assertTrue(TopKMerger.compareValues(Integer.valueOf(5), Long.valueOf(10)) < 0);
155+
assertTrue(TopKMerger.compareValues(Long.valueOf(10), Integer.valueOf(5)) > 0);
156+
assertEquals(0, TopKMerger.compareValues(Integer.valueOf(5), Long.valueOf(5)));
157+
// Integer vs Double
158+
assertTrue(TopKMerger.compareValues(Integer.valueOf(3), Double.valueOf(3.5)) < 0);
159+
// Float vs Long
160+
assertTrue(TopKMerger.compareValues(Float.valueOf(2.5f), Long.valueOf(3)) < 0);
161+
}
162+
152163
// --- buildComparator tests ---
153164

154165
public void testBuildComparatorAscending() {

0 commit comments

Comments
 (0)