
Commit 55c9df0

Fix UDF load errors in Hive concurrent mode
1 parent 1373050

File tree: 4 files changed, 20 additions and 23 deletions
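
In concurrent mode the Hive engine loads UDF jars at engine start, before any job is attached to the executor. The diffs below address two failure points: the concurrent executor called engineExecutorContext.getJobId.get unconditionally, which throws when no job id is present, and HiveAddJarsEngineHook only matched HiveEngineConnExecutor, so the concurrent executor fell through to an empty default case and jars were never added. A minimal standalone Scala sketch of the Option-handling part of the fix (illustrative names, not Linkis code):

// Minimal sketch of the failure mode, assuming getJobId returns Option[String];
// "udf_init" is the placeholder task id the fix falls back to.
object UdfInitCrashSketch extends App {
  val jobIdDuringUdfInit: Option[String] = None // no job attached at UDF-load time

  // Before the fix: Option.get on None throws NoSuchElementException.
  // val taskId: String = jobIdDuringUdfInit.get

  // After the fix: fall back to a placeholder task id.
  val taskId: String = jobIdDuringUdfInit.getOrElse("udf_init")
  println(taskId) // prints "udf_init"
}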

linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala

Lines changed: 0 additions & 2 deletions

@@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf
 
 import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration}
 
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
-
 object EnvConfiguration {
 
   val HIVE_CONF_DIR = CommonVars[String](

linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java

Lines changed: 11 additions & 13 deletions

@@ -18,6 +18,7 @@
 package org.apache.linkis.entrance.interceptor.impl;
 
 import org.apache.linkis.governance.common.entity.job.JobRequest;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 

@@ -55,24 +56,21 @@ void isSelectOverLimit() {
   }
 
   /**
-   * Before the fix, when the code spliced the SQL, the output SQL was
-   * select
-   * id,
-   * name,
-   * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000;
-   * ') as infos
-   * from ods.dim_ep22
+   * Before the fix, when the code spliced the SQL, the output SQL was select id, name,
+   * array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000; ') as infos from
+   * ods.dim_ep22
   */
  @Test
  void splicingLimitSql() {
-    String code = "select\n" +
-        "id,\n" +
-        "name,\n" +
-        "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n" +
-        "from ods.dim_ep22";
+    String code =
+        "select\n"
+            + "id,\n"
+            + "name,\n"
+            + "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n"
+            + "from ods.dim_ep22";
    StringBuilder logAppender = new StringBuilder();
    JobRequest jobRequest = new JobRequest();
    SQLExplain.dealSQLLimit(code, jobRequest, logAppender);
-    Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode());
+    Assertions.assertEquals(code + " limit 5000", jobRequest.getExecutionCode());
  }
}
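
The test pins down the splicing behavior: a SELECT without a LIMIT gets a default limit 5000 appended at the end, rather than injected mid-query as in the buggy output quoted in the Javadoc. A rough Scala stand-in for the asserted behavior (a naive sketch; the real SQLExplain parses the statement properly):

// Rough stand-in for the behavior the test asserts, not the real SQLExplain logic.
object SqlLimitSketch extends App {
  val DefaultLimit = 5000

  // Append a default limit when the query has none (naive substring check).
  def dealSQLLimit(code: String): String =
    if (code.toLowerCase.contains(" limit ")) code
    else s"$code limit $DefaultLimit"

  val code = "select\nid,\nname\nfrom ods.dim_ep22"
  assert(dealSQLLimit(code) == code + " limit 5000")
  println(dealSQLLimit(code))
}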

linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala

Lines changed: 2 additions & 7 deletions

@@ -135,7 +135,7 @@ class HiveEngineConcurrentConnExecutor(
       code: String
   ): ExecuteResponse = {
     LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
-    val taskId: String = engineExecutorContext.getJobId.get
+    val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init")
     CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
 
     val realCode = code.trim()

@@ -166,12 +166,7 @@
 
         val driver = new HiveDriverProxy(any)
         driverCache.put(taskId, driver)
-        executeHQL(
-          engineExecutorContext.getJobId.get,
-          engineExecutorContext,
-          realCode,
-          driver
-        )
+        executeHQL(taskId, engineExecutorContext, realCode, driver)
       case _ =>
         val resp = proc.run(realCode.substring(tokens(0).length).trim)
         val result = new String(baos.toByteArray)
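
Besides removing the second throwing call to getJobId.get, reusing taskId keeps the driverCache key and the execution id in sync. A simplified Scala sketch of that invariant (stand-in types; the real classes live in the hive engine plugin):

import java.util.concurrent.ConcurrentHashMap

// Stand-in for the real HiveDriverProxy.
final class HiveDriverProxySketch

object DriverCacheSketch extends App {
  val driverCache = new ConcurrentHashMap[String, HiveDriverProxySketch]()

  // During UDF init there is no job, so the fixed code falls back once
  // and reuses the same taskId for both the cache key and the execution.
  val jobId: Option[String] = None
  val taskId: String = jobId.getOrElse("udf_init")

  driverCache.put(taskId, new HiveDriverProxySketch)

  // The old code re-evaluated getJobId.get at the call site, which throws
  // when the Option is empty; passing taskId avoids that and matches the key.
  def executeHQL(id: String): Unit =
    assert(driverCache.containsKey(id), s"no cached driver for $id")

  executeHQL(taskId)
  println(s"executed as $taskId")
}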

linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala

Lines changed: 7 additions & 1 deletion

@@ -24,7 +24,10 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn
 import org.apache.linkis.engineconn.common.hook.EngineConnHook
 import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
 import org.apache.linkis.engineconn.core.executor.ExecutorManager
-import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
+import org.apache.linkis.engineplugin.hive.executor.{
+  HiveEngineConcurrentConnExecutor,
+  HiveEngineConnExecutor
+}
 import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
 import org.apache.linkis.manager.label.entity.Label
 import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType}

@@ -78,7 +81,10 @@ class HiveAddJarsEngineHook extends EngineConnHook with Logging {
         ExecutorManager.getInstance.getExecutorByLabels(labels) match {
           case executor: HiveEngineConnExecutor =>
             executor.executeLine(new EngineExecutionContext(executor), sql)
+          case executor: HiveEngineConcurrentConnExecutor =>
+            executor.executeLine(new EngineExecutionContext(executor), sql)
           case _ =>
+            logger.warn(s"Executor is not a ComputationExecutor, skip adding jar: $jar")
         }
       } catch {
         case t: Throwable => logger.error(s"run hive sql ${addSql + jar} failed", t)
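
This is the dispatch half of the fix: before it, a HiveEngineConcurrentConnExecutor fell through to the empty `case _` and the "add jar" statement was silently skipped, so UDFs never loaded in concurrent mode. A toy Scala sketch of the dispatch shape (stand-in types; the jar path is hypothetical):

// Toy types standing in for the real executors; only the match shape mirrors the hook.
trait ExecutorSketch
final class ConnExecutorSketch extends ExecutorSketch {
  def executeLine(sql: String): Unit = println(s"run: $sql")
}
final class ConcurrentConnExecutorSketch extends ExecutorSketch {
  def executeLine(sql: String): Unit = println(s"run (concurrent): $sql")
}

object AddJarDispatchSketch extends App {
  def addJar(executor: ExecutorSketch, jar: String): Unit = executor match {
    case e: ConnExecutorSketch           => e.executeLine(s"add jar $jar")
    case e: ConcurrentConnExecutorSketch => e.executeLine(s"add jar $jar")
    case _ => println(s"skip adding jar: $jar") // now logged instead of silently dropped
  }

  addJar(new ConcurrentConnExecutorSketch, "hdfs:///tmp/my_udf.jar") // hypothetical path
}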
