동시성 프로그래밍이란
현대 애플리케이션은 여러 작업을 동시에 처리해야 합니다. 파일 업로드를 받으면서 DB 쿼리를 실행하고, API 요청을 처리하는 등 멀티태스킹은 필수입니다. Java는 강력한 동시성 라이브러리를 제공하여 이런 작업을 안전하고 효율적으로 처리할 수 있게 합니다.
이 글에서는 Thread 기본부터 ThreadPool, 동기화 메커니즘까지 실전 예제와 함께 알아봅니다.
Thread와 Runnable 기본
Thread 생성 방법
Java에서 스레드를 생성하는 두 가지 방법이 있습니다.
1. Thread 클래스 상속
public class MyThread extends Thread {
@Override
public void run() {
System.out.println("Thread 실행: " + Thread.currentThread().getName());
}
}
// 사용
MyThread thread = new MyThread();
thread.start();
2. Runnable 인터페이스 구현 (권장)
public class MyRunnable implements Runnable {
@Override
public void run() {
System.out.println("Runnable 실행: " + Thread.currentThread().getName());
}
}
// 사용
Thread thread = new Thread(new MyRunnable());
thread.start();
// 람다 표현식으로 간결하게
Thread thread2 = new Thread(() -> {
System.out.println("Lambda 실행: " + Thread.currentThread().getName());
});
thread2.start();
Runnable 방식이 권장되는 이유:
- 다른 클래스를 상속받을 수 있음
- 코드 재사용성이 높음
- 함수형 인터페이스로 람다 사용 가능
간단한 병렬 다운로드 예제
public class SimpleDownloader {
public static void main(String[] args) {
String[] urls = {
"https://example.com/file1.zip",
"https://example.com/file2.zip",
"https://example.com/file3.zip"
};
for (String url : urls) {
new Thread(() -> {
downloadFile(url);
}).start();
}
}
private static void downloadFile(String url) {
System.out.println("다운로드 시작: " + url +
" [" + Thread.currentThread().getName() + "]");
try {
Thread.sleep(2000); // 다운로드 시뮬레이션
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("다운로드 완료: " + url);
}
}
하지만 이 방식은 스레드 수를 제어할 수 없어 위험합니다. 수천 개의 파일이 있다면 수천 개의 스레드가 생성되어 시스템이 멈출 수 있습니다.
ExecutorService와 ThreadPool
ThreadPool의 필요성
스레드 생성은 비용이 큽니다. ThreadPool을 사용하면:
- 스레드를 미리 생성해두고 재사용
- 동시 실행 스레드 수 제한
- 작업 큐를 통한 작업 관리
ExecutorService 기본 사용법
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
public class ExecutorServiceExample {
public static void main(String[] args) {
// 고정 크기 ThreadPool 생성
ExecutorService executor = Executors.newFixedThreadPool(3);
// 10개 작업 제출
for (int i = 0; i < 10; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId +
" 실행 중 [" + Thread.currentThread().getName() + "]");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println("Task " + taskId + " 완료");
});
}
// Executor 종료
executor.shutdown();
try {
// 모든 작업 완료 대기 (최대 60초)
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
System.out.println("모든 작업 완료");
}
}
출력 예시:
Task 0 실행 중 [pool-1-thread-1]
Task 1 실행 중 [pool-1-thread-2]
Task 2 실행 중 [pool-1-thread-3]
Task 0 완료
Task 3 실행 중 [pool-1-thread-1]
...
3개 스레드만 사용하여 10개 작업을 순차적으로 처리합니다.
newFixedThreadPool vs newCachedThreadPool
newFixedThreadPool(n): 고정된 n개의 스레드 사용
ExecutorService fixed = Executors.newFixedThreadPool(5);
// 5개 스레드로 모든 작업 처리
// 작업이 많으면 큐에 대기
newCachedThreadPool(): 필요시 스레드 생성, 60초 유휴시 제거
ExecutorService cached = Executors.newCachedThreadPool();
// 작업 수만큼 스레드 생성 (제한 없음)
// 짧고 많은 작업에 적합
선택 기준:
- CPU 집약적 작업:
newFixedThreadPool(Runtime.getRuntime().availableProcessors()) - I/O 집약적 작업:
newFixedThreadPool(n * 2)또는newCachedThreadPool() - 작업 수가 예측 가능:
newFixedThreadPool - 작업 수가 변동적이고 짧음:
newCachedThreadPool
ThreadPoolExecutor 커스터마이징
더 세밀한 제어가 필요하면 ThreadPoolExecutor를 직접 생성합니다.
import java.util.concurrent.*;
public class CustomThreadPoolExample {
public static void main(String[] args) {
int corePoolSize = 5; // 기본 스레드 수
int maxPoolSize = 10; // 최대 스레드 수
long keepAliveTime = 60L; // 유휴 스레드 대기 시간
BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>(100); // 작업 큐
ThreadPoolExecutor executor = new ThreadPoolExecutor(
corePoolSize,
maxPoolSize,
keepAliveTime,
TimeUnit.SECONDS,
queue,
new ThreadPoolExecutor.CallerRunsPolicy() // 거부 정책
);
// 작업 제출
for (int i = 0; i < 200; i++) {
final int taskId = i;
executor.submit(() -> {
System.out.println("Task " + taskId + " 처리 중");
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// 모니터링
System.out.println("Active threads: " + executor.getActiveCount());
System.out.println("Pool size: " + executor.getPoolSize());
System.out.println("Queue size: " + executor.getQueue().size());
executor.shutdown();
}
}
거부 정책 (RejectedExecutionHandler)
큐가 가득 차고 최대 스레드에 도달하면:
- AbortPolicy (기본): RejectedExecutionException 발생
- CallerRunsPolicy: 호출한 스레드에서 직접 실행 (백프레셔)
- DiscardPolicy: 조용히 버림
- DiscardOldestPolicy: 가장 오래된 작업 버리고 새 작업 추가
// 백프레셔 적용 - 서버 과부하 방지
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
ConcurrentHashMap vs Collections.synchronizedMap
동기화된 컬렉션의 필요성
일반 HashMap은 멀티스레드 환경에서 안전하지 않습니다.
// 위험한 코드
Map<String, Integer> map = new HashMap<>();
// 여러 스레드가 동시에 접근
executor.submit(() -> map.put("key1", 1));
executor.submit(() -> map.put("key2", 2));
// ConcurrentModificationException 또는 데이터 손실 발생 가능
Collections.synchronizedMap
Map<String, Integer> syncMap = Collections.synchronizedMap(new HashMap<>());
syncMap.put("key1", 1); // 메서드 단위 동기화
단점:
- 모든 작업에 락이 걸려 성능 저하
- iteration시 외부에서 동기화 필요
synchronized (syncMap) {
for (Map.Entry<String, Integer> entry : syncMap.entrySet()) {
System.out.println(entry.getKey() + ": " + entry.getValue());
}
}
ConcurrentHashMap (권장)
세그먼트별 락으로 동시성 향상:
import java.util.concurrent.ConcurrentHashMap;
public class ConcurrentMapExample {
private static ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(10);
// 동시 쓰기
for (int i = 0; i < 100; i++) {
final int value = i;
executor.submit(() -> {
map.put("key" + value, value);
});
}
// 원자적 연산
executor.submit(() -> {
map.putIfAbsent("counter", 0);
map.compute("counter", (k, v) -> v + 1);
});
executor.submit(() -> {
map.computeIfAbsent("newKey", k -> 100);
});
executor.shutdown();
try {
executor.awaitTermination(1, TimeUnit.SECONDS);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
// iteration에 외부 동기화 불필요
map.forEach((key, value) -> {
System.out.println(key + ": " + value);
});
}
}
ConcurrentHashMap의 장점:
- 읽기 작업은 락 없음
- 쓰기 작업은 세그먼트별 락
putIfAbsent,compute,merge등 원자적 연산 제공
AtomicInteger와 AtomicBoolean
문제 상황: 카운터의 동시성 이슈
// 위험한 코드
private int counter = 0;
public void increment() {
counter++; // 원자적이지 않음! (read-modify-write)
}
여러 스레드가 동시에 increment()를 호출하면 값이 누락됩니다.
synchronized로 해결
private int counter = 0;
public synchronized void increment() {
counter++;
}
간단하지만 락 비용이 있습니다.
AtomicInteger 사용 (권장)
import java.util.concurrent.atomic.AtomicInteger;
public class AtomicCounterExample {
private AtomicInteger counter = new AtomicInteger(0);
public void increment() {
counter.incrementAndGet(); // 원자적 연산, 락 없음
}
public int get() {
return counter.get();
}
public static void main(String[] args) throws InterruptedException {
AtomicCounterExample example = new AtomicCounterExample();
ExecutorService executor = Executors.newFixedThreadPool(10);
// 1000번 증가 작업 제출
for (int i = 0; i < 1000; i++) {
executor.submit(example::increment);
}
executor.shutdown();
executor.awaitTermination(1, TimeUnit.SECONDS);
System.out.println("최종 카운터: " + example.get()); // 정확히 1000
}
}
AtomicBoolean으로 상태 관리
import java.util.concurrent.atomic.AtomicBoolean;
public class FileProcessor {
private AtomicBoolean isProcessing = new AtomicBoolean(false);
public void processFile(String filePath) {
// 이미 처리 중이면 스킵
if (!isProcessing.compareAndSet(false, true)) {
System.out.println("이미 처리 중입니다.");
return;
}
try {
// 파일 처리 로직
System.out.println("파일 처리 중: " + filePath);
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
isProcessing.set(false); // 처리 완료
}
}
}
compareAndSet(expected, update): 현재 값이 expected면 update로 변경하고 true 반환 (원자적)
Semaphore로 동시 접근 제어
Semaphore란
특정 자원에 동시 접근할 수 있는 스레드 수를 제한합니다.
import java.util.concurrent.Semaphore;
public class ConnectionPool {
private Semaphore semaphore;
public ConnectionPool(int maxConnections) {
this.semaphore = new Semaphore(maxConnections);
}
public void useConnection(String taskName) {
try {
System.out.println(taskName + " - 커넥션 대기 중...");
semaphore.acquire(); // 허가 획득 (없으면 대기)
System.out.println(taskName + " - 커넥션 획득! [가용: " +
semaphore.availablePermits() + "]");
// DB 작업 시뮬레이션
Thread.sleep(2000);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
System.out.println(taskName + " - 커넥션 반환");
semaphore.release(); // 허가 반환
}
}
public static void main(String[] args) {
ConnectionPool pool = new ConnectionPool(3); // 최대 3개 동시 접근
ExecutorService executor = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
final String taskName = "Task-" + i;
executor.submit(() -> pool.useConnection(taskName));
}
executor.shutdown();
}
}
출력 예시:
Task-0 - 커넥션 대기 중...
Task-1 - 커넥션 대기 중...
Task-0 - 커넥션 획득! [가용: 2]
Task-1 - 커넥션 획득! [가용: 1]
Task-2 - 커넥션 대기 중...
Task-2 - 커넥션 획득! [가용: 0]
Task-3 - 커넥션 대기 중...
// Task-3는 누군가 release()할 때까지 대기
synchronized vs Lock
synchronized 키워드
public class SynchronizedExample {
private int balance = 0;
// 메서드 전체 동기화
public synchronized void deposit(int amount) {
balance += amount;
}
// 블록 단위 동기화
public void withdraw(int amount) {
synchronized (this) {
balance -= amount;
}
}
// 정적 메서드 동기화 (클래스 레벨 락)
public static synchronized void staticMethod() {
// ...
}
}
장점:
- 간단하고 명확
- 자동 락 해제 (예외 발생시에도)
단점:
- 타임아웃 불가
- 공정성 제어 불가
- 읽기/쓰기 구분 불가
ReentrantLock
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class LockExample {
private final Lock lock = new ReentrantLock();
private int balance = 0;
public void deposit(int amount) {
lock.lock();
try {
balance += amount;
} finally {
lock.unlock(); // 반드시 finally에서 해제
}
}
// 타임아웃 지원
public boolean tryDepositWithTimeout(int amount) {
try {
if (lock.tryLock(1, TimeUnit.SECONDS)) {
try {
balance += amount;
return true;
} finally {
lock.unlock();
}
} else {
System.out.println("락 획득 실패 - 타임아웃");
return false;
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return false;
}
}
}
ReadWriteLock으로 성능 향상
읽기는 동시에, 쓰기는 독점적으로:
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
public class CacheWithReadWriteLock {
private final Map<String, String> cache = new HashMap<>();
private final ReadWriteLock rwLock = new ReentrantReadWriteLock();
public String get(String key) {
rwLock.readLock().lock();
try {
return cache.get(key);
} finally {
rwLock.readLock().unlock();
}
}
public void put(String key, String value) {
rwLock.writeLock().lock();
try {
cache.put(key, value);
} finally {
rwLock.writeLock().unlock();
}
}
}
읽기가 많고 쓰기가 적으면 성능이 크게 향상됩니다.
CompletableFuture 비동기 처리
Future의 한계
ExecutorService executor = Executors.newFixedThreadPool(2);
Future<String> future = executor.submit(() -> {
Thread.sleep(2000);
return "결과";
});
// 블로킹 대기
String result = future.get(); // 2초 동안 멈춤
콜백이나 체이닝이 불가능합니다.
CompletableFuture 기본
import java.util.concurrent.CompletableFuture;
public class CompletableFutureExample {
public static void main(String[] args) {
// 비동기 작업 시작
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
System.out.println("비동기 작업 시작 [" +
Thread.currentThread().getName() + "]");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return "데이터 조회 완료";
});
// 콜백 체이닝
future.thenApply(result -> {
System.out.println("변환: " + result);
return result.toUpperCase();
}).thenAccept(result -> {
System.out.println("최종 결과: " + result);
}).exceptionally(ex -> {
System.err.println("오류 발생: " + ex.getMessage());
return null;
});
System.out.println("메인 스레드는 계속 실행");
// 완료 대기 (실전에서는 다른 방식 사용)
future.join();
}
}
여러 작업 조합
public class CombineExample {
public static void main(String[] args) {
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
sleep(1000);
return "사용자 정보";
});
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
sleep(1500);
return "주문 정보";
});
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
sleep(800);
return "배송 정보";
});
// 모두 완료 대기
CompletableFuture<Void> allOf = CompletableFuture.allOf(future1, future2, future3);
allOf.thenRun(() -> {
try {
System.out.println(future1.get());
System.out.println(future2.get());
System.out.println(future3.get());
} catch (Exception e) {
e.printStackTrace();
}
});
// 가장 빠른 것 하나만
CompletableFuture<Object> anyOf = CompletableFuture.anyOf(future1, future2, future3);
anyOf.thenAccept(result -> {
System.out.println("가장 빠른 결과: " + result);
});
allOf.join();
}
private static void sleep(long millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
순차적 의존 작업
CompletableFuture.supplyAsync(() -> {
return getUserId();
})
.thenCompose(userId -> {
return CompletableFuture.supplyAsync(() -> getUserProfile(userId));
})
.thenApply(profile -> {
return enrichProfile(profile);
})
.thenAccept(finalProfile -> {
System.out.println("프로필: " + finalProfile);
});
thenCompose는 중첩된 Future를 평탄화합니다 (flatMap과 유사).
Context Switching (컨텍스트 스위칭)
스레드를 많이 만든다고 항상 빨라지는 것은 아닙니다. CPU가 여러 스레드를 전환하는 비용을 이해해야 합니다.
컨텍스트 스위칭이란
CPU 코어 1개는 한 순간에 1개의 스레드만 실행 가능
→ 여러 스레드를 번갈아가며 실행
→ 이 전환 작업 = Context Switching
스레드 전환 1회마다 다음 작업이 발생합니다:
- 현재 스레드 상태(레지스터, 프로그램 카운터 등) 저장
- 다음 스레드 상태 불러오기
- CPU 캐시 초기화 (캐시 미스 발생)
전환 1회당 약 1~10 마이크로초가 소요됩니다.
스레드가 많을수록 나빠지는 이유
스레드 2개: 실제작업 90% / 전환비용 10%
스레드 1000개: 실제작업 20% / 전환비용 80%
스레드 수가 늘어날수록 실제 작업보다 전환 비용이 커져 전체 성능이 오히려 저하됩니다.
OS가 강제로 번갈아 실행하는 이유 (선점형 스케줄링)
- 응답성 보장: 긴 작업 중에도 다른 스레드가 CPU 시간을 확보
- CPU 낭비 방지: I/O 대기 중인 스레드의 자리를 다른 스레드가 채움
- 공평성: 특정 스레드의 CPU 독점 방지
스레드 최대 개수와 적정 수
제한 요소
| 요소 | 내용 |
|---|---|
| 메모리 | 스레드 1개당 512KB |
| OS 제한 | Linux 기준 수천~수만 사이 |
| Context Switching | 스레드가 많을수록 전환 비용 증가 |
실무 권장 개수
CPU 바운드 작업 (연산이 많은): CPU 코어 수 + 1
I/O 바운드 작업 (대기가 많은): CPU 코어 수 * 2~4
코어 수 * 2는 스위칭을 없애는 것이 아니라, I/O 대기 중 놀고 있는 CPU를 최대한 활용하기 위한 숫자입니다.
적정 스레드 수 = CPU 코어 수 * (1 + 대기시간/실행시간)
예를 들어 4코어 CPU에서 작업의 75%가 I/O 대기라면: 4 * (1 + 0.75/0.25) = 16개가 적정입니다.
LinkedBlockingQueue 내부 동작 원리
ThreadPoolExecutor의 작업 큐로 자주 사용되는 LinkedBlockingQueue가 어떻게 스레드 안전성을 보장하는지 알아봅니다.
간섭 없는 이유 3가지
1. 분리된 Lock - 동시 접근 방지
putLock → put 전용 락
takeLock → take 전용 락
→ put과 take가 서로 다른 락 사용 → 동시에 넣고 꺼내도 충돌 없음
일반적인 큐와 달리 put/take가 서로 다른 락을 사용하므로, 생산자와 소비자가 동시에 동작할 수 있습니다.
2. Blocking - 스레드 자동 대기/깨움
큐가 비면 → take() 호출한 워커 스레드가 자동 sleep
데이터 들어오면 → 자동으로 깨어남
→ CPU 낭비 없이 대기
바쁜 대기(busy waiting) 없이 효율적으로 동작합니다.
3. 메모리 가시성 보장
내부적으로 volatile + Lock 사용
→ 한 스레드가 넣은 값을 다른 스레드가 반드시 볼 수 있음
워커 1개 vs 워커 N개
단일 워커와 다중 워커의 차이를 이해하는 것이 중요합니다.
워커 1개 (newSingleThreadExecutor):
명령1 (30초) → 처리 중...
명령2 (30초) → 큐 대기 (30초 후 시작)
워커 3개 (newFixedThreadPool(3)):
명령1 (30초) → 워커1 처리 중...
명령2 (30초) → 워커2 처리 중... → 동시 실행
명령3 (30초) → 워커3 처리 중... → 동시 실행
// 고정 워커 (권장) - 스레드 수 예측 가능
ExecutorService worker = Executors.newFixedThreadPool(4);
// 단일 워커 - 순서 보장이 필요할 때
ExecutorService worker = Executors.newSingleThreadExecutor();
// 유동 워커 - 스레드 무한 생성 위험, 짧은 작업에만 사용
ExecutorService worker = Executors.newCachedThreadPool();
다중 워커에서 큐를 공유하는 패턴:
private final BlockingQueue<String> commandQueue = new LinkedBlockingQueue<>();
private final ExecutorService worker = Executors.newFixedThreadPool(4);
@PostConstruct
public void start() {
for (int i = 0; i < 4; i++) {
worker.submit(() -> {
while (true) {
String cmd = commandQueue.take(); // 큐가 비면 자동 대기
doLongTask(cmd); // 4개 동시 처리
}
});
}
}
LinkedBlockingQueue가 스레드 안전하므로 4개의 워커가 하나의 큐에서 동시에 작업을 꺼내도 안전합니다.
실전 예시: 파일 병렬 전송 시나리오
실제 파일 동기화 시스템에서 여러 파티션의 파일을 병렬로 전송하는 예제입니다.
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
public class ParallelFileSyncSystem {
private final ExecutorService executor;
private final Semaphore bandwidthLimiter;
private final AtomicInteger totalFilesProcessed = new AtomicInteger(0);
private final ConcurrentHashMap<String, SyncStatus> statusMap = new ConcurrentHashMap<>();
public ParallelFileSyncSystem(int maxThreads, int maxConcurrentUploads) {
this.executor = new ThreadPoolExecutor(
maxThreads / 2,
maxThreads,
60L,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>(1000),
new ThreadPoolExecutor.CallerRunsPolicy()
);
this.bandwidthLimiter = new Semaphore(maxConcurrentUploads);
}
public void syncPartitions(List<Partition> partitions) {
List<CompletableFuture<Void>> futures = new ArrayList<>();
for (Partition partition : partitions) {
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
syncPartition(partition);
}, executor);
futures.add(future);
}
// 모든 파티션 동기화 완료 대기
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
System.out.println("=== 전체 동기화 완료 ===");
System.out.println("처리된 파일 수: " + totalFilesProcessed.get());
printStatusSummary();
})
.join();
}
private void syncPartition(Partition partition) {
System.out.println("파티션 동기화 시작: " + partition.getName());
statusMap.put(partition.getName(), SyncStatus.IN_PROGRESS);
List<String> files = partition.getFiles();
List<CompletableFuture<Void>> fileFutures = new ArrayList<>();
for (String file : files) {
CompletableFuture<Void> fileFuture = CompletableFuture.runAsync(() -> {
transferFile(partition.getName(), file);
}, executor);
fileFutures.add(fileFuture);
}
// 파티션 내 모든 파일 완료 대기
CompletableFuture.allOf(fileFutures.toArray(new CompletableFuture[0]))
.thenRun(() -> {
statusMap.put(partition.getName(), SyncStatus.COMPLETED);
System.out.println("파티션 완료: " + partition.getName() +
" (" + files.size() + " files)");
})
.exceptionally(ex -> {
statusMap.put(partition.getName(), SyncStatus.FAILED);
System.err.println("파티션 실패: " + partition.getName() +
" - " + ex.getMessage());
return null;
})
.join();
}
private void transferFile(String partitionName, String filePath) {
try {
// 대역폭 제한 (동시 업로드 수 제어)
bandwidthLimiter.acquire();
System.out.println(" [" + partitionName + "] 전송 시작: " + filePath +
" [" + Thread.currentThread().getName() + "]");
// 파일 전송 시뮬레이션
Thread.sleep(ThreadLocalRandom.current().nextInt(500, 1500));
totalFilesProcessed.incrementAndGet();
System.out.println(" [" + partitionName + "] 전송 완료: " + filePath);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new RuntimeException("전송 중단: " + filePath, e);
} finally {
bandwidthLimiter.release();
}
}
private void printStatusSummary() {
System.out.println("\n=== 파티션별 상태 ===");
statusMap.forEach((partition, status) -> {
System.out.println(partition + ": " + status);
});
}
public void shutdown() {
executor.shutdown();
try {
if (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
executor.shutdownNow();
}
} catch (InterruptedException e) {
executor.shutdownNow();
}
}
// 데이터 클래스
static class Partition {
private final String name;
private final List<String> files;
public Partition(String name, List<String> files) {
this.name = name;
this.files = files;
}
public String getName() { return name; }
public List<String> getFiles() { return files; }
}
enum SyncStatus {
IN_PROGRESS, COMPLETED, FAILED
}
// 테스트 실행
public static void main(String[] args) {
List<Partition> partitions = Arrays.asList(
new Partition("partition-A", Arrays.asList(
"/data/a/file1.dat", "/data/a/file2.dat", "/data/a/file3.dat"
)),
new Partition("partition-B", Arrays.asList(
"/data/b/file1.dat", "/data/b/file2.dat"
)),
new Partition("partition-C", Arrays.asList(
"/data/c/file1.dat", "/data/c/file2.dat", "/data/c/file3.dat", "/data/c/file4.dat"
))
);
ParallelFileSyncSystem syncSystem = new ParallelFileSyncSystem(
8, // 최대 스레드 수
3 // 동시 업로드 수 (대역폭 제한)
);
long startTime = System.currentTimeMillis();
syncSystem.syncPartitions(partitions);
long endTime = System.currentTimeMillis();
System.out.println("\n총 소요 시간: " + (endTime - startTime) + "ms");
syncSystem.shutdown();
}
}
핵심 포인트:
- ThreadPoolExecutor: 스레드 수와 큐 크기 커스터마이징
- Semaphore: 동시 업로드 수 제한으로 대역폭 관리
- ConcurrentHashMap: 파티션별 상태를 안전하게 관리
- AtomicInteger: 전체 처리 파일 수 카운팅
- CompletableFuture: 파티션별, 파일별 비동기 처리 및 조합
- CallerRunsPolicy: 백프레셔로 메모리 보호
출력 예시:
파티션 동기화 시작: partition-A
파티션 동기화 시작: partition-B
[partition-A] 전송 시작: /data/a/file1.dat [pool-1-thread-1]
[partition-B] 전송 시작: /data/b/file1.dat [pool-1-thread-2]
[partition-A] 전송 완료: /data/a/file1.dat
[partition-A] 전송 시작: /data/a/file2.dat [pool-1-thread-1]
...
파티션 완료: partition-B (2 files)
파티션 완료: partition-A (3 files)
=== 전체 동기화 완료 ===
처리된 파일 수: 9
마무리
Java 동시성 프로그래밍의 핵심을 정리하면:
- ThreadPool 사용: 직접 Thread 생성 대신 ExecutorService 사용
- 적절한 크기 설정: CPU 바운드는 코어 수 + 1, I/O 바운드는 코어 수 * 2~4
- Context Switching 인식: 스레드를 무한정 늘리면 전환 비용이 실제 작업을 압도
- LinkedBlockingQueue: 생산자-소비자 패턴의 핵심, 분리 락으로 동시 put/take 가능
- 동시성 컬렉션: HashMap 대신 ConcurrentHashMap
- Atomic 클래스: 간단한 카운터나 플래그는 AtomicInteger/AtomicBoolean
- Semaphore: 자원 접근 수 제한
- CompletableFuture: 비동기 작업 체이닝과 조합
- 항상 종료 처리: shutdown() + awaitTermination()
동시성은 어렵지만, Java가 제공하는 도구를 잘 활용하면 안전하고 효율적인 멀티스레드 프로그램을 만들 수 있습니다. 작은 예제부터 시작해서 점진적으로 복잡한 시스템으로 확장해 보세요.