FrameWorks/Spring & Boot

[Spring Object] TaskExcutor

ABCD 2025. 2. 6.

TaskExecutor

TaskExecutor는 단순히 비동기 작업을 시행하기 위한 Interface입니다.


Spring에서는 Executor 인터페이스를 구현하는 것으로 단일화 했는데요.

Executor API는 의외로 간단합니다.

package java.util.concurrent;

public interface Executor {
    void execute(Runnable command);
}

ExecutorService

ExecutorService는 Thread의 관리 기능이 강화된 인터페이스로 shutdown()처럼 Thread에 이벤트를 일으키는 메서드를 제공합니다.

 

ExecutorService에는 Future<T>형 객체를 반환하는 submit() 메서드가 있습니다.


Future<T> 인스턴스는 대게 비동기 실행 Thread의 진행 상황을 추적하는 용도로 사용됩니다. 예를 들어 Future.isDone(), Future.isCancleed()는 각각 어떠한 작업이 끝나고 취소되었는지를 확인하는 용도로 사용됩니다.

Runnable task = new Runnable() {  
    @Override  
    public void run() {  
        try {  
            Thread.sleep(1000);  
            System.out.println("finish");  
        } catch (Exception e) {  
            throw new RuntimeException(e);  
        }  
    }  
};  

ExecutorService executorService = Executors.newCachedThreadPool();  
if(executorService.submit(task, Boolean.TRUE).get().equals(Boolean.TRUE)){  
    System.out.println("Job has finished");  
}

 

아직 이해가 잘 안될 수 있습니다.

 

다른 예시를 들어보겠습니다.

Runnable을 응용해 시간 경과를 나타내는 Class 입니다.

public class DemonstrationRunnable implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(1000)
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(Thread.currentThread().getName());
        System.out.printf("Hello at %s \n", new Data());
    }
}
public class ExecutorsDemo {  
    public static void main(String[] args) throws Throwable{  
        Runnable task = new DemonstrationRunnable();  

        ExecutorService cachedThreadPoolExecutorService = Executors.newCachedThreadPool();  

        if(cachedThreadPoolExecutorService.submit(task).get() == null) {  
            System.out.printf("The cachedThreadPoolExecutorService has succeeded at %s \n", new Date());  
        }  

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);  
        if(fixedThreadPool.submit(task).get() == null) {  
            System.out.printf("fixedThreadPool has succeeded at %s \n", new Date());  
        }  

        ExecutorService singleThreadExecutorService = Executors.newSingleThreadExecutor();  
        if(singleThreadExecutorService.submit(task).get() == null) {  
            System.out.printf("singleThreadExecutorService has succeeded at %s \n", new Date());  
        }  

        ExecutorService es = Executors.newCachedThreadPool();  
        if(es.submit(task, Boolean.TRUE).get().equals(Boolean.TRUE)) {  
            System.out.println("Job has finished");  
        }  

        // 10초 후에 해당 Thread를 동작시키는 .schedule() 함수  
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);  
        if(scheduledExecutorService.schedule(task, 10, TimeUnit.SECONDS).get() == null) {  
            System.out.printf("scheduledExecutorService has succeeded at %s \n", new Date());  
        }  

        // 5초마다 구현된 Runnable을 동작시키는 .scheduleAtFixedRate() 함수  
        scheduledExecutorService.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
    }
}

 

결과가 다음과 같이 나온다면 정상적으로 동작한 것입니다.

pool-1-thread-1
Hello at Mon Jun 03 13:15:17 KST 2024 
The cachedThreadPoolExecutorService has succeeded at Mon Jun 03 13:15:17 KST 2024 
pool-2-thread-1
Hello at Mon Jun 03 13:15:20 KST 2024 
fixedThreadPool has succeeded at Mon Jun 03 13:15:20 KST 2024 
pool-3-thread-1
Hello at Mon Jun 03 13:15:23 KST 2024 
singleThreadExecutorService has succeeded at Mon Jun 03 13:15:23 KST 2024 
pool-4-thread-1
Hello at Mon Jun 03 13:15:26 KST 2024 
Job has finished
pool-4-thread-2
Hello at Mon Jun 03 13:15:26 KST 2024 
Job has finished
pool-4-thread-3
Hello at Mon Jun 03 13:15:36 KST 2024 
Job has finished

이해가 되셨나요??
Thread 실제로 많이 사용해보지 않으면 이해하기 어렵다고들 합니다.
저도 아직 완벽하게 이해가 가지는 않는 부분이라... 더 자세히 설명하기는 벅차지만,
조금 더 설명을 이어나가 볼게요!

이번에는 Callable\<T\>에 관한 설명입니다.
Callable\<T\>자체 보다는 ExecutorService에 있는submit(Callable\<T\> call)이라는 메서드를 설명하려고 합니다.

Runnable이 아닌 Callable을 인자로 받은 submit()함수를 실행하게 되면 Callablecall() 메서드가 반환한 값을 그대로 다시 반환하게 이는 실행한 작업의 결과를 반환하는데 중점을 두고 있습니다.

package java.util.concurrent;

public intrerface Callable<V> {
    V call() throws Exception;
}

// Callable을 구현하게 되면 다음과 같이 구현하게 됩니다.
Callable<String> str = new Callable<String>() {  
    @Override  
    public String call() throws Exception {  
        return null;  
    }  
}

TaskExecutor

TaskExecutorExcutor를 베이스로 한 Interface입니다.
기존 Java SE의 ExecutorExecutorServiceTaskExecutor에서 사용할 수 있게 지원하는 경우도 있지만, 어차피 스프링 관점에서는 베이스 클래스가 Executor이므로 별로 중요하치 않습니다.

package org.springframwork.task;
...

public interface TaskExecutor extends Executor {
    void execute(Runnable task);
}

 

앞서 사용했던 DemonstrationRunnableTaskExecutor에 넣어 한번 살표보도록 하죠.

@Component
public class SpringExecutorsDemo {
    @Autowired  
    private SimpleAsyncTaskExecutor asyncTaskExecutor;  

    @Autowired  
    private SyncTaskExecutor syncTaskExecutor;  

    @Autowired  
    private TaskExecutorAdapter taskExecutorAdapter;  

    @Autowired  
    private ThreadPoolTaskExecutor threadPoolTaskExecutor;  

    @Autowired  
    private DemonstrationRunnable task;  

    @PostConstruct  
    public void submitJobs() {  
        syncTaskExecutor.execute(task);  
        taskExecutorAdapter.submit(task);  
        asyncTaskExecutor.submit(task);  

        for (int i = 0; i < 500; i++) {  
            threadPoolTaskExecutor.submit(task);  
        }  
    }  

    public static void main(String[] args) {  
        new AnnotationConfigApplicationContext(ExecutorsConfiguration.class).registerShutdownHook();  
    }
}
@Configuration  
@ComponentScan  
public class ExecutorsConfiguration {  

    @Bean  
    public TaskExecutorAdapter taskExecutorAdapter() {  
        return new TaskExecutorAdapter(Executors.newCachedThreadPool());  
    }  

    @Bean  
    public SimpleAsyncTaskExecutor simpleAsyncTaskExecutor() {  
        return new SimpleAsyncTaskExecutor();  
    }  

    @Bean  
    public SyncTaskExecutor syncTaskExecutor() {  
        return new SyncTaskExecutor();  
    }  

    @Bean  
    public ScheduledExecutorFactoryBean scheduledExecutorFactoryBean(ScheduledExecutorTask scheduledExecutorTask) {  
        ScheduledExecutorFactoryBean scheduledExecutorFactoryBean = new ScheduledExecutorFactoryBean();  
        scheduledExecutorFactoryBean.setScheduledExecutorTasks(scheduledExecutorTask);  
        return scheduledExecutorFactoryBean;  
    }  

    @Bean  
    public ScheduledExecutorTask scheduledExecutorTask(Runnable runnable) {  
        ScheduledExecutorTask scheduledExecutorTask = new ScheduledExecutorTask();  
        scheduledExecutorTask.setPeriod(1000);  
        scheduledExecutorTask.setRunnable(runnable);  
        return scheduledExecutorTask;  
    }  

    @Bean  
    public ThreadPoolTaskExecutor threadPoolTaskExecutor() {  
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();  
        taskExecutor.setCorePoolSize(50);  
        taskExecutor.setMaxPoolSize(100);  
        taskExecutor.setAllowCoreThreadTimeOut(true);  
        taskExecutor.setWaitForTasksToCompleteOnShutdown(true);  
        return taskExecutor;  
    }

 

위에 SpringExecutorsDemo.java에서 사용한 객체들이 보이시나요??

TaskExecutor Interface의 다양한 구현체들이죠.

  • TaskExcutorAdapter
    • java.util.concurrence.Executors 인스턴스를 감싼 단순 래퍼라서 Spring을 이용해 TaskExecutor Interface와 같은 방식으로 다룰 수 있습니다.
    • Executor 인스턴스를 구성하고 생성자에 인수로 전달하고 있습니다.
  • SimpleAsyncTaskExecutor
    • 전송한 Job마다 새로 Thread를 제공합니다.
    • Thread를 풀링하거나 재사용하지 않습니다.
    • 전송한 각각의 Job은 Thread에서 비동기로 실행됩니다.
  • SyncTaskExecutor
    • 가장 단순한 TaskExecutor구현체로써 동기적으로 Thread를 띄워 Job을 실행하고 join() 메서드로 연결합니다.
    • 사실상 Threading은 완전히 건너띄고 호출 Thread에서 run()메서드를 수동 실행한 것이나 다를바 없습니다.
  • ScheduledExecutorFactoryBean
    • ScheduledExecutorTask 빈으로 정의된 Job을 자동 트리거합니다.
    • ScheduledExecutorTask 인스턴스 목록을 지정해서 여러 Job을 동시에 실행할 수도 있고, 작업 실행 간 공백 시간을 인수로 넣을 수도 있습니다.
  • ThreadPoolTaskExecutor
    • java.util.concurrent.ThreadPoolExecutor를 기반으로 모든 기능이 완비된 Thread Pool 구현체입니다.

참고사항

TreadFactory

Tread를 할당하고 실행할 때 지정한 동작을 할 수 있도록 만들어 주는 Interface입니다.
현재 제가 이해한 바로는 Log를 작성할 때 찾기 쉽게 넘버링을 해준다던가, 네이밍을 해주는 용도로 사용하는 것으로 파악 됩니다.
물론 사전에 필요한 동작들을 설정해 줄 수 있겠지만, 특별한 사례가 생각 나지는 않습니다.

그럼 TreadFactory를 사용한 방법의 예시를 한번 볼까요??

public class ThreadFactorySample implements ThreadFactory {  
    private final String threadPrefix;  
    private long threadCounter;  
    private StringBuilder builder;  

    public ThreadFactorySample(String threadName) {  
        this.threadPrefix = threadName;  
        this.threadCounter = 0;  
        this.builder = new StringBuilder();  
    }  

    @Override  
    public Thread newThread(Runnable r) {  
        Thread thread = new Thread(r, threadPrefix + "-" + threadCounter++);  
        builder.append(String.format("Create Thread %d with name on %s, Active thread %s %n", thread.getId(), thread.getName(), new Date(), Thread.activeCount()));  
        return thread;  
    }  

    public String getThreadLog () {  
        String s = builder.toString();  
        builder.setLength(0);  
        return s;  
    }  
}

 

우리가 사용하려는 Thread의 이름이 재 할당 된 것을 확인 할 수 있는데요.

 

또한, 해당 Thread의 로그 정보를 가져올 수 있게 builder에 append()하여 해당 문구를 가져 올 수 있습니다.

아래에서 한번 실행해 보겠습니다.

public class ExecutorsDemo {  
    public static void main(String[] args) throws Throwable{  
        Runnable task = new DemonstrationRunnable();  
        ThreadFactorySample tfs = new ThreadFactorySample("테스트");

        ExecutorService cachedThreadPoolExecutorService = Executors.newCachedThreadPool(tfs);  

        if(cachedThreadPoolExecutorService.submit(task).get() == null) {  
            System.out.printf("The cachedThreadPoolExecutorService has succeeded at %s \n", new Date());  
            System.out.println(tfs.getThreadLog());
        }  

        ExecutorService fixedThreadPool = Executors.newFixedThreadPool(100);  
        if(fixedThreadPool.submit(task).get() == null) {  
            System.out.printf("fixedThreadPool has succeeded at %s \n", new Date());  
        }  

        ExecutorService singleThreadExecutorService = Executors.newSingleThreadExecutor();  
        if(singleThreadExecutorService.submit(task).get() == null) {  
            System.out.printf("singleThreadExecutorService has succeeded at %s \n", new Date());  
        }  

        ExecutorService es = Executors.newCachedThreadPool();  
        if(es.submit(task, Boolean.TRUE).get().equals(Boolean.TRUE)) {  
            System.out.println("Job has finished");  
        }  

        // 10초 후에 해당 Thread를 동작시키는 .schedule() 함수  
        ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(10);  
        if(scheduledExecutorService.schedule(task, 10, TimeUnit.SECONDS).get() == null) {  
            System.out.printf("scheduledExecutorService has succeeded at %s \n", new Date());  
        }  

        // 5초마다 구현된 Runnable을 동작시키는 .scheduleAtFixedRate() 함수  
        scheduledExecutorService.scheduleAtFixedRate(task, 0, 5, TimeUnit.SECONDS);
    }
}

 

아래와 같이 첫번째 Thread의 이름이 나왔다면 성공입니다.
해당 Thread에서 builder를 통해 얻은 String에 대한 Log도 정상적으로 나온 모습을 확인 할 수 있습니다.

테스트중-0
Hello at Mon Jun 03 17:02:37 KST 2024 
The cachedThreadPoolExecutorService has succeeded at Mon Jun 03 17:02:37 KST 2024 
Create Thread 24 with name on ThreadFactory테스트중-0, Active thread Mon Jun 03 17:02:34 KST 2024 

pool-1-thread-1
Hello at Mon Jun 03 17:02:40 KST 2024 
fixedThreadPool has succeeded at Mon Jun 03 17:02:40 KST 2024 
pool-2-thread-1
Hello at Mon Jun 03 17:02:43 KST 2024 
singleThreadExecutorService has succeeded at Mon Jun 03 17:02:43 KST 2024 
pool-3-thread-1
Hello at Mon Jun 03 17:02:46 KST 2024 
Job has finished
pool-4-thread-1
Hello at Mon Jun 03 17:02:59 KST 2024 
scheduledExecutorService has succeeded at Mon Jun 03 17:02:59 KST 2024 
pool-4-thread-1
Hello at Mon Jun 03 17:03:02 KST 2024 
pool-4-thread-2
Hello at Mon Jun 03 17:03:07 KST 2024 
728x90
반응형

'FrameWorks > Spring & Boot' 카테고리의 다른 글

[Spring Object] MessageSource  (0) 2025.02.06
[Spring Object] Resource  (0) 2025.02.06
[Spring Object] ApplicationEvent  (0) 2025.02.03
[Spring Object] Aware  (0) 2025.02.03
[Spring Object] AbstractFactoryBean  (0) 2025.01.21

댓글

💲 추천 글