PromleeBlog
sitemap
aboutMe

posting thumbnail
추천 학습 데이터셋 구축과 크롤링 파이프라인 설계 방법 - XGBoost 추천시스템 2편
How to Build a Recommendation Dataset and Crawling Pipeline - XGBoost Recommendation System Part 2

📅

들어가기 전에 🔗

안녕하세요.
서로 다른 출발지에서 만날 때, 모두가 납득할 만한 약속 장소를 정하는 일은 생각보다 어렵습니다.
그래서 meet-in-the-middle 이라는 약속 장소 추천 서비스를 만들었습니다.

1편에서는 추천을 랭킹 문제로 정의하고, 사용자 행동으로 라벨을 만드는 방법을 정리했습니다.
이번 2편에서는 그 라벨을 학습시키기 위한 데이터셋을 만들기 위해 Station Place Edge 스키마를 잡고, seed 수집과 정제 규칙, 품질 리포트까지 실습으로 구성해보겠습니다.

실습 코드 및 데이터셋은 GitHub 저장소에서 확인하실 수 있습니다.

추천 데이터셋을 구성하는 핵심 엔티티 🔗

추천 시스템을 운영하려면 결국 세 가지 정보가 필요합니다.

제가 meetinthemiddle에서 추천 학습용 데이터셋을 만들 때 사용한 최소 단위는 Station, Place, Edge 세 가지였습니다.

Station 🔗

Station은 교통의 기준점입니다.
지하철역이나 버스 환승 거점 같은 지점이 됩니다.
이것을 분리해두면 좌표 정규화, 지역 묶음, 이동 비용 계산을 모두 일관되게 처리할 수 있습니다.

예를 들어 특정 카페가 홍대입구역 근처에 있다면, 카페 자체 좌표가 조금 흔들리더라도 Station 기준으로 이동 비용을 잡을 수 있습니다.
이렇게 하면 크롤링 데이터의 좌표 품질이 약간 흔들려도 모델 입력이 안정적입니다.

Place 🔗

Place는 추천 후보입니다.
카페, 음식점, 술집, 스터디카페처럼 사용자가 실제로 선택할 수 있는 목적지입니다.
여기에는 최소한 다음이 필요합니다.

Edge 🔗

Edge는 Station과 Station 사이 이동 비용입니다.
가장 중요한 값은 ETA입니다.
ETA는 이동 시간이며, 모델 관점에서는 공정성을 정량화하는 핵심 피처가 됩니다.

Edge를 분리해두면 다음이 쉬워집니다.

seed 데이터와 crawl 데이터의 역할 분리 🔗

데이터 수집을 시작할 때 가장 흔한 실수는 처음부터 크롤링을 크게 돌리는 것입니다.
데이터가 많아 보이지만, 스키마가 흔들리면 정제 비용이 폭발합니다.
그래서 저는 seed 단계와 crawl 단계를 분리하는 것을 권합니다.

seed 데이터 🔗

seed 데이터는 소량이지만 품질이 높은 기준 데이터입니다.
역 목록, 대표 상권, 대표 카테고리 같은 것들을 작은 규모로 먼저 고정합니다.
이 단계에서 중요한 목표는 두 가지입니다.

crawl 데이터 🔗

crawl 데이터는 seed 스키마를 따라 대규모로 확장하는 단계입니다.
여기서부터는 결측, 중복, 이상치가 계속 들어옵니다.
그래서 crawl 단계에서는 수집 그 자체보다도 데이터 품질 리포트가 핵심입니다.
seed에서 crawl로 확장되는 파이프라인 흐름도
seed에서 crawl로 확장되는 파이프라인 흐름도

실습 - 데이터 정제 파이프라인 구축하기 🔗

이번 글에서는 크롤링을 실제로 붙이기 전에, 데이터셋 구축에서 가장 많이 나오는 정제 문제를 작게 재현해보겠습니다.
실습 저장소의 part2 브랜치를 기준으로 진행합니다.
https://github.com/PROMLEE/xgboost-ml-practice/tree/part2

실습 파일 구조 🔗

xgboost-ml-practice/
├── ml/
│   ├── raw_stations.csv
│   ├── raw_places.csv
│   ├── raw_edges.csv
│   ├── dataset_builder.py
│   └── out/
│       ├── stations.csv
│       ├── places.csv
│       ├── edges.csv
│       └── quality_report.json
└── scripts/
    ├── setup_mac_linux.sh
    ├── setup_windows.ps1
    ├── build_dataset.sh
    └── build_dataset_windows.ps1

raw 데이터에서 일부러 넣어둔 문제들 🔗

raw_places.csv에는 현업에서 자주 보는 문제가 들어있습니다.
ml/raw_places.csv
place_id,place_name,category,station_id,rating_count,avg_rating,price_level
pl_001,Cafe A,cafe,st_003,120,4.5,2
pl_002,BBQ Place,restaurant,st_004,58,4.2,3
pl_003,Study Cafe,study_cafe,st_009,35,4.8,2
pl_004,Bar Z,bar,st_006,12,4.1,3
pl_005,Cafe A,cafe,st_003,120,4.5,2
pl_006,Invalid Rating,restaurant,st_007,10,6.2,2
pl_007,Missing Station,cafe,,5,4.0,1
pl_008,Negative Count,cafe,st_008,-1,4.0,1
pl_009,Weird Category,CAFE,st_002,2,3.5,1
pl_010,No Name,cafe,st_001,3,4.0,1

raw_edges.csv에는 다음 문제가 들어있습니다.
ml/raw_edges.csv
from_station_id,to_station_id,mode,eta_min
st_001,st_002,subway,3
st_002,st_003,subway,9
st_003,st_008,subway,2
st_003,st_009,walk,12
st_004,st_007,subway,4
st_007,st_005,subway,7
st_006,st_001,subway,10
st_001,st_002,subway,3
st_001,st_010,subway,
st_010,st_001,subway,9999
st_003,st_004,subway,-5

정제 규칙 🔗

dataset_builder.py는 아래 규칙으로 정제합니다.


dataset_builder.py 코드 보기
ml/dataset_builder.py
"""Build a clean, normalized dataset from raw seed CSV files.
 
This script is intentionally small and opinionated for students.
It reads raw seed CSV files and outputs normalized, validated CSVs plus a quality report.
 
Input files (default: ml/):
- raw_stations.csv
- raw_places.csv
- raw_edges.csv
 
Output files (default: ml/out/):
- stations.csv
- places.csv
- edges.csv
- quality_report.json
"""
 
from __future__ import annotations
 
import argparse
import csv
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Dict, Iterable, List, Optional, Tuple
 
 
def _read_csv(path: Path) -> List[Dict[str, str]]:
    with path.open("r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        return [dict(row) for row in reader]
 
 
def _write_csv(path: Path, rows: Iterable[Dict[str, Any]], fieldnames: List[str]) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        for row in rows:
            writer.writerow({k: row.get(k, "") for k in fieldnames})
 
 
def _to_float(v: Any) -> Optional[float]:
    if v is None:
        return None
    s = str(v).strip()
    if s == "":
        return None
    try:
        return float(s)
    except ValueError:
        return None
 
 
def _to_int(v: Any) -> Optional[int]:
    if v is None:
        return None
    s = str(v).strip()
    if s == "":
        return None
    try:
        return int(float(s))
    except ValueError:
        return None
 
 
def _norm_id(v: Any) -> str:
    return str(v or "").strip()
 
 
def _norm_text(v: Any) -> str:
    return " ".join(str(v or "").strip().split())
 
 
def _norm_category(v: Any) -> str:
    s = _norm_text(v).lower()
    s = s.replace(" ", "_")
    return s
 
 
@dataclass
class QualityReport:
    input_rows: Dict[str, int]
    output_rows: Dict[str, int]
    dropped_rows: Dict[str, int]
    duplicate_keys: Dict[str, int]
    invalid_values: Dict[str, int]
    warnings: List[str]
 
    def to_dict(self) -> Dict[str, Any]:
        return {
            "input_rows": self.input_rows,
            "output_rows": self.output_rows,
            "dropped_rows": self.dropped_rows,
            "duplicate_keys": self.duplicate_keys,
            "invalid_values": self.invalid_values,
            "warnings": self.warnings,
        }
 
 
def build_dataset(raw_dir: Path, out_dir: Path) -> QualityReport:
    stations_raw = _read_csv(raw_dir / "raw_stations.csv")
    places_raw = _read_csv(raw_dir / "raw_places.csv")
    edges_raw = _read_csv(raw_dir / "raw_edges.csv")
 
    input_rows = {
        "stations": len(stations_raw),
        "places": len(places_raw),
        "edges": len(edges_raw),
    }
 
    warnings: List[str] = []
    invalid_values = {
        "stations_latlon_missing": 0,
        "places_avg_rating_invalid": 0,
        "places_rating_count_invalid": 0,
        "edges_eta_invalid": 0,
    }
 
    stations: List[Dict[str, Any]] = []
    station_seen: set[str] = set()
    station_dupes = 0
    station_dropped = 0
 
    for r in stations_raw:
        station_id = _norm_id(r.get("station_id"))
        if not station_id:
            station_dropped += 1
            continue
        if station_id in station_seen:
            station_dupes += 1
            continue
        station_seen.add(station_id)
 
        lat = _to_float(r.get("lat"))
        lon = _to_float(r.get("lon"))
        if lat is None or lon is None:
            invalid_values["stations_latlon_missing"] += 1
 
        stations.append(
            {
                "station_id": station_id,
                "station_name": _norm_text(r.get("station_name")),
                "line_name": _norm_text(r.get("line_name")),
                "lat": "" if lat is None else round(lat, 6),
                "lon": "" if lon is None else round(lon, 6),
            }
        )
 
    station_index = {s["station_id"] for s in stations}
 
    places: List[Dict[str, Any]] = []
    place_seen: set[str] = set()
    place_dupes = 0
    place_dropped = 0
 
    # Seed de-dup key: (name, station, category)
    place_key_seen: set[Tuple[str, str, str]] = set()
    place_key_dupes = 0
 
    for r in places_raw:
        place_id = _norm_id(r.get("place_id"))
        place_name = _norm_text(r.get("place_name"))
        station_id = _norm_id(r.get("station_id"))
        category = _norm_category(r.get("category"))
 
        if not place_id or not place_name:
            place_dropped += 1
            continue
        if place_id in place_seen:
            place_dupes += 1
            continue
        place_seen.add(place_id)
 
        if not station_id or station_id not in station_index:
            place_dropped += 1
            continue
 
        key = (place_name.lower(), station_id, category)
        if key in place_key_seen:
            place_key_dupes += 1
            continue
        place_key_seen.add(key)
 
        rating_count = _to_int(r.get("rating_count"))
        if rating_count is None or rating_count < 0:
            invalid_values["places_rating_count_invalid"] += 1
            rating_count = max(0, rating_count or 0)
 
        avg_rating = _to_float(r.get("avg_rating"))
        if avg_rating is None or not (0.0 <= avg_rating <= 5.0):
            invalid_values["places_avg_rating_invalid"] += 1
            avg_rating = None
 
        price_level = _to_int(r.get("price_level"))
        if price_level is None or not (1 <= price_level <= 4):
            price_level = None
 
        places.append(
            {
                "place_id": place_id,
                "place_name": place_name,
                "category": category,
                "station_id": station_id,
                "rating_count": rating_count,
                "avg_rating": "" if avg_rating is None else round(avg_rating, 2),
                "price_level": "" if price_level is None else price_level,
            }
        )
 
    edges: List[Dict[str, Any]] = []
    edge_dropped = 0
    edge_dupes = 0
    edge_seen: set[Tuple[str, str, str]] = set()
 
    for r in edges_raw:
        a = _norm_id(r.get("from_station_id"))
        b = _norm_id(r.get("to_station_id"))
        mode = _norm_category(r.get("mode"))
        eta = _to_int(r.get("eta_min"))
 
        if not a or not b or not mode:
            edge_dropped += 1
            continue
        if a not in station_index or b not in station_index:
            edge_dropped += 1
            continue
        if eta is None or eta <= 0 or eta > 300:
            invalid_values["edges_eta_invalid"] += 1
            edge_dropped += 1
            continue
 
        key = (a, b, mode)
        if key in edge_seen:
            edge_dupes += 1
            continue
        edge_seen.add(key)
 
        edges.append({"from_station_id": a, "to_station_id": b, "mode": mode, "eta_min": eta})
 
    out_dir.mkdir(parents=True, exist_ok=True)
 
    _write_csv(out_dir / "stations.csv", stations, ["station_id", "station_name", "line_name", "lat", "lon"])
    _write_csv(
        out_dir / "places.csv",
        places,
        ["place_id", "place_name", "category", "station_id", "rating_count", "avg_rating", "price_level"],
    )
    _write_csv(out_dir / "edges.csv", edges, ["from_station_id", "to_station_id", "mode", "eta_min"])
 
    report = QualityReport(
        input_rows=input_rows,
        output_rows={"stations": len(stations), "places": len(places), "edges": len(edges)},
        dropped_rows={"stations": station_dropped, "places": place_dropped, "edges": edge_dropped},
        duplicate_keys={
            "stations_station_id": station_dupes,
            "places_place_id": place_dupes,
            "places_name_station_category": place_key_dupes,
            "edges_from_to_mode": edge_dupes,
        },
        invalid_values=invalid_values,
        warnings=warnings,
    )
 
    (out_dir / "quality_report.json").write_text(
        json.dumps(report.to_dict(), ensure_ascii=False, indent=2) + "\n",
        encoding="utf-8",
    )
 
    return report
 
 
def main() -> int:
    parser = argparse.ArgumentParser(description="Build a clean dataset from raw seed CSV files.")
    parser.add_argument("--raw-dir", default="ml", help="Directory containing raw_*.csv")
    parser.add_argument("--out-dir", default="ml/out", help="Output directory")
    args = parser.parse_args()
 
    report = build_dataset(Path(args.raw_dir), Path(args.out_dir))
    print("Dataset build finished.")
    print(json.dumps(report.to_dict(), ensure_ascii=False, indent=2))
    return 0
 
 
if __name__ == "__main__":
    raise SystemExit(main())

실행 방법 🔗

이 실습은 상대경로 기준으로만 안내드립니다.
터미널에서 practice 폴더로 이동한 다음 실행하시면 됩니다.

Mac OS, Linux 🔗

가상환경을 만들고 라이브러리를 설치합니다.
scripts/setup_mac_linux.sh
#!/usr/bin/env bash
set -euo pipefail
 
python3 -m venv .venv-ml
source .venv-ml/bin/activate
 
python -m pip install --upgrade pip
pip install -r requirements-ml.txt
 
echo "Environment ready."
데이터셋 빌드를 실행합니다.
scripts/build_dataset.sh
#!/usr/bin/env bash
set -euo pipefail
 
source .venv-ml/bin/activate
 
python3 ml/dataset_builder.py \
  --raw-dir ml \
  --out-dir ml/out
데이터셋 빌드 결과
데이터셋 빌드 결과
실행이 끝나면 ml/out 폴더에 정제 결과가 생성됩니다.
ml/out/
ml/out/
├── stations.csv
├── places.csv
├── edges.csv
└── quality_report.json

Windows 🔗

PowerShell에서 가상환경을 만들고 활성화합니다.
scripts/setup_windows.ps1
$ErrorActionPreference = "Stop"
 
python -m venv .venv-ml
 
# PowerShell venv activation
.\.venv-ml\Scripts\Activate.ps1
 
python -m pip install --upgrade pip
pip install -r requirements-ml.txt
 
Write-Output "Environment ready."
데이터셋 빌드를 실행합니다.
scripts/build_dataset_windows.ps1
$ErrorActionPreference = "Stop"
 
.\.venv-ml\Scripts\Activate.ps1
 
python ml\dataset_builder.py `
  --raw-dir ml `
  --out-dir ml\out

출력 예시 🔗

quality_report.json은 정제 과정에서 무엇이 빠졌고, 어떤 문제가 얼마나 있었는지를 요약합니다.
ml/out/quality_report.json
{
  "input_rows": {
    "stations": 10,
    "places": 10,
    "edges": 11
  },
  "output_rows": {
    "stations": 10,
    "places": 8,
    "edges": 7
  },
  "dropped_rows": {
    "stations": 0,
    "places": 1,
    "edges": 3
  },
  "duplicate_keys": {
    "stations_station_id": 0,
    "places_place_id": 0,
    "places_name_station_category": 1,
    "edges_from_to_mode": 1
  },
  "invalid_values": {
    "stations_latlon_missing": 1,
    "places_avg_rating_invalid": 1,
    "places_rating_count_invalid": 1,
    "edges_eta_invalid": 3
  },
  "warnings": []
}

결과 해석 🔗

품질 리포트를 보면 places는 10행에서 8행으로 줄어 있습니다.
Station이 비어 있던 행 1개가 제거되고, place_name과 station_id와 category 기준 중복 1개가 제거된 결과입니다.
edges는 결측과 이상치가 제거되면서 11행에서 7행으로 줄어 있습니다.
이 숫자는 크롤링 파이프라인에서 데이터 품질을 감시하는 최소 지표로 그대로 사용할 수 있습니다.

결론 🔗

추천 모델을 잘 만드는 것만큼, 추천 데이터셋을 잘 만드는 것이 중요합니다.
Station Place Edge처럼 최소 단위의 스키마를 먼저 고정해두면, 크롤링 데이터가 늘어도 정제 규칙을 일관되게 적용할 수 있습니다.
그리고 품질 리포트를 함께 남기면, 학습 데이터가 언제 망가졌는지 추적할 수 있습니다.

다음 3편에서는 이렇게 만들어진 Station Place Edge 데이터셋을 바탕으로, ETA 기반 공정성 피처를 어떻게 설계하고 XGBoost가 학습할 수 있는 입력으로 바꾸는지 다뤄보겠습니다.

참고 🔗