MongoClient
에서 세션(Session)을 시작하고, 해당 세션 내에서 트랜잭션 작업을 수행합니다. # (상), (중)편에서 사용한 Atlas 연결 및 client, db, users_collection 객체가 준비되었다고 가정
from pymongo import MongoClient
ATLAS_URI = "..."
client = MongoClient(ATLAS_URI) # 복제 세트 환경에 연결되어 있어야 함
def transfer_money(from_user_name, to_user_name, amount):
# 트랜잭션은 반드시 세션 내에서 실행되어야 합니다.
with client.start_session() as session:
# 세션 내에서 트랜잭션 시작
with session.start_transaction():
try:
print(f"\n--- '{from_user_name}'에서 '{to_user_name}'으로 {amount} 송금 트랜잭션 시작 ---")
users_collection = client[DATABASE_NAME][COLLECTION_NAME] # 세션 내에서 컬렉션 객체 다시 얻기 권장
# 1. 보내는 사람 계좌에서 돈 인출
from_user_update_result = users_collection.update_one(
{"name": from_user_name, "balance": {"$gte": amount}}, # 잔액 충분한지 확인
{"$inc": {"balance": -amount}},
session=session # 모든 작업에 세션 전달
)
if from_user_update_result.matched_count == 0 or from_user_update_result.modified_count == 0:
# 여기서 session.abort_transaction()을 명시적으로 호출할 수도 있지만,
# 예외 발생 시 with 블록이 종료되면서 자동으로 롤백됩니다.
raise Exception(f"'{from_user_name}'의 잔액이 부족하거나 사용자를 찾을 수 없습니다. 롤백됩니다.")
print(f"'{from_user_name}' 계좌에서 {amount} 인출 성공.")
# 2. 받는 사람 계좌에 돈 입금
# (의도적으로 오류를 발생시켜 롤백 테스트를 해볼 수도 있습니다. 예: to_user_name을 없는 이름으로)
# if to_user_name == "없는사람":
# raise ValueError("테스트용 강제 오류 발생!")
to_user_update_result = users_collection.update_one(
{"name": to_user_name},
{"$inc": {"balance": amount}},
session=session # 모든 작업에 세션 전달
)
if to_user_update_result.matched_count == 0:
raise Exception(f"'{to_user_name}' 사용자를 찾을 수 없습니다. 롤백됩니다.")
print(f"'{to_user_name}' 계좌에 {amount} 입금 성공.")
# 모든 작업이 성공했으므로 트랜잭션 커밋 (with 블록 정상 종료 시 자동 커밋)
# session.commit_transaction() # 명시적으로 호출할 수도 있음
print("모든 작업 성공, 트랜잭션이 커밋되었습니다.")
return True
except Exception as e:
print(f"트랜잭션 중 오류 발생: {e}")
# with session.start_transaction() 블록 내에서 예외 발생 시 자동으로 롤백됨
# session.abort_transaction() # 명시적으로 호출할 수도 있음
print("오류로 인해 트랜잭션이 롤백되었습니다.")
return False
# 테스트를 위해 사용자 데이터 준비 (실제 실행 전 (상)편 코드 등으로 데이터 삽입 필요)
# users_collection.update_many({}, {"$set": {"balance": 1000}}) # 모든 사용자 잔액 1000으로 초기화
# print(f"송금 결과: {transfer_money('유관순', '안중근', 100)}")
# print(f"송금 결과 (롤백 테스트 - 잔액 부족): {transfer_money('김구', '윤봉길', 2000)}")
# print(f"송금 결과 (롤백 테스트 - 받는이 없음): {transfer_money('안중근', '없는사람', 50)}")
client.start_session()
세션을 시작합니다. with
문과 함께 사용하면 세션이 자동으로 닫힙니다.session.start_transaction()
트랜잭션을 시작합니다. 이 또한 with
문과 함께 사용하면 트랜잭션이 정상 종료 시 자동 커밋되고, 예외 발생 시 자동 롤백됩니다.session=session
인자를 전달해야 합니다.maxCommitTimeMS
옵션으로 조절 가능)gridfs
모듈을 통해 GridFS 기능을 제공합니다.import gridfs
from bson.objectid import ObjectId
# GridFS 객체 생성 (데이터베이스 객체 db가 준비되어 있어야 함)
# db = client[DATABASE_NAME] # 이전 코드에서 이어짐
fs = gridfs.GridFS(db) # 기본적으로 'fs.files'와 'fs.chunks' 컬렉션 사용
# 파일 업로드 (put 메서드 사용)
file_path_to_upload = "my_large_video.mp4" # 예시 파일 경로
# (실제 테스트를 위해서는 이 경로에 파일이 있어야 합니다.)
# 여기서는 간단히 텍스트 파일을 업로드하는 예시로 대체합니다.
try:
with open("my_document.txt", "w") as f:
f.write("이것은 GridFS에 저장될 문서입니다. 여러 청크로 나뉠 수 있습니다!")
with open("my_document.txt", "rb") as f_to_upload: # 바이너리 읽기 모드
# put()은 파일 객체 또는 바이트 데이터를 받음, filename은 필수
file_id = fs.put(f_to_upload, filename="important_document.txt", encoding="utf-8", uploader="Alice")
print(f"\n--- GridFS 파일 업로드 ---")
print(f"파일 업로드 성공, File ID: {file_id} (ObjectId)")
# 파일 찾기 및 정보 확인 (get_last_version 또는 get)
# filename으로 가장 최신 버전의 파일 정보(GridOut 객체) 가져오기
grid_out_file = fs.get_last_version(filename="important_document.txt")
if grid_out_file:
print(f"\n파일 '{grid_out_file.filename}' 정보:")
print(f" ID: {grid_out_file._id}")
print(f" 길이: {grid_out_file.length} bytes")
print(f" 업로드 날짜: {grid_out_file.upload_date}")
print(f" MD5: {grid_out_file.md5}")
print(f" 인코딩: {grid_out_file.encoding}") # put 할 때 지정한 메타데이터
print(f" 업로더: {grid_out_file.uploader}")
# 파일 내용 읽기 (다운로드)
# grid_out_file은 파일과 유사한 인터페이스를 제공 (read, seek 등)
# with open("downloaded_document.txt", "wb") as f_downloaded:
# f_downloaded.write(grid_out_file.read())
# print("\n파일 내용 다운로드 완료: downloaded_document.txt")
# 여기서는 간단히 콘솔에 출력
print("\n파일 내용 일부:")
print(grid_out_file.read(50).decode('utf-8') + "...") # 처음 50바이트만 읽어서 디코딩
grid_out_file.close() # GridOut 객체도 닫아주는 것이 좋음
# 파일 삭제 (delete 메서드 사용 - 파일 ID로 삭제)
# fs.delete(file_id)
# print(f"\n파일 ID '{file_id}' 삭제 완료.")
# 파일 존재 여부 확인 (exists 메서드)
# print(f"파일 'important_document.txt' 존재 여부: {fs.exists(filename='important_document.txt')}")
# print(f"파일 ID '{file_id}' 존재 여부: {fs.exists(file_id)}")
except FileNotFoundError:
print(f"GridFS 업로드/다운로드 테스트를 위한 파일을 찾을 수 없습니다: {file_path_to_upload} 또는 my_document.txt")
except Exception as e:
print(f"GridFS 작업 중 오류 발생: {e}")
GridFS
객체를 생성합니다. 기본적으로 fs.files
(메타데이터)와 fs.chunks
(데이터 청크) 컬렉션을 사용합니다.filename
은 필수로 지정해야 하며, 다른 메타데이터(예: contentType
, encoding
, 사용자 정의 필드)도 함께 저장할 수 있습니다. 저장된 파일의 _id
(ObjectId)를 반환합니다.GridOut
객체를 반환합니다. GridOut
객체는 파일과 유사한 인터페이스(read()
, seek()
, tell()
, close()
)를 제공합니다. # Change Streams는 보통 별도의 스크립트나 백그라운드 작업으로 계속 실행됩니다.
# 여기서는 간단한 예시로, 특정 시간 동안 변경 사항을 감시합니다.
# db = client[DATABASE_NAME] # 이전 코드에서 이어짐
# monitored_collection = db[COLLECTION_NAME]
pipeline = [{"$match": {"operationType": {"$in": ["insert", "update", "delete"]}}}]
try:
print(f"\n--- '{COLLECTION_NAME}' 컬렉션에 대한 Change Streams 시작 (10초간 감시) ---")
# with monitored_collection.watch(pipeline) as stream: # MongoDB 4.0.7+
with monitored_collection.watch(pipeline, max_await_time_ms=1000) as stream: # 이전 버전 호환성 및 타임아웃
for change in stream:
print("변경 사항 감지:")
print(change) # 전체 변경 문서 출력
# 필요한 정보 추출 및 처리 로직 추가
# 예: print(f" 작업 유형: {change['operationType']}")
# if 'fullDocument' in change:
# print(f" 전체 문서: {change['fullDocument']}")
# if 'documentKey' in change:
# print(f" 문서 키: {change['documentKey']}")
# 예제이므로, 몇 초 후 또는 특정 조건에서 루프를 빠져나오도록 처리 필요
# 이 예제에서는 watch()가 블로킹될 수 있으므로 실제 사용 시 주의
# 여기서는 예시를 위해 바로 break (실제로는 별도 스레드나 비동기 처리 필요)
# 이 루프는 변경이 있을 때마다 실행됩니다.
# 이 예제에서는 바로 break하여 한번만 출력하고 종료하도록 합니다. (테스트용)
# 실제 애플리케이션에서는 이 루프를 계속 유지하거나,
# stream.next()가 None을 반환할 때까지 (max_await_time_ms 이후) 루프를 돌게 됩니다.
break # 데모를 위해 첫 번째 변경 후 종료
print("Change Streams 감시 종료 (예시).")
# 감시 중에 다른 터미널에서 monitored_collection에 데이터 변경 작업을 수행하면 위에서 감지됩니다.
# 예: users_collection.insert_one({"name": "실시간테스트", "change_stream_test": True})
# users_collection.update_one({"name": "실시간테스트"}, {"$set": {"value": 123}})
# users_collection.delete_one({"name": "실시간테스트"})
except pymongo.errors.PyMongoError as e:
print(f"Change Streams 작업 중 오류: {e}")
pipeline
을 사용하여 특정 작업 유형만 감시하거나 필드를 추가할 수 있습니다.'updateLookup'
으로 설정하면 업데이트 작업 시 변경 후 전체 문서를 함께 반환합니다. (기본값 'default'
는 일부 정보만)ChangeStreamCursor
는 이터러블이며, 새로운 변경 이벤트가 발생할 때까지 블로킹될 수 있습니다. max_await_time_ms
옵션으로 대기 시간을 조절할 수 있습니다.operationType
, documentKey
, fullDocument
(설정에 따라), updateDescription
등의 필드를 포함합니다.w="majority"
)retryWrites=true
URI 옵션 활용)asyncio
를 사용한 비동기 프로그래밍을 한다면, PyMongo(동기 드라이버) 대신 비동기 MongoDB 드라이버인 async/await
문법과 완벽하게 호환되며, 논블로킹(non-blocking) I/O를 통해 높은 동시성 처리 능력을 제공합니다.
pip install motor
import asyncio
import motor.motor_asyncio # motor.motor_tornado 도 있음
async def do_motor_stuff():
ATLAS_URI_MOTOR = "mongodb+srv://<username>:<password>@<cluster-url>/<dbname>?retryWrites=true&w=majority"
client = motor.motor_asyncio.AsyncIOMotorClient(ATLAS_URI_MOTOR)
db = client.myAppDB
users_collection = db.users_python_외전 # 컬렉션 이름은 기존과 동일하게 사용 가능
# 비동기 작업 예시
await users_collection.insert_one({"name": "MotorTest", "value": 1})
doc = await users_collection.find_one({"name": "MotorTest"})
print(f"\nMotor로 찾은 문서: {doc}")
client.close() # Motor도 사용 후 닫아주는 것이 좋음
# asyncio.run(do_motor_stuff()) # Python 3.7+
# 또는
# if __name__ == "__main__":
# loop = asyncio.get_event_loop()
# loop.run_until_complete(do_motor_stuff())
그동안 MongoDB 올인 본편 시리즈와 Python 외전 시리즈를 함께해주신 모든 분들께 진심으로 감사드립니다.