Skip to content

Commit 3a0e742

Browse files
fix hive concurrent mode load udf error & code format (#5289)
Co-authored-by: aiceflower <[email protected]>
1 parent 0cbe383 commit 3a0e742

File tree

10 files changed

+47
-32
lines changed

10 files changed

+47
-32
lines changed

linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/conf/DWCArgumentsParser.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.linkis.common.conf
1919

20-
import org.apache.linkis.common.utils.{ParameterUtils, Logging}
20+
import org.apache.linkis.common.utils.{Logging, ParameterUtils}
2121

2222
import org.apache.commons.lang3.StringUtils
2323

linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/utils/VariableUtils.scala

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,18 @@ import org.apache.linkis.common.conf.Configuration
2121
import org.apache.linkis.common.exception.LinkisCommonErrorException
2222
import org.apache.linkis.common.variable
2323
import org.apache.linkis.common.variable._
24-
import org.apache.linkis.common.variable.DateTypeUtils.{getCurHour, getMonthDay, getToday, getYesterday}
25-
import org.apache.commons.lang3.{StringUtils, Strings}
24+
import org.apache.linkis.common.variable.DateTypeUtils.{
25+
getCurHour,
26+
getMonthDay,
27+
getToday,
28+
getYesterday
29+
}
30+
31+
import org.apache.commons.lang3.{Strings, StringUtils}
2632

2733
import java.time.ZonedDateTime
2834
import java.util
35+
2936
import scala.collection.JavaConverters._
3037
import scala.collection.mutable
3138
import scala.util.control.Exception.allCatch
@@ -115,7 +122,9 @@ object VariableUtils extends Logging {
115122
case _ =>
116123
if (!nameAndType.contains(key) && StringUtils.isNotEmpty(value)) {
117124
// if ((allCatch opt value.toDouble).isDefined) {
118-
if ((allCatch opt BigDecimal(value)).isDefined && !Strings.CS.startsWith(value, "0")) {
125+
if (
126+
(allCatch opt BigDecimal(value)).isDefined && !Strings.CS.startsWith(value, "0")
127+
) {
119128
nameAndType(key) = variable.BigDecimalValue(BigDecimal(value))
120129
} else {
121130
nameAndType(key) = variable.StringType(value)

linkis-commons/linkis-common/src/main/scala/org/apache/linkis/common/variable/VariableType.scala

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,7 @@ case class YearType(value: CustomYearType) extends VariableType {
111111
}
112112

113113
case class BigDecimalValue(value: BigDecimal) extends VariableType {
114+
114115
override def getValue: String = {
115116
val result = bigDecimalOrLong(value)
116117
result match {
@@ -126,7 +127,10 @@ case class BigDecimalValue(value: BigDecimal) extends VariableType {
126127
case "*" => val res = value * BigDecimal(bValue); formatResult(res)
127128
case "/" => val res = value / BigDecimal(bValue); formatResult(res)
128129
case _ =>
129-
throw new LinkisCommonErrorException(20050, s"BigDecimal class is not supported to use:$signal")
130+
throw new LinkisCommonErrorException(
131+
20050,
132+
s"BigDecimal class is not supported to use:$signal"
133+
)
130134
}
131135
}
132136

@@ -146,6 +150,7 @@ case class BigDecimalValue(value: BigDecimal) extends VariableType {
146150
bd
147151
}
148152
}
153+
149154
}
150155

151156
case class LongType(value: Long) extends VariableType {

linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageConfiguration.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,8 +50,8 @@ object StorageConfiguration {
5050
val STORAGE_BUILD_FS_CLASSES = CommonVars(
5151
"wds.linkis.storage.build.fs.classes",
5252
"org.apache.linkis.storage.factory.impl.BuildHDFSFileSystem,org.apache.linkis.storage.factory.impl.BuildLocalFileSystem," +
53-
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," +
54-
"org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
53+
"org.apache.linkis.storage.factory.impl.BuildOSSSystem,org.apache.linkis.storage.factory.impl.BuildS3FileSystem," +
54+
"org.apache.linkis.storage.factory.impl.BuildAzureBlobFileSystem"
5555
)
5656

5757
val IS_SHARE_NODE = CommonVars("wds.linkis.storage.is.share.node", true)

linkis-commons/linkis-storage/src/main/scala/org/apache/linkis/storage/utils/StorageUtils.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,9 @@ object StorageUtils extends Logging {
204204
* @return
205205
*/
206206
def getFsPath(path: String): FsPath = {
207-
if (path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || path.startsWith(BLOB_SCHEMA)) new FsPath(path)
207+
if (
208+
path.startsWith(FILE_SCHEMA) || path.startsWith(HDFS_SCHEMA) || path.startsWith(BLOB_SCHEMA)
209+
) new FsPath(path)
208210
else {
209211
new FsPath(FILE_SCHEMA + path)
210212
}

linkis-computation-governance/linkis-computation-governance-common/src/main/scala/org/apache/linkis/governance/common/utils/EngineConnArguments.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717

1818
package org.apache.linkis.governance.common.utils
1919

20-
import org.apache.linkis.common.utils.{ParameterUtils, Logging}
20+
import org.apache.linkis.common.utils.{Logging, ParameterUtils}
2121

2222
import org.apache.commons.lang3.StringUtils
2323

linkis-computation-governance/linkis-engineconn/linkis-engineconn-plugin-core/src/main/scala/org/apache/linkis/manager/engineplugin/common/conf/EnvConfiguration.scala

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@ package org.apache.linkis.manager.engineplugin.common.conf
1919

2020
import org.apache.linkis.common.conf.{ByteType, CommonVars, Configuration}
2121

22-
import org.apache.commons.lang3.{JavaVersion, SystemUtils}
23-
2422
object EnvConfiguration {
2523

2624
val HIVE_CONF_DIR = CommonVars[String](

linkis-computation-governance/linkis-entrance/src/test/java/org/apache/linkis/entrance/interceptor/impl/SQLExplainTest.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.linkis.entrance.interceptor.impl;
1919

2020
import org.apache.linkis.governance.common.entity.job.JobRequest;
21+
2122
import org.junit.jupiter.api.Assertions;
2223
import org.junit.jupiter.api.Test;
2324

@@ -55,24 +56,21 @@ void isSelectOverLimit() {
5556
}
5657

5758
/**
58-
* 未修复前代码进行拼接sql时,输出的sql为
59-
* select
60-
* id,
61-
* name,
62-
* array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000;
63-
* ') as infos
64-
* from ods.dim_ep22
59+
* 未修复前代码进行拼接sql时,输出的sql为 select id, name,
60+
* array_join(array_intersect(map_keys(info),array['abs','oda'],' limit 5000; ') as infos from
61+
* ods.dim_ep22
6562
*/
6663
@Test
6764
void splicingLimitSql() {
68-
String code = "select\n" +
69-
"id,\n" +
70-
"name,\n" +
71-
"array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n" +
72-
"from ods.dim_ep22";
65+
String code =
66+
"select\n"
67+
+ "id,\n"
68+
+ "name,\n"
69+
+ "array_join(array_intersect(map_keys(info),array['abs','oda'],';') as infos\n"
70+
+ "from ods.dim_ep22";
7371
StringBuilder logAppender = new StringBuilder();
7472
JobRequest jobRequest = new JobRequest();
7573
SQLExplain.dealSQLLimit(code, jobRequest, logAppender);
76-
Assertions.assertEquals(code+" limit 5000", jobRequest.getExecutionCode());
74+
Assertions.assertEquals(code + " limit 5000", jobRequest.getExecutionCode());
7775
}
7876
}

linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/executor/HiveEngineConcurrentConnExecutor.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ class HiveEngineConcurrentConnExecutor(
135135
code: String
136136
): ExecuteResponse = {
137137
LOG.info(s"HiveEngineConcurrentConnExecutor Ready to executeLine: $code")
138-
val taskId: String = engineExecutorContext.getJobId.get
138+
val taskId: String = engineExecutorContext.getJobId.getOrElse("udf_init")
139139
CSHiveHelper.setContextIDInfoToHiveConf(engineExecutorContext, hiveConf)
140140

141141
val realCode = code.trim()
@@ -166,12 +166,7 @@ class HiveEngineConcurrentConnExecutor(
166166

167167
val driver = new HiveDriverProxy(any)
168168
driverCache.put(taskId, driver)
169-
executeHQL(
170-
engineExecutorContext.getJobId.get,
171-
engineExecutorContext,
172-
realCode,
173-
driver
174-
)
169+
executeHQL(taskId, engineExecutorContext, realCode, driver)
175170
case _ =>
176171
val resp = proc.run(realCode.substring(tokens(0).length).trim)
177172
val result = new String(baos.toByteArray)

linkis-engineconn-plugins/hive/src/main/scala/org/apache/linkis/engineplugin/hive/hook/HiveAddJarsEngineHook.scala

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ import org.apache.linkis.engineconn.common.engineconn.EngineConn
2424
import org.apache.linkis.engineconn.common.hook.EngineConnHook
2525
import org.apache.linkis.engineconn.computation.executor.execute.EngineExecutionContext
2626
import org.apache.linkis.engineconn.core.executor.ExecutorManager
27-
import org.apache.linkis.engineplugin.hive.executor.HiveEngineConnExecutor
27+
import org.apache.linkis.engineplugin.hive.executor.{
28+
HiveEngineConcurrentConnExecutor,
29+
HiveEngineConnExecutor
30+
}
2831
import org.apache.linkis.manager.engineplugin.common.launch.process.Environment
2932
import org.apache.linkis.manager.label.entity.Label
3033
import org.apache.linkis.manager.label.entity.engine.{CodeLanguageLabel, RunType}
@@ -78,7 +81,12 @@ class HiveAddJarsEngineHook extends EngineConnHook with Logging {
7881
ExecutorManager.getInstance.getExecutorByLabels(labels) match {
7982
case executor: HiveEngineConnExecutor =>
8083
executor.executeLine(new EngineExecutionContext(executor), sql)
84+
logger.info("use hive none concurrent mode.")
85+
case executor: HiveEngineConcurrentConnExecutor =>
86+
executor.executeLine(new EngineExecutionContext(executor), sql)
87+
logger.info("use hive concurrent mode.")
8188
case _ =>
89+
logger.warn(s"Executor is not a ComputationExecutor, skip adding jar: $jar")
8290
}
8391
} catch {
8492
case t: Throwable => logger.error(s"run hive sql ${addSql + jar} failed", t)

0 commit comments

Comments (0)