전처리 템플릿

데이터 분석
공개

2025년 10월 5일

결측치 처리

  • 결측치가 발생하는 원인과 처리 전략
    • 무작위 결측(Missing Completely at Random, MCAR): 결측치가 발생할 확률이 다른 모든 변수와 무관
      • 예: 설문지 작성 중 무작위로 일부 페이지가 누락된 경우
      • 이 경우, 결측치를 제거하거나 단순 대체해도 편향이 발생하지 않음
    • 조건부 무작위 결측(Missing at Random, MAR): 결측치가 발생할 확률이 관측된 데이터에만 의존
      • 예: 고소득자일수록 소득 공개를 꺼려하는 경우 (교육수준이 높을수록 소득 결측 확률이 높음)
      • 실무에서 가장 일반적인 가정: 대부분의 실제 데이터에서 결측 패턴은 관측된 변수들로 어느 정도 설명 가능
      • 관측된 데이터를 활용한 예측 기반 대치법이 효과적
    • 비무작위 결측(Missing Not at Random, MNAR): 결측치가 발생할 확률이 결측된 값 자체와 관련
      • 예: 극도로 낮은 소득자가 소득 공개를 꺼리는 경우
      • 통계적 방법만으로는 해결이 어려우며, 도메인 지식이나 외부 정보가 필요
      • 실무에서는 MAR 가정 하에 처리 후, 민감도 분석을 통해 결측 메커니즘과 처리 방법의 적절성을 확인
        • 민감도 분석: 다양한 결측치 처리 방법을 적용하여 결과를 비교
          • 단순 방법(평균/중앙값 대치) vs 고급 방법(KNN, MICE)
          • 단순 방법과 고급 방법의 결과가 비슷하면 → MCAR 가능성 높음, 단순 방법으로도 충분
          • 고급 방법이 더 나은 성능을 보이면 → MAR 가능성 높음, 고급 방법 선택
          • 어떤 방법을 써도 결과가 불안정하면 → MNAR 가능성, 도메인 지식과 추가 정보 필요
  • 결측치 처리 방법
    • 제거: 결측치가 적거나 무작위 결측일 때 사용
    • 대치(대체):
      • 일반적인 방법
        • 시계열 데이터 o: 이전 값, 이후 값, 선형 보간법
        • 시계열 데이터 x: 평균, 중앙값, 최빈값
      • 고급 대치법(과적합 발생 가능성 유의)
        • KNN 대치: 유사한 관측치의 값을 사용하여 결측치를 대체. 결측치가 없는 데이터로 예측
        • 다변량 대치: 결측치를 다른 변수들의 값으로 예측하여 대체.

다변량 대치법

from lightgbm import LGBMRegressor, LGBMClassifier

def multi_impute(df, categorical, max_iter=40):
    df_imp = df.copy()
    num_cols = [col for col in df.columns if col not in categorical]

    # 결측치가 많은 column 우선 처리
    null_counts = df_imp.isnull().sum()
    null_cols = null_counts[null_counts > 0].sort_values().index.tolist()

    # 초기값 대체
    for col in categorical:
        df_imp[col] = df_imp[col].astype('category')
        mode = df_imp[col].mode(dropna=True)
        df_imp[col].fillna(mode[0], inplace=True)

    for col in num_cols:
        mean = df_imp[col].mean()
        df_imp[col].fillna(mean, inplace=True)

    # 반복 임퓨팅
    for _ in range(max_iter):
        prev = df_imp.copy()

        for col in null_cols:
            idx_missing = df[col].isnull()
            idx_obs = ~idx_missing
            predictors = [c for c in df.columns if c != col]

            X_obs = df_imp.loc[idx_obs, predictors]
            y_obs = df_imp.loc[idx_obs, col]
            X_mis = df_imp.loc[idx_missing, predictors]

            # column type에 따라 다른 모델 선택
            # LightGBM으로 randomforest를 사용한 이유: sklearn의 RandomForest는 categorical 변수를 직접 처리하지 못함
            if col in categorical:
                model = LGBMClassifier(
                    boosting_type="rf", n_estimators=5,
                    bagging_fraction=0.8, bagging_freq=1
                )
            else:
                model = LGBMRegressor(
                    boosting_type="rf", n_estimators=5,
                    bagging_fraction=0.8, bagging_freq=1
                )
            model.fit(X_obs, y_obs)
            y_pred = model.predict(X_mis)
            df_imp.loc[idx_missing, col] = y_pred
        if df_imp.equals(prev):
            break
    return df_imp
  • 현재 결측치 처리 방법 중 가장 성능이 좋은걸로 알려져 있는 missing forest를 모방한 방법.
    • missforest 라이브러리는 adp 환경에서 설치 불가
  • 다중 대치를 안 하고 있지만, sklearn 공식 문서에 따르면 대부분 single 대치로도 충분하다고 한다.
  • 참고로 이 방법은 missforest의 정확한 구현은 아니기 때문에, 시험에서는 다변량 대치법을 사용했다고만 쓰자.
  • 만약 train set에서만 fit을 시키고 싶다면, 이 방법은 그냥 안 쓰는걸 추천.
    • 그렇게 하려면 class 형태로 바꿔야 하는데, too much인가 싶은 느낌이 슬슬 나기 시작.
class MultiImputer:
    def __init__(self, categorical=[], max_iter=40, n_estimators=5):
        self.categorical = categorical
        self.max_iter = max_iter
        self.n_estimators = n_estimators
        
        self.models_ = {}
        self.initial_fill_ = {}
        self.null_cols_ = []
        self.num_cols_ = []
        
    def fit(self, X, y=None):
        df = X.copy()
        self.num_cols_ = [col for col in df.columns if col not in self.categorical]
        
        null_counts = df.isnull().sum()
        self.null_cols_ = null_counts[null_counts > 0].sort_values().index.tolist()
        
        # 초기값 계산 및 저장 (Train의 통계량만 사용!)
        for col in self.categorical:
            df[col] = df[col].astype('category')
            mode = df[col].mode(dropna=True)
            self.initial_fill_[col] = mode[0]
            df[col].fillna(self.initial_fill_[col], inplace=True)
        
        for col in self.num_cols_:
            mean = df[col].mean()
            self.initial_fill_[col] = mean
            df[col].fillna(mean, inplace=True)
        
        for _ in range(self.max_iter):
            prev = df.copy()
            
            for col in self.null_cols_:
                idx_missing = X[col].isnull()
                idx_obs = ~idx_missing
                    
                predictors = [c for c in df.columns if c != col]
                X_obs = df.loc[idx_obs, predictors]
                y_obs = df.loc[idx_obs, col]
                X_mis = df.loc[idx_missing, predictors]
                
                # 모델 선택 및 학습
                if col in self.categorical:
                    model = LGBMClassifier(
                        boosting_type="rf", 
                        n_estimators=self.n_estimators,
                        bagging_fraction=0.8, 
                        bagging_freq=1,
                        verbose=-1
                    )
                else:
                    model = LGBMRegressor(
                        boosting_type="rf", 
                        n_estimators=self.n_estimators,
                        bagging_fraction=0.8, 
                        bagging_freq=1,
                        verbose=-1
                    )
                
                model.fit(X_obs, y_obs)
                
                y_pred = model.predict(X_mis)
                df.loc[idx_missing, col] = y_pred
                
                self.models_[col] = model
            
            if df.equals(prev):
                break
        
        return self
    
    def transform(self, X):
        df_imp = X.copy()
        
        for col in self.categorical:
            df_imp[col] = df_imp[col].astype('category')
            df_imp[col].fillna(self.initial_fill_[col], inplace=True)
        
        for col in self.num_cols_:
            df_imp[col].fillna(self.initial_fill_[col], inplace=True)
        
        for _ in range(self.max_iter):
            prev = df_imp.copy()
            
            for col in self.null_cols_:                   
                idx_missing = X[col].isnull()
                
                predictors = [c for c in df_imp.columns if c != col]
                X_mis = df_imp.loc[idx_missing, predictors]
                
                model = self.models_[col]
                y_pred = model.predict(X_mis)
                df_imp.loc[idx_missing, col] = y_pred
            
            if df_imp.equals(prev):
                break
        
        return df_imp
    
    def fit_transform(self, X, y=None):
        self.fit(X, y)
        return self.transform(X)
  • 혹시몰라서 class 형태로 만든 다변량 대치법
  • 이걸 언제 다 적고 있을까

이상치 처리

  • 이상치 탐지는 EDA 참고
  • 처리는 알아서 잘 하자.

불균형 처리

  • 잘 알려져 있는 방법 대충 잘 선택해서 사용.
  • 딱히 SOTA(가장 좋은 방법)가 있지 않음.

성능 비교

  • 각각의 처리법에 대해서 어떤게 제일 좋은지 비교
from lightgbm import LGBMRegressor
from sklearn.impute import KNNImputer
from sklearn.model_selection import cross_val_score

df[cat_cols] = df[cat_cols].astype('category')

df1 = df.dropna()
df2 = df[num_cols].fillna(df[num_cols].mean())
for col in cat_cols:
    df2[col] = target[col].fillna(target[col].mode()[0])
df3 = multi_impute(df, categorical=cat_cols)

candis = [
    ('Nothing', df.drop('y', axis=1), df['dead']),
    ('Just Delete', df1.drop('dead', axis=1), df1['dead']),
    ('Simple Impute', df2.drop('dead', axis=1), df2['dead']),
    ('MultiImputer', df3.drop('dead', axis=1), df3['dead'])
]

result = pd.DataFrame()
for name, X, y in candis:
    rf = LGBMRegressor(boosting_type="rf", n_estimators=100, bagging_fraction=0.8, bagging_freq=1)
    result[name] = cross_val_score(rf, X, y, scoring='neg_mean_squared_error') # classifier인 경우 'accuracy' 등등

fig, ax = plt.subplots(figsize=(13, 6))

means = -result.mean() # regressor인 경우, classifier인 경우 양수
errors = result.std()

means.plot.barh(xerr=errors, ax=ax)
ax.set_yticks(np.arange(means.shape[0]))
ax.set_yticklabels(means.index)
plt.show()

Feature Selection

Filter Method

  1. basic methods
  • 하나의 값만 가지는 변수 혹은 분산이 너무 낮은 변수는 제거
from sklearn.feature_selection import VarianceThreshold

sel = VarianceThreshold(threshold=0.01)

selected_cols = df.columns[sel.get_support()]
df_selected = df[selected_cols]
  1. Univariate selection methods
from sklearn.feature_selection import SelectKBest, SelectPercentile, chi2

X_new = SelectKBest(chi2, k=2).fit_transform(X, y) # 최상위 2개

X_new = SelectPercentile(chi2, percentile=10).fit_transform(X, y) # 상위 10%
  • 카이제곱 검정량이 가장 높은 변수들만 선택.
  • 연속형 변수에 대해서는 KBinsDiscretizer 작업 필요.
from sklearn.feature_selection import SelectKBest, SelectPercentile, mutual_info_classif, mutual_info_regression

X_new = SelectKBest(mutual_info_classif, k=2).fit_transform(X, y) # 최상위 2개
X_new = SelectPercentile(mutual_info_regression, percentile=10).fit_transform(X, y) # 상위 10%
  • EDA 파트 mutual info 참고
  • 추가: 상관계수, ANOVA F-value 등등 사용 가능

Wrapper Method

  • forward, backward, 등등

Embedded Method

  • L1, L2, Elasticnet 등등
맨 위로