PromleeBlog
sitemapaboutMe

posting thumbnail
트랜잭션, GridFS와 실전 팁 - MongoDB와 Python (하)
Transactions, GridFS & Pro Tips - MongoDB with Python (Pt. 3)

📅

🚀

들어가기 전에🔗

지난 (상)편과 (중)편을 통해 우리는 Python과 PyMongo를 사용하여 MongoDB의 기본부터 고급 쿼리, 인덱싱, 그리고 집계 프레임워크의 기초까지 탄탄하게 다져왔습니다.
이제 그 여정의 마지막 단계로, MongoDB를 한층 더 전문적으로 활용할 수 있게 해주는 최고급 기술들과 실전에서 유용한 팁들을 함께 살펴보겠습니다.

이번 시간에는 여러 데이터베이스 작업을 하나의 원자적 단위로 묶어 데이터 일관성을 보장하는
트랜잭션(Transaction)
, 이미지나 동영상 같은 대용량 파일을 MongoDB에 효율적으로 저장하고 관리하는
GridFS
, 데이터베이스의 변경 사항을 실시간으로 감지하여 대응할 수 있게 해주는
Change Streams
에 대해 배울 것입니다.
더불어, 실제 Python 애플리케이션 개발 시 MongoDB를 더욱 효과적으로 사용하기 위한 실전 팁들과, 비동기 프로그래밍 환경에서 유용한 드라이버인
Motor
에 대해서도 간략히 소개하며 이 외전 시리즈를 마무리하겠습니다.

🚀

1. 데이터 일관성 보장: PyMongo 트랜잭션🔗

트랜잭션은
두 개 이상의 데이터베이스 작업을 하나의 논리적인 작업 단위로 묶어서
, 모든 작업이 성공적으로 완료되거나(커밋 Commit), 하나라도 실패하면 모든 작업이 이전 상태로 되돌려지는(롤백 Rollback) 것을 보장하는 기능입니다.
이를 통해 데이터의
원자성(Atomicity)
일관성(Consistency)
을 유지할 수 있습니다.
👨‍💻
MongoDB에서 트랜잭션을 사용하려면, 해당 MongoDB 서버가 반드시 복제 세트(Replica Set)로 구성되어 있어야 합니다. 단일 mongod 인스턴스에서는 트랜잭션을 사용할 수 없습니다.
(샤드 클러스터 환경에서도 트랜잭션 사용 가능)
PyMongo에서 트랜잭션을 사용하려면 MongoClient에서 세션(Session)을 시작하고, 해당 세션 내에서 트랜잭션 작업을 수행합니다.
  # (상), (중)편에서 사용한 Atlas 연결 및 client, db, users_collection 객체가 준비되었다고 가정
from pymongo import MongoClient
ATLAS_URI = "..."
client = MongoClient(ATLAS_URI) # 복제 세트 환경에 연결되어 있어야 함
 
def transfer_money(from_user_name, to_user_name, amount):
    # 트랜잭션은 반드시 세션 내에서 실행되어야 합니다.
    with client.start_session() as session:
        # 세션 내에서 트랜잭션 시작
        with session.start_transaction():
            try:
                print(f"\n--- '{from_user_name}'에서 '{to_user_name}'으로 {amount} 송금 트랜잭션 시작 ---")
                
                users_collection = client[DATABASE_NAME][COLLECTION_NAME] # 세션 내에서 컬렉션 객체 다시 얻기 권장
 
                # 1. 보내는 사람 계좌에서 돈 인출
                from_user_update_result = users_collection.update_one(
                    {"name": from_user_name, "balance": {"$gte": amount}}, # 잔액 충분한지 확인
                    {"$inc": {"balance": -amount}},
                    session=session # 모든 작업에 세션 전달
                )
 
                if from_user_update_result.matched_count == 0 or from_user_update_result.modified_count == 0:
                    # 여기서 session.abort_transaction()을 명시적으로 호출할 수도 있지만,
                    # 예외 발생 시 with 블록이 종료되면서 자동으로 롤백됩니다.
                    raise Exception(f"'{from_user_name}'의 잔액이 부족하거나 사용자를 찾을 수 없습니다. 롤백됩니다.")
 
                print(f"'{from_user_name}' 계좌에서 {amount} 인출 성공.")
 
                # 2. 받는 사람 계좌에 돈 입금
                # (의도적으로 오류를 발생시켜 롤백 테스트를 해볼 수도 있습니다. 예: to_user_name을 없는 이름으로)
                # if to_user_name == "없는사람":
                #     raise ValueError("테스트용 강제 오류 발생!")
 
                to_user_update_result = users_collection.update_one(
                    {"name": to_user_name},
                    {"$inc": {"balance": amount}},
                    session=session # 모든 작업에 세션 전달
                )
 
                if to_user_update_result.matched_count == 0:
                    raise Exception(f"'{to_user_name}' 사용자를 찾을 수 없습니다. 롤백됩니다.")
                
                print(f"'{to_user_name}' 계좌에 {amount} 입금 성공.")
 
                # 모든 작업이 성공했으므로 트랜잭션 커밋 (with 블록 정상 종료 시 자동 커밋)
                # session.commit_transaction() # 명시적으로 호출할 수도 있음
                print("모든 작업 성공, 트랜잭션이 커밋되었습니다.")
                return True
 
            except Exception as e:
                print(f"트랜잭션 중 오류 발생: {e}")
                # with session.start_transaction() 블록 내에서 예외 발생 시 자동으로 롤백됨
                # session.abort_transaction() # 명시적으로 호출할 수도 있음
                print("오류로 인해 트랜잭션이 롤백되었습니다.")
                return False
 
  # 테스트를 위해 사용자 데이터 준비 (실제 실행 전 (상)편 코드 등으로 데이터 삽입 필요)
  # users_collection.update_many({}, {"$set": {"balance": 1000}}) # 모든 사용자 잔액 1000으로 초기화
  # print(f"송금 결과: {transfer_money('유관순', '안중근', 100)}")
  # print(f"송금 결과 (롤백 테스트 - 잔액 부족): {transfer_money('김구', '윤봉길', 2000)}")
  # print(f"송금 결과 (롤백 테스트 - 받는이 없음): {transfer_money('안중근', '없는사람', 50)}")
주요 포인트 (트랜잭션)

🚀

2. 대용량 파일 저장소: GridFS🔗

MongoDB의 BSON 문서 크기는 기본적으로 16MB로 제한됩니다. 이보다 큰 파일(예: 고화질 이미지, 동영상, 대용량 로그 파일 등)을 저장해야 할 경우
GridFS
를 사용할 수 있습니다.
GridFS는 큰 파일을 여러 개의 작은 "청크(chunk)"로 나누어 별도의 컬렉션에 저장하고, 파일에 대한 메타데이터(파일명, 타입, 업로드 날짜 등)를 또 다른 컬렉션에 저장하는 규격입니다.
PyMongo에서는 gridfs 모듈을 통해 GridFS 기능을 제공합니다.
import gridfs
from bson.objectid import ObjectId
 
  # GridFS 객체 생성 (데이터베이스 객체 db가 준비되어 있어야 함)
  # db = client[DATABASE_NAME] # 이전 코드에서 이어짐
fs = gridfs.GridFS(db) # 기본적으로 'fs.files'와 'fs.chunks' 컬렉션 사용
 
  # 파일 업로드 (put 메서드 사용)
file_path_to_upload = "my_large_video.mp4" # 예시 파일 경로
  # (실제 테스트를 위해서는 이 경로에 파일이 있어야 합니다.)
  # 여기서는 간단히 텍스트 파일을 업로드하는 예시로 대체합니다.
try:
    with open("my_document.txt", "w") as f:
        f.write("이것은 GridFS에 저장될 문서입니다. 여러 청크로 나뉠 수 있습니다!")
    
    with open("my_document.txt", "rb") as f_to_upload: # 바이너리 읽기 모드
        # put()은 파일 객체 또는 바이트 데이터를 받음, filename은 필수
        file_id = fs.put(f_to_upload, filename="important_document.txt", encoding="utf-8", uploader="Alice")
    print(f"\n--- GridFS 파일 업로드 ---")
    print(f"파일 업로드 성공, File ID: {file_id} (ObjectId)")
 
    # 파일 찾기 및 정보 확인 (get_last_version 또는 get)
    # filename으로 가장 최신 버전의 파일 정보(GridOut 객체) 가져오기
    grid_out_file = fs.get_last_version(filename="important_document.txt")
    if grid_out_file:
        print(f"\n파일 '{grid_out_file.filename}' 정보:")
        print(f"  ID: {grid_out_file._id}")
        print(f"  길이: {grid_out_file.length} bytes")
        print(f"  업로드 날짜: {grid_out_file.upload_date}")
        print(f"  MD5: {grid_out_file.md5}")
        print(f"  인코딩: {grid_out_file.encoding}") # put 할 때 지정한 메타데이터
        print(f"  업로더: {grid_out_file.uploader}")
 
        # 파일 내용 읽기 (다운로드)
        # grid_out_file은 파일과 유사한 인터페이스를 제공 (read, seek 등)
        # with open("downloaded_document.txt", "wb") as f_downloaded:
        #     f_downloaded.write(grid_out_file.read())
        # print("\n파일 내용 다운로드 완료: downloaded_document.txt")
        
        # 여기서는 간단히 콘솔에 출력
        print("\n파일 내용 일부:")
        print(grid_out_file.read(50).decode('utf-8') + "...") # 처음 50바이트만 읽어서 디코딩
        grid_out_file.close() # GridOut 객체도 닫아주는 것이 좋음
 
    # 파일 삭제 (delete 메서드 사용 - 파일 ID로 삭제)
    # fs.delete(file_id)
    # print(f"\n파일 ID '{file_id}' 삭제 완료.")
 
    # 파일 존재 여부 확인 (exists 메서드)
    # print(f"파일 'important_document.txt' 존재 여부: {fs.exists(filename='important_document.txt')}")
    # print(f"파일 ID '{file_id}' 존재 여부: {fs.exists(file_id)}")
 
 
except FileNotFoundError:
    print(f"GridFS 업로드/다운로드 테스트를 위한 파일을 찾을 수 없습니다: {file_path_to_upload} 또는 my_document.txt")
except Exception as e:
    print(f"GridFS 작업 중 오류 발생: {e}")
 
주요 포인트 (GridFS)

🚀

3. 실시간 데이터 변경 감지: Change Streams🔗

Change Streams는 복제 세트나 샤드 클러스터 내의 특정 컬렉션, 데이터베이스, 또는 전체 배포 환경에서 발생하는 데이터 변경(삽입, 업데이트, 삭제, 교체 등) 사항을
실시간으로 스트리밍
받을 수 있는 기능입니다.
이를 통해 실시간 알림, 데이터 동기화, 감사 로깅 등 다양한 기능을 구현할 수 있습니다.
Change Streams를 사용하려면 MongoDB 서버 버전과 복제 세트 구성 등의 요구 사항이 충족되어야 합니다. (MongoDB 3.6 이상 복제 세트 필요)
  # Change Streams는 보통 별도의 스크립트나 백그라운드 작업으로 계속 실행됩니다.
  # 여기서는 간단한 예시로, 특정 시간 동안 변경 사항을 감시합니다.
 
  # db = client[DATABASE_NAME] # 이전 코드에서 이어짐
  # monitored_collection = db[COLLECTION_NAME]
pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "delete"]}}}]
 
try:
    print(f"\n--- '{COLLECTION_NAME}' 컬렉션에 대한 Change Streams 시작 (10초간 감시) ---")
    # with monitored_collection.watch(pipeline) as stream: # MongoDB 4.0.7+
    with monitored_collection.watch(pipeline, max_await_time_ms=1000) as stream: # 이전 버전 호환성 및 타임아웃
        for change in stream:
            print("변경 사항 감지:")
            print(change) # 전체 변경 문서 출력
            # 필요한 정보 추출 및 처리 로직 추가
            # 예: print(f"  작업 유형: {change['operationType']}")
            #     if 'fullDocument' in change:
            #         print(f"  전체 문서: {change['fullDocument']}")
            #     if 'documentKey' in change:
            #         print(f"  문서 키: {change['documentKey']}")
            
            # 예제이므로, 몇 초 후 또는 특정 조건에서 루프를 빠져나오도록 처리 필요
            # 이 예제에서는 watch()가 블로킹될 수 있으므로 실제 사용 시 주의
            # 여기서는 예시를 위해 바로 break (실제로는 별도 스레드나 비동기 처리 필요)
            # 이 루프는 변경이 있을 때마다 실행됩니다.
            # 이 예제에서는 바로 break하여 한번만 출력하고 종료하도록 합니다. (테스트용)
            # 실제 애플리케이션에서는 이 루프를 계속 유지하거나,
            # stream.next()가 None을 반환할 때까지 (max_await_time_ms 이후) 루프를 돌게 됩니다.
            break # 데모를 위해 첫 번째 변경 후 종료
    
    print("Change Streams 감시 종료 (예시).")
    
    # 감시 중에 다른 터미널에서 monitored_collection에 데이터 변경 작업을 수행하면 위에서 감지됩니다.
    # 예: users_collection.insert_one({"name": "실시간테스트", "change_stream_test": True})
    #     users_collection.update_one({"name": "실시간테스트"}, {"$set": {"value": 123}})
    #     users_collection.delete_one({"name": "실시간테스트"})
 
except pymongo.errors.PyMongoError as e:
    print(f"Change Streams 작업 중 오류: {e}")
주요 포인트 (Change Streams)

🚀

4. 실전 활용 팁 및 비동기 드라이버 Motor 간략 소개🔗

실전 활용 팁🔗

비동기 프로그래밍을 위한 Motor (간략 소개)🔗

Python에서 asyncio를 사용한 비동기 프로그래밍을 한다면, PyMongo(동기 드라이버) 대신 비동기 MongoDB 드라이버인
Motor
를 사용하는 것이 좋습니다.
Motor는 async/await 문법과 완벽하게 호환되며, 논블로킹(non-blocking) I/O를 통해 높은 동시성 처리 능력을 제공합니다.
Motor 설치:
pip install motor
Motor 기본 사용 예시 (asyncio 환경)
import asyncio
import motor.motor_asyncio # motor.motor_tornado 도 있음
 
async def do_motor_stuff():
    ATLAS_URI_MOTOR = "mongodb+srv://<username>:<password>@<cluster-url>/<dbname>?retryWrites=true&w=majority"
    client = motor.motor_asyncio.AsyncIOMotorClient(ATLAS_URI_MOTOR)
    db = client.myAppDB
    users_collection = db.users_python_외전 # 컬렉션 이름은 기존과 동일하게 사용 가능
 
    # 비동기 작업 예시
    await users_collection.insert_one({"name": "MotorTest", "value": 1})
    doc = await users_collection.find_one({"name": "MotorTest"})
    print(f"\nMotor로 찾은 문서: {doc}")
    
    client.close() # Motor도 사용 후 닫아주는 것이 좋음
 
  # asyncio.run(do_motor_stuff()) # Python 3.7+
  # 또는
  # if __name__ == "__main__":
  #     loop = asyncio.get_event_loop()
  #     loop.run_until_complete(do_motor_stuff())
Motor의 API는 PyMongo와 매우 유사하여, PyMongo에 익숙하다면 쉽게 배울 수 있습니다.

🚀

결론🔗

이것으로 3편에 걸친 "파이썬 MongoDB 완전 정복" 외전 시리즈가 모두 마무리되었습니다.
(상)편의 기본 다지기부터 시작하여, (중)편의 고급 쿼리와 분석 입문, 그리고 오늘 (하)편에서는 트랜잭션, GridFS, Change Streams와 같은 전문가 수준의 기능들과 실전 팁, 비동기 처리까지 살펴보았습니다.

이 외전 시리즈를 통해 여러분이 Python 환경에서 MongoDB를 더욱 깊이 있고 자신감 있게 활용할 수 있는 튼튼한 발판을 마련하셨기를 바랍니다.
MongoDB와 Python은 함께 사용될 때 매우 강력한 시너지를 내는 조합이며, 현대 웹 개발, 데이터 분석, 빅데이터 처리 등 다양한 분야에서 그 가치를 인정받고 있습니다.
그동안 MongoDB 올인 본편 시리즈와 Python 외전 시리즈를 함께해주신 모든 분들께 진심으로 감사드립니다.

참고🔗