728x90

공간분석 프로젝트에서 사용된 주요 전처리 함수 정리하여 포스팅하였습니다.


import pandas as pd
import numpy as np
import os
from datetime import datetime
from collections import OrderedDict
from shapely.geometry import Point
import geopandas as gpd
import requests
from tqdm import tqdm

## 구급일지 data preprocessing
# 나이대별 개월수 정의
age_dic = {
    '0세' : list(range(12)),
    '1세' : list(range(12,24)),
    '2세' : list(range(24,36)),
    '3세' : list(range(36,48)),
    '4세' : list(range(48,60)),
    '5세' : list(range(60,71)),
    '6세' : list(range(72,83))
}

# 연령 → 개월수 변환
def pre_age_calc(value):

    if value in age_dic['0세']:
        age_ = 0
    elif value in age_dic['1세']:
        age_ = 1
    elif value in age_dic['2세']:
        age_ = 2
    elif value in age_dic['3세']:
        age_ = 3
    elif value in age_dic['4세']:
        age_ = 4
    elif value in age_dic['5세']:
        age_ = 5
    elif value in age_dic['6세']:
        age_ = 6
    else:
        age_ = 999
    return age_

# 주민등록번호에서 환자 개월수 추출
def pre_month_calc(row):
    jumin_number = row['환자주민등록번호']
    specific_date = row['출동년월일변환']
   
    # 개월수 계산
    try:
        birth_year = int(jumin_number[:2])
        birth_month = int(jumin_number[2:4])
        birth_day = int(jumin_number[4:6])

        specific_year = int(specific_date[:2])
        specific_month = int(specific_date[2:4])
        specific_day = int(specific_date[4:6])

        age_in_months = (specific_year - birth_year) * 12 + (specific_month - birth_month)

        # 생일이 지나지 않은 경우는 1 차감
        if specific_month < birth_month & (specific_month == birth_month and specific_day < birth_day):
            age_in_months -= 1

    except:
        age_in_months = np.nan

    # 환자연령계산
    age = pre_age_calc(age_in_months)
   
    return pd.Series([age_in_months,age])

# 날짜형식변환
def pre_date_trans(original_date):
   
    try:
        # 날짜 문자열을 datetime 객체로 파싱
        date_obj = datetime.strptime(original_date, "%Y-%m-%d")

        # 새로운 형식으로 날짜 문자열 생성
        new_format_date = date_obj.strftime("%y%m%d")

        # 결과 출력
        return new_format_date
    except:
        return 999999
   
# 중복되는 부분을 제거하고 하나로 통합
def remove_duplicate_part(address):
    parts = address.split()
    unique_parts = []
    for part in parts:
        if part not in unique_parts:
            unique_parts.append(part)
    return ' '.join(unique_parts)

# 심정지 여부 컬럼 생성
def pre_coma_check(df):
    df['심정지여부'] = '미포함'  # 초기값을 "미포함"으로 설정
    for index, row in df[['환자증상1','환자증상2','환자증상3','환자증상4','환자증상5']].iterrows():
        for col in ['환자증상1','환자증상2','환자증상3','환자증상4','환자증상5']:
            value = row[col]
            if isinstance(value, str) and '심정지' in value:
                df.at[index, '심정지여부'] = '포함'
                break
    return df

# 신고to현장 소요시간 계산
def pre_report_to_arrival(df):
    df['신고년월시각'] = pd.to_datetime(df['신고년월일'].astype('str') + ' ' + df['신고시각'].astype('str'))
    df.loc[df[df['현장도착년일시'] != ':'].index,'현장도착년일시'] = df.loc[df[df['현장도착년일시'] != ':'].index,'현장도착년일시'].str.replace(':', ' ', 1) #현장도착시각 변환
    df['현장도착년일시_변환'] = pd.to_datetime(df[df['현장도착년일시'] != ':']['현장도착년일시'])
    df['신고현장_소요시간'] = df['현장도착년일시_변환']  - df['신고년월시각']
    return df

# 현장to병원 소요시간 계산
def pre_spot_to_hospital(df):
    df['현장출발년월시각'] = pd.to_datetime(df[df['현장출발년월일'].notnull()]['현장출발년월일'].astype('str') + ' ' + df[df['현장출발시각'].notnull()]['현장출발시각'].astype('str'))
    df['병원도착년원일시각'] = df[df['병원도착년월일'].notnull()]['병원도착년월일'].astype('str') + ' ' + df[df['도착시간1_변환'].notnull()]['도착시간1_변환'].astype('str')
    df['병원도착년원일시각'] = pd.to_datetime(df[df['병원도착년월일'].notnull()]['병원도착년월일'].astype('str') + ' ' + df[df['병원도착시각'].notnull()]['병원도착시각'].astype('str'))
    df['현장병원_소요시간'] = df['도착시간1_변환'] - df['현장출발년월시각']
    return df

# 이송거부에 따른 재이송
def pre_arrival_1(row):
   
    if pd.isna(row[0]):
        return pd.Series([pd.NaT, "미재이송"])
   
    # row[0](도착시간1이 존재 하는 경우)
    else:
        try:
            date_string = row[0].astype(str).replace('.',"")
        except:
            date_string = str(row[0]).replace('.',"")
           
        # 문자열을 datetime 객체로 파싱
        if len(date_string) == 15:
            date_string = date_string[:-1] # 맨끝자리제거

        try:
            original_format = "%Y%m%d%H%M%S"  
            parsed_datetime = datetime.strptime(date_string, original_format)
        except:
            original_format = "%Y%m%d%H%M%S"  
            parsed_datetime = datetime.strptime(date_string, original_format)

        # 새로운 형식으로 날짜 문자열 생성
        new_format = "%Y-%m-%d %H:%M"  # 원하는 출력 문자열 형식
        formatted_date_string =pd.to_datetime(parsed_datetime.strftime(new_format))
       
        if pd.isna(row[1]):
            return pd.Series([formatted_date_string, '미재이송'])

        else:
            return pd.Series([formatted_date_string, '재이송'])

#좌표계 변환
def pre_crs(df, trans_after_crs='EPSG:5179', trans_before_crs='EPSG:4326'):
    df_temp = df.copy()
    # df의 초기 설정 좌표가 없는경우
    if df.crs == None:
        df_temp.crs = trans_before_crs # 초기 crs
        df_temp = df_temp.to_crs(trans_after_crs) # 변환 crs
        is_inf = df_temp['geometry'].apply(lambda geom: 'Infinity' in str(geom))      

        if len(df_temp[is_inf]) >= 1:  # 모두 inf 값인 경우
            df.crs = {'init': 'epsg:{}'.format(int(trans_after_crs.split(':')[1]))}
        else:
            df.crs = trans_before_crs
            df = df.to_crs(trans_after_crs)

            return df#.to_crs(trans_after_crs)#.reset_index(drop=True, inplace=True)
       
    # 좌표계가 기존에 지정이 된 경우      
    else:
        df = df.to_crs(trans_after_crs)
    return df

def pre_cc(file_nm=None, path='D:\더아이엠씨\소방청\데이터구축\행안부_도로명DB전국수치',encoding='utf8'):
   
    # 빈 데이터프레임 생성
    df = pd.DataFrame()
   
    # path 경로 안에 있는 파일 리스트
    path_info =  os.listdir(path)
   
    # 파일 이름이 각각 같은 경우
    if file_nm:
        for city in tqdm(path_info):
            # city는 폴더가 서울,경상북도 등 17개 광역이 폴더로 존재 했을 경우이며 그 안에 파일 명이 모두 동일하다
            df_ = gpd.read_file(f'{path}/{city}/{file_nm}.shp', encoding=encoding)
            df_ = pre_crs(df_)

            # 병합
            df = pd.concat([df, df_], ignore_index=True)
   
    # 파일 이름이 각각 다른 경우
    else:
        file_nms = [file for file in path_info if file.endswith('.shp')]
        for file in tqdm(file_nms):
            df_ = gpd.read_file(f'{path}/{file}', encoding=encoding)
            df_ = pre_crs(df_)
           
            # 병합
            df = pd.concat([df, df_], ignore_index=True)
           
    return df

## spatial data preprocessing
# 위경도를 geometry 타입으로 변경
def pre_trans_geometry(df,var1='Longitude',var2='Latitude'):
    from shapely.geometry import Point
    import geopandas as gpd
   
    # Point 객체를 생성하여 'geometry' 열에 저장
    geometry = [Point(xy) for xy in zip(df[var1], df[var2])]

    # GeoDataFrame 생성
    geo_df = gpd.GeoDataFrame(df, geometry=geometry)

    return geo_df

# pandasframe → geopandas 변환
def pre_trans_geodataframe(df, col: str):
    from shapely import wkt
    df_temp = df.rename(columns={col:'geometry'})
    df_temp['geometry'] = df_temp['geometry'].apply(lambda x: wkt.loads(str(x)) if pd.notnull(x) else x)
    df = gpd.GeoDataFrame(df_temp, geometry='geometry')
    df = pre_crs(df)
    return df

# 중복되는 부분을 제거하고 하나로 통합
def remove_duplicate_part(address):
    parts = address.split()
    unique_parts = []
    for part in parts:
        if part not in unique_parts:
            unique_parts.append(part)
    return ' '.join(unique_parts)

# 공간 내 카운트 함수
def pre_count(df1, df2, groupby_on = 'gid', nm = 'cnt', predicate='contains'):
   
    # 그리드에 포함된 출입구 개수를 알기 위한 sjoin
    temp = gpd.sjoin(df1,df2, how = 'inner', predicate = predicate)
   
    # 'gid' 열을 기준으로 그룹화하고 각 그룹의 행 수 세기
    grouped = temp.groupby(groupby_on).size().reset_index(name=nm)
   
    return pd.merge(df1,grouped, on ='gid',  how = 'left')

## geocoding
# vworld geocoding api
def geocoder(address):
    key = 'API KEY'
    params = {
        "service": "address",
        "request": "getcoord",
        "crs": "epsg:4326",
        "address": str(address),
        "format": "json",
        "type": "PARCEL",
        "key": f"{key}"
    }
    response = requests.get(apiurl, params=params)
    if response.status_code == 200:
        data = response.json()
        try:
            x = data['response']['result']['point']['x']
            y = data['response']['result']['point']['y']
        except:
            x = 0
            y = 0
    return pd.Series([y,x])

# naver geocode api
def sobang_naver_geo(data, client_id, client_pw) :
    import pandas as pd                                 # 데이터 프레임을 다루는 pandas 라이브러리
    from tqdm import tqdm                               # 진행 상황을 시각적으로 보여주는 tqdm 라이브러리
    import numpy as np                                  # 수학적 연산에 사용되는 numpy 라이브러리
    import urllib                                       # url 정보를 다루는 urllib 라이브러리
    import json                                         # json 파일을 다루는 json 라이브러리

    import warnings                                     # 파이썬 경고 관련 warnings 라이브러리
    warnings.filterwarnings('ignore')                   # 파이썬 경고 무시

    # 기본 url

    # 데이터 저장용 리스트
    geo_coordi = []
    geo_distance = []

    # 사업장주소 컬럼 위도/경도 변환
    for add in tqdm(data['주소1']):
        add_urlenc = urllib.parse.quote(add)

        # 주소 포함 url
        url = api_url + add_urlenc

        # url 요청용 주소 변환
        request = urllib.request.Request(url)
        # url 요청용 주소에 id 값 추가
        request.add_header("X-NCP-APIGW-API-KEY-ID", client_id)
        # url 요청용 주소에 Password 값 추가
        request.add_header("X-NCP-APIGW-API-KEY", client_pw)

        # url 요청
        try :
            response = urllib.request.urlopen(request)
        # 주소가 검색이 안되면 위도/경도 값 0으로 설정
        except urllib.error.HTTPError as e:
            latitude = 0
            longitude = 0
            geo_state = '확인불가'

        else :
            # 주소가 정상적으로 반환되면
            rescode = response.getcode()
            if rescode == 200 :
                # 반환된 정보 저장
                response_body = response.read().decode('utf-8')
                response_body = json.loads(response_body)
                if 'addresses' in response_body:
                    try:
                        latitude = response_body['addresses'][0]['y']
                        longitude = response_body['addresses'][0]['x']
                        geo_state = response_body['addresses'][0]['distance']
                       
                    except:
                        latitude = 0
                        longitude = 0
                        geo_state = '확인불가'
                # 반환된 정보에 내용이 없을 때 위도/경도 값 0으로 설정
                else :
                    latitude = 0
                    longitude = 0
                    geo_state = '확인불가'

        # 반환된 값 저장    
        geo_coordi.append([latitude, longitude])
        geo_distance.append(geo_state)
   
    # 위도/경도 데이터 추가
    df = pd.concat([data, pd.DataFrame(geo_coordi, columns = ['위도', '경도'])], axis = 1)

    # 좌표 거리 데이터 추가
    # print('geo_distance',geo_distance)
    df['좌표 변환 거리'] = geo_distance
    # # 데이터 저장
    # df.to_csv('./청주시 양봉농가 등록현황_위도 경도 추가.csv', index = False, encoding = 'cp949')
   
    return df

# kakao geocode api
def kakao_search(keyword2):
    from PyKakao import Local
    api = Local(service_key = "API KEY")

    df1 = api.search_address(keyword2, dataframe=True)
    if df1.empty:
        df2 =  api.search_keyword(keyword2, dataframe=True)
       
        if df2.empty:
            return None
        else:
            return [df2.loc[0]['y'],df2.loc[0]['x'], df2['address_name'][0]]
    else:
        return [df1.loc[0]['y'],df1.loc[0]['x'], df1['address_name'][0]]

# 다양한 주소 형태의 api 적용
def kakao_grid_search(keword1):
    words = keword1.split(" ")
    unique_words = list(OrderedDict.fromkeys(words))
   
    # 시,구,동, 번지
    keword_1 = " ".join(unique_words[:4])
    result_1 = kakao_search(keword_1)
    if result_1 is None:
        # 시,구,동, 건물
        try:
            keword_2 = " ".join(unique_words[:3] + [unique_words[4]])
        except:
            keword_2 = " ".join(unique_words[:3])
        result_2 = kakao_search(keword_2)
       
        if result_2 is None:
            # 시,구,동
            keword_3 = " ".join(unique_words[:3])
            result_3 = kakao_search(keword_3)
            if result_3 is None:
                return 0,0,np.nan
            else:
                return result_3[0], result_3[1], result_3[2]
        else:
            return result_2[0], result_2[1], result_2[2]
    else:
        return result_1[0], result_1[1], result_1[2]

## text data preprocessing
def process_commas(text):
    words = text.split(',')
    cleaned_words = [word.strip() for word in words if word.strip()]
    result = ','.join(cleaned_words)
    return result.replace(',,','')

## 시각화
def eda_viz_region_month(df,region='강원'):
    import seaborn as sns
    import matplotlib.pyplot as plt
    import matplotlib.font_manager as fm
    import matplotlib.patheffects as path_effects
    font_path = 'C:\Windows\Fonts\GULIM.ttc'  # 여기에 ttf 파일의 정확한 경로를 지정하세요.
    font_prop = fm.FontProperties(fname=font_path)
    plt.rcParams['font.family'] = font_prop.get_name()



    re_month = df.groupby(['지역','개월수']).count().reset_index()[['지역','개월수','환자연령_주민','구급보고서번호']]
    # 바 차트 그리기
    temp_gang = re_month[re_month['지역']==region][['개월수','구급보고서번호']]
    plt.figure(figsize=(30,15))
    plt.bar(temp_gang['개월수'], temp_gang['구급보고서번호'])

    # 각 막대에 값 레이블 추가
    bars = plt.bar(temp_gang['개월수'], temp_gang['구급보고서번호'])
    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2, yval, '%d' % int(yval),
                va='bottom', ha='center', fontsize=12, #rotation=90,
                path_effects=[path_effects.withStroke(linewidth=3, foreground='white')])

    # 레이블 추가
    plt.xlabel('개월수',size=20)
    plt.ylabel('구급보고서번호',size=20)
    plt.title(f'전체 {region} 개월수 별 사고건수',size=20)

    # 그래프의 x축 눈금을 설정합니다.
    plt.xticks(temp_gang['개월수'].to_list(), size=15)

    # 바 차트 표시
    plt.show()
728x90

'데이터분석' 카테고리의 다른 글

사용자 사전 추가된 Mecab 형태소 분석 in Corab  (0) 2023.12.04

+ Recent posts