PromleeBlog
sitemap
aboutMe

posting thumbnail
고급 쿼리와 데이터 분석의 시작 - MongoDB와 Python (중)
Advanced Queries & Intro to Data Analysis - MongoDB with Python (Pt. 2)

📅

🚀

들어가기 전에 🔗

지난 (상)편에서는 PyMongo의 기본적인 CRUD 사용법과 Atlas 연결의 심화된 내용을 함께 살펴보았습니다.
(중)편인 오늘은 여기서 한 걸음 더 나아가, MongoDB의 강력한 데이터 처리 능력을 Python 코드로 활용하는 방법을 배워보겠습니다.

이번 시간에는 PyMongo를 사용하여 복잡한 조건의 데이터를 효과적으로 찾아내는 고급 쿼리 기법들을 더 깊이 있게 다루고, 쿼리 성능에 결정적인 영향을 미치는
인덱스를 Python 코드로 직접 생성하고 관리하는 방법
을 알아볼 것입니다.
그리고
집계 프레임워크(Aggregation Framework)의 기초
를 PyMongo 예제를 통해, 단순 조회를 넘어선 데이터 분석의 첫발을 내디뎌 보겠습니다.

🚀

PyMongo를 활용한 고급 쿼리 작성 심화 🔗

(상)편에서 find_one()find()의 기본적인 필터링, 프로젝션, 정렬, limit 옵션을 배웠습니다. 이제 MongoDB 쿼리 언어의 다양한 연산자들을 PyMongo에서 어떻게 활용하는지 더 자세히 살펴보겠습니다.
실습을 위해 이전 (상)편에서 사용한 Atlas 연결 및 users_collection 객체가 준비되었다고 가정합니다.
  # 이전 코드에서 이어짐
from pymongo import MongoClient
ATLAS_URI = "..."
client = MongoClient(ATLAS_URI)
db = client["<dbname>"]
users_collection = db["users_python_외전"]

1. 비교 연산자 활용 🔗

(상)편의 find() 예제에서 $gte를 사용한 것처럼, 다른 비교 연산자들도 동일한 방식으로 사용합니다.
  # 예시: 나이가 20세 초과 30세 미만인 사용자 조회
  # (20 < age < 30)
query_age_range = {"age": {"$gt": 20, "$lt": 30}}
for user in users_collection.find(query_age_range, {"name": 1, "age": 1, "_id": 0}):
    print(f"20-30세 사용자: {user}")
 
  # 예시: 'tags' 필드에 "patriot" 또는 "hero"가 포함된 사용자 조회
query_tags_in = {"tags": {"$in": ["patriot", "hero"]}}
for user in users_collection.find(query_tags_in, {"name": 1, "tags": 1, "_id": 0}):
    print(f"애국자 또는 영웅 태그 사용자: {user}")

2. 논리 연산자 활용 🔗

여러 조건을 조합할 때 사용합니다.
  # 예시: 나이가 25세 미만이거나 (OR) 직업이 '독립운동가'인 사용자 조회
query_or_condition = {
    "$or": [
        {"age": {"$lt": 25}},
        {"occupation": "독립운동가"} # (상)편 예제 데이터 기준: ' 대한민국 독립유공자'
    ]
}
print("\n25세 미만이거나 독립운동가인 사용자:")
for user in users_collection.find(query_or_condition, {"name": 1, "age": 1, "occupation":1, "_id":0}):
    print(user)
 
  # 예시: 'email' 필드가 존재하면서 (AND) 'status' 필드가 'martyr'가 아닌 사용자
  # $and는 명시적으로 사용할 수도 있지만, 기본적으로 딕셔너리 내 여러 키는 AND로 처리됩니다.
query_and_not_condition = {
    "email": {"$exists": True}, # email 필드가 존재하고
    "status": {"$ne": "martyr"}  # status가 'martyr'가 아닌
}
print("\n이메일이 있고 순교자가 아닌 사용자:")
for user in users_collection.find(query_and_not_condition, {"name": 1, "email":1, "status":1, "_id":0}):
    print(user)

3. 요소 연산자 활용 🔗

  # 예시: 'lastModified' 필드가 Date 타입인 문서 조회
query_type_date = {"lastModified": {"$type": "date"}}
print("\nlastModified가 Date 타입인 사용자:")
for user in users_collection.find(query_type_date, {"name": 1, "lastModified":1, "_id":0}):
    print(user)

4. 배열 연산자 활용 🔗

  # 예시: 'tags' 필드에 "patriot"와 "activist"를 모두 포함하는 사용자 조회
query_tags_all = {"tags": {"$all": ["patriot", "activist"]}}
print("\n'patriot'와 'activist' 태그를 모두 가진 사용자:")
for user in users_collection.find(query_tags_all, {"name": 1, "tags":1, "_id":0}):
    print(user)
 
  # 예시: 'scores' 배열 필드에 80점 이상, 90점 미만인 점수가 하나라도 있는 학생 조회
  # (scores 필드가 있다고 가정하고 임시 데이터 삽입 후 테스트)
  # users_collection.update_one({"name": "유관순"}, {"$set": {"scores":}})
  # users_collection.update_one({"name": "안중근"}, {"$set": {"scores":}})
 
query_scores_elemMatch = {
    "scores": {"$elemMatch": {"$gte": 80, "$lt": 90}}
}
print("\n80점대 점수를 가진 학생:")
for student in users_collection.find(query_scores_elemMatch, {"name":1, "scores":1, "_id":0}):
    print(student)

5. 정규 표현식 활용 🔗

문자열 필드에서 패턴 매칭을 수행합니다. Python의 re 모듈을 사용하거나, 직접 MongoDB의 $regex 연산자를 사용할 수 있습니다.
import re
 
  # 예시: 이름(name)이 '김'으로 시작하는 사용자 조회 (Python re 모듈 사용)
regex_kim_start = re.compile(r"^김")
query_name_regex_re = {"name": regex_kim_start}
print("\n이름이 '김'으로 시작하는 사용자 (re 모듈):")
for user in users_collection.find(query_name_regex_re, {"name":1, "_id":0}):
    print(user)
 
  # 예시: 이메일(email) 주소에 'example.com'을 포함하는 사용자 조회 ($regex 사용)
query_email_regex_operator = {"email": {"$regex": "example\\.com", "$options": "i"}} # i: 대소문자 무시
print("\n'example.com' 이메일을 사용하는 사용자 ($regex):")
for user in users_collection.find(query_email_regex_operator, {"name":1, "email":1, "_id":0}):
    print(user)

🚀

Python 코드로 인덱스 관리하기 🔗

쿼리 성능을 높이기 위해 필수적인 인덱스를 PyMongo를 통해 생성, 조회, 삭제할 수 있습니다.

1. 인덱스 생성: create_index, create_indexes 🔗

create_index(keys, **kwargs): 단일 또는 복합 인덱스를 생성합니다.
create_indexes([IndexModel, ...]): 여러 인덱스를 한 번에 생성합니다. IndexModel 객체를 사용합니다.
from pymongo import IndexModel, ASCENDING, DESCENDING
 
try:
    # 단일 필드 인덱스 생성 (email 오름차순)
    users_collection.create_index([("email", ASCENDING)], name="email_index", unique=True)
    print("\n'email_index' (unique) 생성 성공.")
 
    # 복합 인덱스 생성 (occupation 오름차순, age 내림차순)
    occupation_age_index = IndexModel([("occupation", ASCENDING), ("age", DESCENDING)], name="occupation_age_idx")
    users_collection.create_indexes([occupation_age_index])
    print("'occupation_age_idx' 생성 성공.")
 
except pymongo.errors.OperationFailure as e:
    print(f"인덱스 생성 중 오류: {e}") # 이미 존재하거나, 고유 제약 위반 등

2. 인덱스 조회: index_information() 🔗

컬렉션의 모든 인덱스 정보를 딕셔너리 형태로 반환합니다.
print("\n현재 'users_collection'의 인덱스 정보:")
for index_name, index_info in users_collection.index_information().items():
    print(f"- {index_name}: {index_info}")

3. 인덱스 삭제: drop_index(), drop_indexes() 🔗

  # 예시: 'occupation_age_idx' 인덱스 삭제
  # users_collection.drop_index("occupation_age_idx")
  # print("\n'occupation_age_idx' 인덱스 삭제됨.")
 
  # 예시: 모든 사용자 정의 인덱스 삭제 (실행 시 주의!)
  # users_collection.drop_indexes()
  # print("\n모든 사용자 정의 인덱스 삭제됨 (id_ 제외).")

🚀

PyMongo로 집계 프레임워크(Aggregation Framework) 시작하기 🔗

MongoDB의 가장 강력한 기능 중 하나인 집계 프레임워크는 여러 단계(스테이지)로 구성된 파이프라인을 통해 데이터를 변환하고 분석합니다. PyMongo에서는 aggregate() 메서드를 사용합니다.
aggregate(pipeline, **kwargs):

주요 집계 스테이지와 PyMongo 예시 🔗

➡️

1. $match: 문서 필터링 🔗

find()의 쿼리 조건과 동일한 문법을 사용합니다.
  # 예시: 'occupation'이 '대한민국 독립유공자'인 문서만 선택
pipeline_match = [
    {"$match": {"occupation": "대한민국 독립유공자"}}
]
print("\n집계 $match: 독립유공자만 필터링")
for doc in users_collection.aggregate(pipeline_match):
    print(doc)
➡️

2. $project: 문서 구조 변경 (필드 선택, 추가, 이름 변경) 🔗

  # 예시: 이름은 'fullName'으로, 나이만 포함, _id는 제외
pipeline_project = [
    {"$match": {"occupation": "대한민국 독립유공자"}}, # 이전 결과에 이어서
    {"$project": {
        "_id": 0,
        "fullName": "$name", # $name은 name 필드의 값을 의미
        "currentAge": "$age"
    }}
]
print("\n집계 $project: 이름 변경 및 필드 선택")
for doc in users_collection.aggregate(pipeline_project):
    print(doc)
➡️

3. $group: 문서 그룹화 및 집계 연산 🔗

_id 필드에 그룹화 기준을 지정하고, 다양한 집계 연산자($sum, $avg, $min, $max, $push, $addToSet 등)를 사용합니다.
  # 예시: 직업(occupation)별 평균 나이와 인원 수 계산
pipeline_group = [
    {"$group": {
        "_id": "$occupation", # 직업별로 그룹화
        "averageAge": {"$avg": "$age"}, # 평균 나이 계산
        "count": {"$sum": 1} # 그룹 내 문서 개수(인원 수) 계산
    }}
]
print("\n집계 $group: 직업별 평균 나이 및 인원 수")
for group_data in users_collection.aggregate(pipeline_group):
    print(group_data)
➡️

4. $sort: 문서 정렬 🔗

  # 예시: 위 직업별 평균 나이 결과를 인원 수(count) 내림차순으로 정렬
pipeline_sort = [
    {"$group": {
        "_id": "$occupation",
        "averageAge": {"$avg": "$age"},
        "count": {"$sum": 1}
    }},
    {"$sort": {"count": DESCENDING}} # 인원 수 내림차순
]
print("\n집계 $sort: 직업별 인원 수 내림차순 정렬")
for sorted_data in users_collection.aggregate(pipeline_sort):
    print(sorted_data)
이 외에도 $limit, $skip, $unwind, $lookup 등 다양한 강력한 스테이지들이 있습니다.
집계 프레임워크는 매우 방대하므로, 오늘 이 시간에는 기본적인 사용법을 익히는 데 중점을 두었습니다.

🚀

결론 🔗

오늘은 MongoDB 올인 외전 시리즈의 두 번째 시간으로, Python과 PyMongo를 사용하여 MongoDB의 고급 기능을 활용하는 방법을 배웠습니다.
다양한 쿼리 연산자를 통해 원하는 데이터를 더욱 정교하게 찾아내는 방법, 코드 레벨에서 인덱스를 직접 관리하여 성능을 최적화하는 방법, 그리고 집계 프레임워크의 기초를 통해 데이터를 그룹화하고 분석하는 방법을 익혔습니다.

이러한 고급 기능들을 잘 활용하면, Python 애플리케이션에서 MongoDB를 단순한 데이터 저장소를 넘어, 강력한 데이터 처리 및 분석 엔진으로 사용할 수 있게 됩니다.

다음 외전 (하)편에서는 PyMongo를 이용한
트랜잭션 처리
, 대용량 파일을 다루는
GridFS
, 실시간 데이터 변경 감지를 위한
Change Streams
등 더욱 흥미로운 고급 주제들을 다룰 예정입니다.

참고 🔗