[파이썬] Multiprocessing, Multithreading μ‚¬μš© μ‹œ κ³ λ € 사항

Posted by Euisuk's Dev Log on February 19, 2024

[파이썬] Multiprocessing, Multithreading μ‚¬μš© μ‹œ κ³ λ € 사항

원본 κ²Œμ‹œκΈ€: https://velog.io/@euisuk-chung/파이썬-Multiprocessing-Multithreading-μ‚¬μš©-μ‹œ-κ³ λ €-사항-μ˜ˆμ‹œ-μ½”λ“œ-포함

λ©€ν‹°ν”„λ‘œμ„Έμ‹±κ³Ό λ©€ν‹°μŠ€λ ˆλ”©μ„ 선택할 λ•Œ κ³ λ €ν•΄μ•Ό ν•˜λŠ” μž‘μ—…μ˜ νŠΉμ„±μ— λŒ€ν•΄ 더 μžμ„Ένžˆ μ„€λͺ…ν•˜κ² μŠ΅λ‹ˆλ‹€. μžμ› λΆ„ν•  및 할당에 λŒ€ν•΄μ„œ κΆκΈˆν•˜μ‹  뢄을 λ‹€μŒ κ²Œμ‹œκΈ€μ—μ„œ ν™•μΈν•˜μ‹€ 수 μžˆμŠ΅λ‹ˆλ‹€.

λ©€ν‹°ν”„λ‘œμ„Έμ‹±(Multiprocessing)

λ©€ν‹°ν”„λ‘œμ„Έμ‹±μ€ 주둜 CPU 집약적인 μž‘μ—…μ— μ ν•©ν•©λ‹ˆλ‹€. 이런 μž‘μ—…μ€ 계산이 많이 ν•„μš”ν•˜κ³  CPU의 μžμ›μ„ 많이 μ‚¬μš©ν•˜λŠ” κ²½μš°μΈλ°μš”. 예둜, 데이터 뢄석, 이미지 처리, λ³΅μž‘ν•œ μˆ˜ν•™μ  계산 등이 여기에 ν•΄λ‹Ήν•©λ‹ˆλ‹€.

λ©€ν‹°ν”„λ‘œμ„Έμ‹±μ„ μ‚¬μš©ν•˜λ©΄ 각 ν”„λ‘œμ„ΈμŠ€κ°€ λ…λ¦½λœ λ©”λͺ¨λ¦¬ 곡간을 κ°€μ§€κΈ° λ•Œλ¬Έμ—, ν•œ ν”„λ‘œμ„ΈμŠ€μ—μ„œ λ°œμƒν•œ 문제(예: λ©”λͺ¨λ¦¬ λˆ„μˆ˜)κ°€ λ‹€λ₯Έ ν”„λ‘œμ„ΈμŠ€μ— 영ν–₯을 λ―ΈμΉ˜μ§€ μ•ŠμŠ΅λ‹ˆλ‹€. λ˜ν•œ, λ©€ν‹°μ½”μ–΄ CPUλ₯Ό 효율적으둜 μ‚¬μš©ν•˜μ—¬ μž‘μ—…μ„ λ³‘λ ¬λ‘œ μ²˜λ¦¬ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

λ©€ν‹°ν”„λ‘œμ„Έμ‹± μ˜ˆμ‹œ μ½”λ“œ μ„€λͺ…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Pool
import pandas as pd

def process_file(file_name):
    # λŒ€μš©λŸ‰ 파일 처리 둜직
    data = pd.read_csv(file_name)
    # λ³΅μž‘ν•œ 데이터 처리 μž‘μ—… μˆ˜ν–‰
    processed_data = data # 이 뢀뢄은 μ˜ˆμ‹œλ‘œ λŒ€μ²΄
    return processed_data

if __name__ == '__main__':
    files = ['file1.csv', 'file2.csv', 'file3.csv'] # μ²˜λ¦¬ν•  파일 λͺ©λ‘
    with Pool(processes=3) as pool:
        results = pool.map(process_file, files)
  • Pool 객체λ₯Ό μ‚¬μš©ν•˜μ—¬ λ©€ν‹°ν”„λ‘œμ„Έμ‹± 풀을 μƒμ„±ν•©λ‹ˆλ‹€. 이 풀은 μ—¬λŸ¬ ν”„λ‘œμ„ΈμŠ€λ₯Ό κ΄€λ¦¬ν•˜λ©°, 각 ν”„λ‘œμ„ΈμŠ€μ— μž‘μ—…μ„ ν• λ‹Ήν•©λ‹ˆλ‹€.
  • process_file ν•¨μˆ˜λŠ” 각 νŒŒμΌμ„ μ²˜λ¦¬ν•˜λŠ” λ‘œμ§μ„ λ‹΄κ³  μžˆμŠ΅λ‹ˆλ‹€. μ—¬κΈ°μ„œλŠ” μ˜ˆμ‹œλ‘œ pandasλ₯Ό μ‚¬μš©ν•˜μ—¬ CSV νŒŒμΌμ„ μ½μ–΄λ“€μ΄λŠ” κ²ƒμœΌλ‘œ λ‚˜νƒ€λƒˆμŠ΅λ‹ˆλ‹€.
  • pool.map λ©”μ†Œλ“œλ₯Ό 톡해 파일 λͺ©λ‘μ— λŒ€ν•΄ process_file ν•¨μˆ˜λ₯Ό λ³‘λ ¬λ‘œ μ‹€ν–‰ν•©λ‹ˆλ‹€. 각 ν”„λ‘œμ„ΈμŠ€λŠ” 파일 λͺ©λ‘ 쀑 ν•˜λ‚˜λ₯Ό λ°›μ•„ λ…λ¦½μ μœΌλ‘œ μ²˜λ¦¬ν•©λ‹ˆλ‹€.

μœ„μ—μ„œ μ‚¬μš©ν•œ pool.map λŒ€μ‹  start와 join을 μ‚¬μš©ν•˜κ³ λ„ λ™μΌν•˜κ²Œ λ©€ν‹°ν”„λ‘œμ„Έμ‹±μ„ μˆ˜ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€.

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
from multiprocessing import Process
import pandas as pd

def process_file(file_name):
    # λŒ€μš©λŸ‰ 파일 처리 둜직
    print(f"Processing {file_name}")
    data = pd.read_csv(file_name)
    # λ³΅μž‘ν•œ 데이터 처리 μž‘μ—… μˆ˜ν–‰
    processed_data = data  # 이 뢀뢄은 μ˜ˆμ‹œλ‘œ λŒ€μ²΄
    # 처리된 데이터 λ°˜ν™˜ λŒ€μ‹  파일 이름과 처리 μ™„λ£Œ 좜λ ₯
    print(f"Finished processing {file_name}")
    return processed_data

if __name__ == '__main__':
    files = ['file1.csv', 'file2.csv', 'file3.csv']  # μ²˜λ¦¬ν•  파일 λͺ©λ‘
    processes = []

    # 각 νŒŒμΌμ— λŒ€ν•΄ λ³„λ„μ˜ ν”„λ‘œμ„ΈμŠ€ 생성 및 μ‹œμž‘
    for file_name in files:
        process = Process(target=process_file, args=(file_name,))
        processes.append(process)
        process.start()

    # λͺ¨λ“  ν”„λ‘œμ„ΈμŠ€μ˜ μ™„λ£Œλ₯Ό κΈ°λ‹€λ¦Ό
    for process in processes:
        process.join()

    print("All processing complete.")
  • .start() λ©”μ„œλ“œλ₯Ό ν˜ΈμΆœν•˜μ—¬ 각 ν”„λ‘œμ„ΈμŠ€λ₯Ό μ‹œμž‘ν•©λ‹ˆλ‹€. μ΄λŠ” 각 파일 처리 μž‘μ—…μ„ λ³‘λ ¬λ‘œ μ‹€ν–‰ν•˜κ²Œ λ§Œλ“­λ‹ˆλ‹€.
  • .join() λ©”μ„œλ“œλ₯Ό μ‚¬μš©ν•˜μ—¬ 메인 ν”„λ‘œμ„ΈμŠ€κ°€ λͺ¨λ“  μžμ‹ ν”„λ‘œμ„ΈμŠ€μ˜ 싀행이 μ™„λ£Œλ  λ•ŒκΉŒμ§€ κΈ°λ‹€λ¦¬κ²Œ ν•©λ‹ˆλ‹€. μ΄λŠ” λͺ¨λ“  데이터 처리 μž‘μ—…μ΄ μ™„λ£Œλ  λ•ŒκΉŒμ§€ ν”„λ‘œκ·Έλž¨μ˜ μ’…λ£Œλ₯Ό μ§€μ—°μ‹œν‚΅λ‹ˆλ‹€.

λ©€ν‹°μŠ€λ ˆλ”©(Multithreading)

λ©€ν‹°μŠ€λ ˆλ”©μ€ 주둜 I/O λ°”μš΄λ“œ μž‘μ—…μ— μ ν•©ν•©λ‹ˆλ‹€. 이런 μž‘μ—…μ€ λŒ€λŸ‰μ˜ μž…μΆœλ ₯, 예λ₯Ό λ“€μ–΄, 파일 읽기/μ“°κΈ°, λ„€νŠΈμ›Œν¬ 톡신 등을 ν•„μš”λ‘œ ν•˜λŠ” κ²½μš°μž…λ‹ˆλ‹€. 이 경우, ν”„λ‘œκ·Έλž¨μ€ λŒ€λΆ€λΆ„μ˜ μ‹œκ°„μ„ 데이터λ₯Ό μ½κ±°λ‚˜ μ“°κΈ°λ₯Ό 기닀리며 λ³΄λ‚΄κ²Œ λ˜λŠ”λ°, μ΄λ•Œ CPUλŠ” λ§Žμ€ μ‹œκ°„ λ™μ•ˆ 아무 일도 ν•˜μ§€ μ•Šκ²Œ λ©λ‹ˆλ‹€.

λ©€ν‹°μŠ€λ ˆλ”©μ„ μ‚¬μš©ν•˜λ©΄, ν•œ μŠ€λ ˆλ“œκ°€ I/O μž‘μ—…μœΌλ‘œ λŒ€κΈ° μƒνƒœμΌ λ•Œ λ‹€λ₯Έ μŠ€λ ˆλ“œκ°€ CPUλ₯Ό μ‚¬μš©ν•˜μ—¬ μž‘μ—…μ„ 계속 μ§„ν–‰ν•  수 μžˆμŠ΅λ‹ˆλ‹€. 이λ₯Ό 톡해 ν”„λ‘œκ·Έλž¨μ˜ 전체 μ‹€ν–‰ μ‹œκ°„μ„ 쀄일 수 μžˆμŠ΅λ‹ˆλ‹€.

λ©€ν‹°μŠ€λ ˆλ”© μ˜ˆμ‹œ μ½”λ“œ μ„€λͺ…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import threading
import requests

def fetch_data(url):
    # μ›Ήμ‚¬μ΄νŠΈμ—μ„œ 데이터λ₯Ό μš”μ²­ν•˜κ³  μ²˜λ¦¬ν•˜λŠ” 둜직
    response = requests.get(url)
    data = response.text # 이 뢀뢄은 μ˜ˆμ‹œλ‘œ λŒ€μ²΄
    return data

urls = ['http://example.com/data1', 'http://example.com/data2', 'http://example.com/data3'] # 데이터λ₯Ό μˆ˜μ§‘ν•  μ›Ήμ‚¬μ΄νŠΈ μ£Όμ†Œ λͺ©λ‘

threads = []
for url in urls:
    thread = threading.Thread(target=fetch_data, args=(url,))
    threads.append(thread)
    thread.start()

for thread in

 threads:
    thread.join()
  • μ—¬κΈ°μ„œλŠ” threading.Threadλ₯Ό μ‚¬μš©ν•˜μ—¬ 각 URL에 λŒ€ν•œ 데이터 μˆ˜μ§‘ μž‘μ—…μ„ λ³„λ„μ˜ μŠ€λ ˆλ“œλ‘œ μ‹€ν–‰ν•©λ‹ˆλ‹€.
  • fetch_data ν•¨μˆ˜λŠ” μ£Όμ–΄μ§„ URLμ—μ„œ 데이터λ₯Ό μš”μ²­ν•˜κ³ , 응닡을 λ°›μ•„μ˜€λŠ” 역할을 ν•©λ‹ˆλ‹€.
  • 각 μŠ€λ ˆλ“œλŠ” λ…λ¦½μ μœΌλ‘œ μ›Ήμ‚¬μ΄νŠΈμ— μš”μ²­μ„ 보내고 응닡을 κΈ°λ‹€λ¦½λ‹ˆλ‹€. 이 κ³Όμ •μ—μ„œ I/O μž‘μ—…μ΄ μ§„ν–‰λ˜λŠ” λ™μ•ˆ, λ‹€λ₯Έ μŠ€λ ˆλ“œλŠ” CPUλ₯Ό ν™œμš©ν•  수 μžˆμŠ΅λ‹ˆλ‹€.
  • thread.join()은 메인 μŠ€λ ˆλ“œκ°€ λͺ¨λ“  μŠ€λ ˆλ“œμ˜ μž‘μ—…μ΄ 끝날 λ•ŒκΉŒμ§€ 기닀리도둝 ν•©λ‹ˆλ‹€.

λ©€ν‹°ν”„λ‘œμ„Έμ‹±κ³Ό λ©€ν‹°μŠ€λ ˆλ”©μ˜ 선택은 μž‘μ—…μ˜ νŠΉμ„±μ— 따라 λ‹¬λΌμ§‘λ‹ˆλ‹€. CPU μ‚¬μš©μ΄ λ§Žμ€ μž‘μ—…μ—λŠ” λ©€ν‹°ν”„λ‘œμ„Έμ‹±μ„, I/O μž‘μ—…μ΄ λ§Žμ€ κ²½μš°μ—λŠ” λ©€ν‹°μŠ€λ ˆλ”©μ„ μ‚¬μš©ν•˜λŠ” 것이 μ’‹μŠ΅λ‹ˆλ‹€.



-->