Skip to content

Commit 37a8ca8

Browse files
authored
[enhance](job) terminate streaming task execute threads promptly when idle (#58041)
1. terminate streaming task execute threads promptly when idle. 2. set job_streaming_task_exec_thread_num default value from 10 to 100.
1 parent 8e0f77a commit 37a8ca8

File tree

2 files changed

+18
-12
lines changed

2 files changed

+18
-12
lines changed

fe/fe-common/src/main/java/org/apache/doris/common/Config.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1988,10 +1988,10 @@ public class Config extends ConfigBase {
19881988
+ " greater than 0, otherwise it defaults to 3." })
19891989
public static int job_dictionary_task_consumer_thread_num = 3;
19901990

1991-
@ConfField(masterOnly = true, description = {"用于执行 Streaming 任务的线程数,值应该大于0,否则默认为10",
1991+
@ConfField(masterOnly = true, description = {"用于执行 Streaming 任务的线程数,值应该大于0,否则默认为100",
19921992
"The number of threads used to execute Streaming Tasks, "
1993-
+ "the value should be greater than 0, if it is <=0, default is 10."})
1994-
public static int job_streaming_task_exec_thread_num = 10;
1993+
+ "the value should be greater than 0, if it is <=0, default is 100."})
1994+
public static int job_streaming_task_exec_thread_num = 100;
19951995

19961996
@ConfField(masterOnly = true, description = {"最大的 Streaming 作业数量,值应该大于0,否则默认为1024",
19971997
"The maximum number of Streaming jobs, "

fe/fe-core/src/main/java/org/apache/doris/job/scheduler/StreamingTaskScheduler.java

Lines changed: 15 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -40,15 +40,21 @@
4040

4141
@Log4j2
4242
public class StreamingTaskScheduler extends MasterDaemon {
43-
private final ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
44-
Config.job_streaming_task_exec_thread_num,
45-
Config.job_streaming_task_exec_thread_num,
46-
0,
47-
TimeUnit.SECONDS,
48-
new ArrayBlockingQueue<>(Config.max_streaming_job_num),
49-
new CustomThreadFactory("streaming-task-execute"),
50-
new ThreadPoolExecutor.AbortPolicy()
51-
);
43+
private final ThreadPoolExecutor threadPool;
44+
45+
{
46+
threadPool = new ThreadPoolExecutor(
47+
Config.job_streaming_task_exec_thread_num,
48+
Config.job_streaming_task_exec_thread_num,
49+
60L,
50+
TimeUnit.SECONDS,
51+
new ArrayBlockingQueue<>(Config.max_streaming_job_num),
52+
new CustomThreadFactory("streaming-task-execute"),
53+
new ThreadPoolExecutor.AbortPolicy()
54+
);
55+
threadPool.allowCoreThreadTimeOut(true);
56+
}
57+
5258
private final ScheduledThreadPoolExecutor delayScheduler
5359
= new ScheduledThreadPoolExecutor(1, new CustomThreadFactory("streaming-task-delay-scheduler"));
5460

0 commit comments

Comments
 (0)