Skip to content

Commit 9ce565e

Browse files
authored
[Fix-17701][SqlTask] handle duplicate column aliases in SQL result by appending column index suffix (#17702)
1 parent 45a3c8b commit 9ce565e

2 files changed

Lines changed: 159 additions & 1 deletion

File tree

  • dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src

‎dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/main/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTask.java‎

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,9 +49,11 @@
4949
import java.sql.Statement;
5050
import java.util.ArrayList;
5151
import java.util.HashMap;
52+
import java.util.HashSet;
5253
import java.util.List;
5354
import java.util.Map;
5455
import java.util.Optional;
56+
import java.util.Set;
5557
import java.util.regex.Matcher;
5658
import java.util.regex.Pattern;
5759
import java.util.stream.Collectors;
@@ -246,11 +248,23 @@ private String resultProcess(ResultSet resultSet) throws Exception {
246248
if (resultSet != null) {
247249
ResultSetMetaData md = resultSet.getMetaData();
248250
int num = md.getColumnCount();
251+
String[] columnLabels = new String[num];
252+
253+
// Check for duplicates in column definitions (across all columns)
254+
Set<String> uniqueLabels = new HashSet<>(num);
255+
for (int i = 1; i <= num; i++) {
256+
String label = md.getColumnLabel(i);
257+
columnLabels[i - 1] = label;
258+
if (!uniqueLabels.add(label)) {
259+
throw new TaskException("SQL column name conflict: duplicate column name '" + label
260+
+ "'. Please use aliases to ensure unique column names.");
261+
}
262+
}
249263

250264
while (resultSet.next()) {
251265
ObjectNode mapOfColValues = JSONUtils.createObjectNode();
252266
for (int i = 1; i <= num; i++) {
253-
mapOfColValues.set(md.getColumnLabel(i), JSONUtils.toJsonNode(resultSet.getObject(i)));
267+
mapOfColValues.set(columnLabels[i - 1], JSONUtils.toJsonNode(resultSet.getObject(i)));
254268
}
255269
resultJSONArray.add(mapOfColValues);
256270
}

‎dolphinscheduler-task-plugin/dolphinscheduler-task-sql/src/test/java/org/apache/dolphinscheduler/plugin/task/sql/SqlTaskTest.java‎

Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,12 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.sql;
1919

20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.when;
22+
2023
import org.apache.dolphinscheduler.common.utils.JSONUtils;
2124
import org.apache.dolphinscheduler.plugin.task.api.TaskConstants;
25+
import org.apache.dolphinscheduler.plugin.task.api.TaskException;
2226
import org.apache.dolphinscheduler.plugin.task.api.TaskExecutionContext;
2327
import org.apache.dolphinscheduler.plugin.task.api.enums.DataType;
2428
import org.apache.dolphinscheduler.plugin.task.api.enums.Direct;
@@ -30,13 +34,19 @@
3034
import org.apache.dolphinscheduler.plugin.task.api.utils.ParameterUtils;
3135
import org.apache.dolphinscheduler.spi.enums.DbType;
3236

37+
import java.lang.reflect.InvocationTargetException;
38+
import java.lang.reflect.Method;
39+
import java.sql.ResultSet;
40+
import java.sql.ResultSetMetaData;
3341
import java.util.HashMap;
3442
import java.util.Map;
3543

3644
import org.junit.jupiter.api.Assertions;
3745
import org.junit.jupiter.api.BeforeEach;
3846
import org.junit.jupiter.api.Test;
3947

48+
import com.fasterxml.jackson.databind.node.ArrayNode;
49+
import com.fasterxml.jackson.databind.node.ObjectNode;
4050
import com.google.common.collect.Lists;
4151

4252
class SqlTaskTest {
@@ -197,4 +207,138 @@ void testVarPoolSetting() {
197207
Assertions.assertEquals("1", varPoolParam.getValue());
198208
Assertions.assertEquals(Direct.OUT, varPoolParam.getDirect());
199209
}
210+
211+
@Test
212+
void testGenerateEmptyRow_WithNonNullResultSet_ReturnsEmptyValuesForAllColumns() throws Exception {
213+
// Arrange
214+
ResultSet mockResultSet = mock(ResultSet.class);
215+
ResultSetMetaData mockMetaData = mock(ResultSetMetaData.class);
216+
217+
when(mockResultSet.getMetaData()).thenReturn(mockMetaData);
218+
when(mockMetaData.getColumnCount()).thenReturn(2);
219+
when(mockMetaData.getColumnLabel(1)).thenReturn("id");
220+
when(mockMetaData.getColumnLabel(2)).thenReturn("name");
221+
222+
Method method = SqlTask.class.getDeclaredMethod("generateEmptyRow", ResultSet.class);
223+
method.setAccessible(true);
224+
225+
// Act
226+
ArrayNode result = (ArrayNode) method.invoke(sqlTask, mockResultSet);
227+
228+
// Assert
229+
Assertions.assertNotNull(result);
230+
Assertions.assertEquals(1, result.size());
231+
232+
ObjectNode row = (ObjectNode) result.get(0);
233+
Assertions.assertEquals("", row.get("id").asText());
234+
Assertions.assertEquals("", row.get("name").asText());
235+
}
236+
237+
@Test
238+
void testGenerateEmptyRow_WithNullResultSet_ReturnsErrorObject() throws Exception {
239+
// Arrange
240+
Method method = SqlTask.class.getDeclaredMethod("generateEmptyRow", ResultSet.class);
241+
method.setAccessible(true);
242+
243+
// Act
244+
ArrayNode result = (ArrayNode) method.invoke(sqlTask, (ResultSet) null);
245+
246+
// Assert
247+
Assertions.assertNotNull(result);
248+
Assertions.assertEquals(1, result.size());
249+
250+
ObjectNode row = (ObjectNode) result.get(0);
251+
Assertions.assertTrue(row.has("error"));
252+
Assertions.assertEquals("resultSet is null", row.get("error").asText());
253+
}
254+
255+
@Test
256+
void testGenerateEmptyRow_WithDuplicateColumns_DeduplicatesLabels() throws Exception {
257+
ResultSet mockResultSet = mock(ResultSet.class);
258+
ResultSetMetaData mockMetaData = mock(ResultSetMetaData.class);
259+
260+
when(mockResultSet.getMetaData()).thenReturn(mockMetaData);
261+
when(mockMetaData.getColumnCount()).thenReturn(3);
262+
when(mockMetaData.getColumnLabel(1)).thenReturn("id");
263+
when(mockMetaData.getColumnLabel(2)).thenReturn("id"); // duplicate
264+
when(mockMetaData.getColumnLabel(3)).thenReturn("name");
265+
266+
Method method = SqlTask.class.getDeclaredMethod("generateEmptyRow", ResultSet.class);
267+
method.setAccessible(true);
268+
269+
ArrayNode result = (ArrayNode) method.invoke(sqlTask, mockResultSet);
270+
271+
Assertions.assertNotNull(result);
272+
Assertions.assertEquals(1, result.size());
273+
274+
ObjectNode row = (ObjectNode) result.get(0);
275+
Assertions.assertTrue(row.has("id"));
276+
Assertions.assertTrue(row.has("name"));
277+
}
278+
279+
@Test
280+
void testResultProcess_NullResultSet_ReturnsEmptyResult() throws Exception {
281+
Method resultProcessMethod = SqlTask.class.getDeclaredMethod("resultProcess", ResultSet.class);
282+
resultProcessMethod.setAccessible(true);
283+
284+
// Mock a null ResultSet
285+
String result = (String) resultProcessMethod.invoke(sqlTask, (ResultSet) null);
286+
287+
Assertions.assertNotNull(result);
288+
Assertions.assertTrue(result.equalsIgnoreCase("[{\"error\":\"resultSet is null\"}]"));
289+
}
290+
291+
@Test
292+
void testResultProcess_EmptyResultSet_ReturnsEmptyResult() throws Exception {
293+
// Mock a non-null ResultSet that contains no data rows
294+
ResultSet mockResultSet = mock(ResultSet.class);
295+
ResultSetMetaData mockMetaData = mock(ResultSetMetaData.class);
296+
297+
when(mockResultSet.getMetaData()).thenReturn(mockMetaData);
298+
when(mockMetaData.getColumnCount()).thenReturn(2);
299+
when(mockMetaData.getColumnLabel(1)).thenReturn("id");
300+
when(mockMetaData.getColumnLabel(2)).thenReturn("name");
301+
when(mockResultSet.next()).thenReturn(false); // no rows available
302+
303+
Method resultProcessMethod = SqlTask.class.getDeclaredMethod("resultProcess", ResultSet.class);
304+
resultProcessMethod.setAccessible(true);
305+
306+
String result = (String) resultProcessMethod.invoke(sqlTask, mockResultSet);
307+
308+
Assertions.assertNotNull(result);
309+
// Verify the result contains empty string values for all columns and is a valid JSON array
310+
Assertions.assertTrue(result.contains("\"id\":\"\""));
311+
Assertions.assertTrue(result.contains("\"name\":\"\""));
312+
Assertions.assertTrue(result.startsWith("[{"));
313+
Assertions.assertTrue(result.endsWith("}]"));
314+
}
315+
316+
@Test
317+
void testResultProcess_DuplicateColumnLabels_ThrowsTaskException() throws Exception {
318+
ResultSet mockRs = mock(ResultSet.class);
319+
ResultSetMetaData mockMd = mock(ResultSetMetaData.class);
320+
321+
when(mockRs.getMetaData()).thenReturn(mockMd);
322+
when(mockMd.getColumnCount()).thenReturn(2);
323+
when(mockMd.getColumnLabel(1)).thenReturn("id");
324+
when(mockMd.getColumnLabel(2)).thenReturn("id"); // duplicate column name
325+
326+
Method method = SqlTask.class.getDeclaredMethod("resultProcess", ResultSet.class);
327+
method.setAccessible(true);
328+
329+
// Assert that InvocationTargetException is thrown
330+
InvocationTargetException thrown = Assertions.assertThrows(
331+
InvocationTargetException.class,
332+
() -> method.invoke(sqlTask, mockRs));
333+
334+
// Check the actual cause
335+
Throwable cause = thrown.getCause();
336+
Assertions.assertNotNull(cause);
337+
Assertions.assertInstanceOf(TaskException.class, cause,
338+
"Cause should be TaskException, but was: " + cause.getClass());
339+
Assertions.assertTrue(
340+
cause.getMessage().contains("duplicate column name"),
341+
"TaskException message should mention duplicate column name");
342+
}
343+
200344
}

0 commit comments

Comments
 (0)