지난 (상)편과 (중)편을 통해 우리는 Python과 PyMongo를 사용하여 MongoDB의 기본부터 고급 쿼리, 인덱싱, 그리고 집계 프레임워크의 기초까지 탄탄하게 다져왔습니다.
이제 그 여정의 마지막 단계로, MongoDB를 한층 더 전문적으로 활용할 수 있게 해주는 최고급 기술들과 실전에서 유용한 팁들을 함께 살펴보겠습니다.
이번 시간에는 여러 데이터베이스 작업을 하나의 원자적 단위로 묶어 데이터 일관성을 보장하는
트랜잭션(Transaction)
, 이미지나 동영상 같은 대용량 파일을 MongoDB에 효율적으로 저장하고 관리하는
GridFS
, 데이터베이스의 변경 사항을 실시간으로 감지하여 대응할 수 있게 해주는
Change Streams
에 대해 배울 것입니다.
더불어, 실제 Python 애플리케이션 개발 시 MongoDB를 더욱 효과적으로 사용하기 위한 실전 팁들과, 비동기 프로그래밍 환경에서 유용한 드라이버인
, 모든 작업이 성공적으로 완료되거나(커밋 Commit), 하나라도 실패하면 모든 작업이 이전 상태로 되돌려지는(롤백 Rollback) 것을 보장하는 기능입니다.
이를 통해 데이터의
원자성(Atomicity)
과
일관성(Consistency)
을 유지할 수 있습니다.
👨💻
MongoDB에서 트랜잭션을 사용하려면, 해당 MongoDB 서버가 반드시 복제 세트(Replica Set)로 구성되어 있어야 합니다. 단일 mongod 인스턴스에서는 트랜잭션을 사용할 수 없습니다.
(샤드 클러스터 환경에서도 트랜잭션 사용 가능)
PyMongo에서 트랜잭션을 사용하려면 MongoClient에서 세션(Session)을 시작하고, 해당 세션 내에서 트랜잭션 작업을 수행합니다.
# (상), (중)편에서 사용한 Atlas 연결 및 client, db, users_collection 객체가 준비되었다고 가정from pymongo import MongoClientATLAS_URI = "..."client = MongoClient(ATLAS_URI) # 복제 세트 환경에 연결되어 있어야 함def transfer_money(from_user_name, to_user_name, amount): # 트랜잭션은 반드시 세션 내에서 실행되어야 합니다. with client.start_session() as session: # 세션 내에서 트랜잭션 시작 with session.start_transaction(): try: print(f"\n--- '{from_user_name}'에서 '{to_user_name}'으로 {amount} 송금 트랜잭션 시작 ---") users_collection = client[DATABASE_NAME][COLLECTION_NAME] # 세션 내에서 컬렉션 객체 다시 얻기 권장 # 1. 보내는 사람 계좌에서 돈 인출 from_user_update_result = users_collection.update_one( {"name": from_user_name, "balance": {"$gte": amount}}, # 잔액 충분한지 확인 {"$inc": {"balance": -amount}}, session=session # 모든 작업에 세션 전달 ) if from_user_update_result.matched_count == 0 or from_user_update_result.modified_count == 0: # 여기서 session.abort_transaction()을 명시적으로 호출할 수도 있지만, # 예외 발생 시 with 블록이 종료되면서 자동으로 롤백됩니다. raise Exception(f"'{from_user_name}'의 잔액이 부족하거나 사용자를 찾을 수 없습니다. 롤백됩니다.") print(f"'{from_user_name}' 계좌에서 {amount} 인출 성공.") # 2. 받는 사람 계좌에 돈 입금 # (의도적으로 오류를 발생시켜 롤백 테스트를 해볼 수도 있습니다. 예: to_user_name을 없는 이름으로) # if to_user_name == "없는사람": # raise ValueError("테스트용 강제 오류 발생!") to_user_update_result = users_collection.update_one( {"name": to_user_name}, {"$inc": {"balance": amount}}, session=session # 모든 작업에 세션 전달 ) if to_user_update_result.matched_count == 0: raise Exception(f"'{to_user_name}' 사용자를 찾을 수 없습니다. 롤백됩니다.") print(f"'{to_user_name}' 계좌에 {amount} 입금 성공.") # 모든 작업이 성공했으므로 트랜잭션 커밋 (with 블록 정상 종료 시 자동 커밋) # session.commit_transaction() # 명시적으로 호출할 수도 있음 print("모든 작업 성공, 트랜잭션이 커밋되었습니다.") return True except Exception as e: print(f"트랜잭션 중 오류 발생: {e}") # with session.start_transaction() 블록 내에서 예외 발생 시 자동으로 롤백됨 # session.abort_transaction() # 명시적으로 호출할 수도 있음 print("오류로 인해 트랜잭션이 롤백되었습니다.") return False # 테스트를 위해 사용자 데이터 준비 (실제 실행 전 (상)편 코드 등으로 데이터 삽입 필요) # users_collection.update_many({}, {"$set": {"balance": 1000}}) # 모든 사용자 잔액 1000으로 초기화 # print(f"송금 결과: {transfer_money('유관순', '안중근', 100)}") # print(f"송금 결과 (롤백 테스트 - 잔액 부족): {transfer_money('김구', '윤봉길', 2000)}") # print(f"송금 결과 (롤백 테스트 - 받는이 없음): {transfer_money('안중근', '없는사람', 50)}")
주요 포인트 (트랜잭션)
client.start_session()
세션을 시작합니다. with 문과 함께 사용하면 세션이 자동으로 닫힙니다.
session.start_transaction()
트랜잭션을 시작합니다. 이 또한 with 문과 함께 사용하면 트랜잭션이 정상 종료 시 자동 커밋되고, 예외 발생 시 자동 롤백됩니다.
트랜잭션 내의 모든 CRUD 작업에는 반드시 session=session 인자를 전달해야 합니다.
트랜잭션은 여러 문서나 여러 컬렉션에 걸친 작업도 지원합니다 (단, 동일 데이터베이스 내).
트랜잭션은 기본적으로 60초의 시간 제한이 있습니다. (maxCommitTimeMS 옵션으로 조절 가능)
MongoDB의 BSON 문서 크기는 기본적으로 16MB로 제한됩니다. 이보다 큰 파일(예: 고화질 이미지, 동영상, 대용량 로그 파일 등)을 저장해야 할 경우
GridFS
를 사용할 수 있습니다.
GridFS는 큰 파일을 여러 개의 작은 "청크(chunk)"로 나누어 별도의 컬렉션에 저장하고, 파일에 대한 메타데이터(파일명, 타입, 업로드 날짜 등)를 또 다른 컬렉션에 저장하는 규격입니다.
PyMongo에서는 gridfs 모듈을 통해 GridFS 기능을 제공합니다.
import gridfsfrom bson.objectid import ObjectId # GridFS 객체 생성 (데이터베이스 객체 db가 준비되어 있어야 함) # db = client[DATABASE_NAME] # 이전 코드에서 이어짐fs = gridfs.GridFS(db) # 기본적으로 'fs.files'와 'fs.chunks' 컬렉션 사용 # 파일 업로드 (put 메서드 사용)file_path_to_upload = "my_large_video.mp4" # 예시 파일 경로 # (실제 테스트를 위해서는 이 경로에 파일이 있어야 합니다.) # 여기서는 간단히 텍스트 파일을 업로드하는 예시로 대체합니다.try: with open("my_document.txt", "w") as f: f.write("이것은 GridFS에 저장될 문서입니다. 여러 청크로 나뉠 수 있습니다!") with open("my_document.txt", "rb") as f_to_upload: # 바이너리 읽기 모드 # put()은 파일 객체 또는 바이트 데이터를 받음, filename은 필수 file_id = fs.put(f_to_upload, filename="important_document.txt", encoding="utf-8", uploader="Alice") print(f"\n--- GridFS 파일 업로드 ---") print(f"파일 업로드 성공, File ID: {file_id} (ObjectId)") # 파일 찾기 및 정보 확인 (get_last_version 또는 get) # filename으로 가장 최신 버전의 파일 정보(GridOut 객체) 가져오기 grid_out_file = fs.get_last_version(filename="important_document.txt") if grid_out_file: print(f"\n파일 '{grid_out_file.filename}' 정보:") print(f" ID: {grid_out_file._id}") print(f" 길이: {grid_out_file.length} bytes") print(f" 업로드 날짜: {grid_out_file.upload_date}") print(f" MD5: {grid_out_file.md5}") print(f" 인코딩: {grid_out_file.encoding}") # put 할 때 지정한 메타데이터 print(f" 업로더: {grid_out_file.uploader}") # 파일 내용 읽기 (다운로드) # grid_out_file은 파일과 유사한 인터페이스를 제공 (read, seek 등) # with open("downloaded_document.txt", "wb") as f_downloaded: # f_downloaded.write(grid_out_file.read()) # print("\n파일 내용 다운로드 완료: downloaded_document.txt") # 여기서는 간단히 콘솔에 출력 print("\n파일 내용 일부:") print(grid_out_file.read(50).decode('utf-8') + "...") # 처음 50바이트만 읽어서 디코딩 grid_out_file.close() # GridOut 객체도 닫아주는 것이 좋음 # 파일 삭제 (delete 메서드 사용 - 파일 ID로 삭제) # fs.delete(file_id) # print(f"\n파일 ID '{file_id}' 삭제 완료.") # 파일 존재 여부 확인 (exists 메서드) # print(f"파일 'important_document.txt' 존재 여부: {fs.exists(filename='important_document.txt')}") # print(f"파일 ID '{file_id}' 존재 여부: {fs.exists(file_id)}")except FileNotFoundError: print(f"GridFS 업로드/다운로드 테스트를 위한 파일을 찾을 수 없습니다: {file_path_to_upload} 또는 my_document.txt")except Exception as e: print(f"GridFS 작업 중 오류 발생: {e}")
fs.put(data, **kwargs)
파일이나 바이트 데이터를 GridFS에 저장합니다. filename은 필수로 지정해야 하며, 다른 메타데이터(예: contentType, encoding, 사용자 정의 필드)도 함께 저장할 수 있습니다. 저장된 파일의 _id(ObjectId)를 반환합니다.
fs.get(file_id) 또는 fs.get_last_version(filename=...)
GridFS에서 파일을 읽기 위한 GridOut 객체를 반환합니다. GridOut 객체는 파일과 유사한 인터페이스(read(), seek(), tell(), close())를 제공합니다.
Change Streams는 복제 세트나 샤드 클러스터 내의 특정 컬렉션, 데이터베이스, 또는 전체 배포 환경에서 발생하는 데이터 변경(삽입, 업데이트, 삭제, 교체 등) 사항을
실시간으로 스트리밍
받을 수 있는 기능입니다.
이를 통해 실시간 알림, 데이터 동기화, 감사 로깅 등 다양한 기능을 구현할 수 있습니다.
Change Streams를 사용하려면 MongoDB 서버 버전과 복제 세트 구성 등의 요구 사항이 충족되어야 합니다. (MongoDB 3.6 이상 복제 세트 필요)
# Change Streams는 보통 별도의 스크립트나 백그라운드 작업으로 계속 실행됩니다. # 여기서는 간단한 예시로, 특정 시간 동안 변경 사항을 감시합니다. # db = client[DATABASE_NAME] # 이전 코드에서 이어짐 # monitored_collection = db[COLLECTION_NAME]pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "delete"]}}}]try: print(f"\n--- '{COLLECTION_NAME}' 컬렉션에 대한 Change Streams 시작 (10초간 감시) ---") # with monitored_collection.watch(pipeline) as stream: # MongoDB 4.0.7+ with monitored_collection.watch(pipeline, max_await_time_ms=1000) as stream: # 이전 버전 호환성 및 타임아웃 for change in stream: print("변경 사항 감지:") print(change) # 전체 변경 문서 출력 # 필요한 정보 추출 및 처리 로직 추가 # 예: print(f" 작업 유형: {change['operationType']}") # if 'fullDocument' in change: # print(f" 전체 문서: {change['fullDocument']}") # if 'documentKey' in change: # print(f" 문서 키: {change['documentKey']}") # 예제이므로, 몇 초 후 또는 특정 조건에서 루프를 빠져나오도록 처리 필요 # 이 예제에서는 watch()가 블로킹될 수 있으므로 실제 사용 시 주의 # 여기서는 예시를 위해 바로 break (실제로는 별도 스레드나 비동기 처리 필요) # 이 루프는 변경이 있을 때마다 실행됩니다. # 이 예제에서는 바로 break하여 한번만 출력하고 종료하도록 합니다. (테스트용) # 실제 애플리케이션에서는 이 루프를 계속 유지하거나, # stream.next()가 None을 반환할 때까지 (max_await_time_ms 이후) 루프를 돌게 됩니다. break # 데모를 위해 첫 번째 변경 후 종료 print("Change Streams 감시 종료 (예시).") # 감시 중에 다른 터미널에서 monitored_collection에 데이터 변경 작업을 수행하면 위에서 감지됩니다. # 예: users_collection.insert_one({"name": "실시간테스트", "change_stream_test": True}) # users_collection.update_one({"name": "실시간테스트"}, {"$set": {"value": 123}}) # users_collection.delete_one({"name": "실시간테스트"})except pymongo.errors.PyMongoError as e: print(f"Change Streams 작업 중 오류: {e}")
주요 포인트 (Change Streams)
collection.watch(pipeline=None, full_document='default', ...)
특정 컬렉션에 대한 Change Stream 커서를 엽니다. pipeline을 사용하여 특정 작업 유형만 감시하거나 필드를 추가할 수 있습니다.
full_document 옵션
'updateLookup'으로 설정하면 업데이트 작업 시 변경 후 전체 문서를 함께 반환합니다. (기본값 'default'는 일부 정보만)
반환된 ChangeStreamCursor는 이터러블이며, 새로운 변경 이벤트가 발생할 때까지 블로킹될 수 있습니다. max_await_time_ms 옵션으로 대기 시간을 조절할 수 있습니다.
각 변경 이벤트는 특정 구조를 가진 문서를 반환하며, operationType, documentKey, fullDocument(설정에 따라), updateDescription 등의 필드를 포함합니다.
이것으로 3편에 걸친 "파이썬 MongoDB 완전 정복" 외전 시리즈가 모두 마무리되었습니다.
(상)편의 기본 다지기부터 시작하여, (중)편의 고급 쿼리와 분석 입문, 그리고 오늘 (하)편에서는 트랜잭션, GridFS, Change Streams와 같은 전문가 수준의 기능들과 실전 팁, 비동기 처리까지 살펴보았습니다.
이 외전 시리즈를 통해 여러분이 Python 환경에서 MongoDB를 더욱 깊이 있고 자신감 있게 활용할 수 있는 튼튼한 발판을 마련하셨기를 바랍니다.
MongoDB와 Python은 함께 사용될 때 매우 강력한 시너지를 내는 조합이며, 현대 웹 개발, 데이터 분석, 빅데이터 처리 등 다양한 분야에서 그 가치를 인정받고 있습니다.
그동안 MongoDB 올인 본편 시리즈와 Python 외전 시리즈를 함께해주신 모든 분들께 진심으로 감사드립니다.