728x90

공간분석 프로젝트에서 사용된 주요 전처리 함수 정리하여 포스팅하였습니다.


import pandas as pd
import numpy as np
import os
from datetime import datetime
from collections import OrderedDict
from shapely.geometry import Point
import geopandas as gpd
import requests
from tqdm import tqdm

## 구급일지 data preprocessing
# 나이대별 개월수 정의
age_dic = {
    '0세' : list(range(12)),
    '1세' : list(range(12,24)),
    '2세' : list(range(24,36)),
    '3세' : list(range(36,48)),
    '4세' : list(range(48,60)),
    '5세' : list(range(60,71)),
    '6세' : list(range(72,83))
}

# 연령 → 개월수 변환
def pre_age_calc(value):

    if value in age_dic['0세']:
        age_ = 0
    elif value in age_dic['1세']:
        age_ = 1
    elif value in age_dic['2세']:
        age_ = 2
    elif value in age_dic['3세']:
        age_ = 3
    elif value in age_dic['4세']:
        age_ = 4
    elif value in age_dic['5세']:
        age_ = 5
    elif value in age_dic['6세']:
        age_ = 6
    else:
        age_ = 999
    return age_

# 주민등록번호에서 환자 개월수 추출
def pre_month_calc(row):
    jumin_number = row['환자주민등록번호']
    specific_date = row['출동년월일변환']
   
    # 개월수 계산
    try:
        birth_year = int(jumin_number[:2])
        birth_month = int(jumin_number[2:4])
        birth_day = int(jumin_number[4:6])

        specific_year = int(specific_date[:2])
        specific_month = int(specific_date[2:4])
        specific_day = int(specific_date[4:6])

        age_in_months = (specific_year - birth_year) * 12 + (specific_month - birth_month)

        # 생일이 지나지 않은 경우는 1 차감
        if specific_month < birth_month & (specific_month == birth_month and specific_day < birth_day):
            age_in_months -= 1

    except:
        age_in_months = np.nan

    # 환자연령계산
    age = pre_age_calc(age_in_months)
   
    return pd.Series([age_in_months,age])

# 날짜형식변환
def pre_date_trans(original_date):
   
    try:
        # 날짜 문자열을 datetime 객체로 파싱
        date_obj = datetime.strptime(original_date, "%Y-%m-%d")

        # 새로운 형식으로 날짜 문자열 생성
        new_format_date = date_obj.strftime("%y%m%d")

        # 결과 출력
        return new_format_date
    except:
        return 999999
   
# 중복되는 부분을 제거하고 하나로 통합
def remove_duplicate_part(address):
    parts = address.split()
    unique_parts = []
    for part in parts:
        if part not in unique_parts:
            unique_parts.append(part)
    return ' '.join(unique_parts)

# 심정지 여부 컬럼 생성
def pre_coma_check(df):
    df['심정지여부'] = '미포함'  # 초기값을 "미포함"으로 설정
    for index, row in df[['환자증상1','환자증상2','환자증상3','환자증상4','환자증상5']].iterrows():
        for col in ['환자증상1','환자증상2','환자증상3','환자증상4','환자증상5']:
            value = row[col]
            if isinstance(value, str) and '심정지' in value:
                df.at[index, '심정지여부'] = '포함'
                break
    return df

# 신고to현장 소요시간 계산
def pre_report_to_arrival(df):
    df['신고년월시각'] = pd.to_datetime(df['신고년월일'].astype('str') + ' ' + df['신고시각'].astype('str'))
    df.loc[df[df['현장도착년일시'] != ':'].index,'현장도착년일시'] = df.loc[df[df['현장도착년일시'] != ':'].index,'현장도착년일시'].str.replace(':', ' ', 1) #현장도착시각 변환
    df['현장도착년일시_변환'] = pd.to_datetime(df[df['현장도착년일시'] != ':']['현장도착년일시'])
    df['신고현장_소요시간'] = df['현장도착년일시_변환']  - df['신고년월시각']
    return df

# 현장to병원 소요시간 계산
def pre_spot_to_hospital(df):
    df['현장출발년월시각'] = pd.to_datetime(df[df['현장출발년월일'].notnull()]['현장출발년월일'].astype('str') + ' ' + df[df['현장출발시각'].notnull()]['현장출발시각'].astype('str'))
    df['병원도착년원일시각'] = df[df['병원도착년월일'].notnull()]['병원도착년월일'].astype('str') + ' ' + df[df['도착시간1_변환'].notnull()]['도착시간1_변환'].astype('str')
    df['병원도착년원일시각'] = pd.to_datetime(df[df['병원도착년월일'].notnull()]['병원도착년월일'].astype('str') + ' ' + df[df['병원도착시각'].notnull()]['병원도착시각'].astype('str'))
    df['현장병원_소요시간'] = df['도착시간1_변환'] - df['현장출발년월시각']
    return df

# 이송거부에 따른 재이송
def pre_arrival_1(row):
   
    if pd.isna(row[0]):
        return pd.Series([pd.NaT, "미재이송"])
   
    # row[0](도착시간1이 존재 하는 경우)
    else:
        try:
            date_string = row[0].astype(str).replace('.',"")
        except:
            date_string = str(row[0]).replace('.',"")
           
        # 문자열을 datetime 객체로 파싱
        if len(date_string) == 15:
            date_string = date_string[:-1] # 맨끝자리제거

        try:
            original_format = "%Y%m%d%H%M%S"  
            parsed_datetime = datetime.strptime(date_string, original_format)
        except:
            original_format = "%Y%m%d%H%M%S"  
            parsed_datetime = datetime.strptime(date_string, original_format)

        # 새로운 형식으로 날짜 문자열 생성
        new_format = "%Y-%m-%d %H:%M"  # 원하는 출력 문자열 형식
        formatted_date_string =pd.to_datetime(parsed_datetime.strftime(new_format))
       
        if pd.isna(row[1]):
            return pd.Series([formatted_date_string, '미재이송'])

        else:
            return pd.Series([formatted_date_string, '재이송'])

#좌표계 변환
def pre_crs(df, trans_after_crs='EPSG:5179', trans_before_crs='EPSG:4326'):
    df_temp = df.copy()
    # df의 초기 설정 좌표가 없는경우
    if df.crs == None:
        df_temp.crs = trans_before_crs # 초기 crs
        df_temp = df_temp.to_crs(trans_after_crs) # 변환 crs
        is_inf = df_temp['geometry'].apply(lambda geom: 'Infinity' in str(geom))      

        if len(df_temp[is_inf]) >= 1:  # 모두 inf 값인 경우
            df.crs = {'init': 'epsg:{}'.format(int(trans_after_crs.split(':')[1]))}
        else:
            df.crs = trans_before_crs
            df = df.to_crs(trans_after_crs)

            return df#.to_crs(trans_after_crs)#.reset_index(drop=True, inplace=True)
       
    # 좌표계가 기존에 지정이 된 경우      
    else:
        df = df.to_crs(trans_after_crs)
    return df

def pre_cc(file_nm=None, path='D:\더아이엠씨\소방청\데이터구축\행안부_도로명DB전국수치',encoding='utf8'):
   
    # 빈 데이터프레임 생성
    df = pd.DataFrame()
   
    # path 경로 안에 있는 파일 리스트
    path_info =  os.listdir(path)
   
    # 파일 이름이 각각 같은 경우
    if file_nm:
        for city in tqdm(path_info):
            # city는 폴더가 서울,경상북도 등 17개 광역이 폴더로 존재 했을 경우이며 그 안에 파일 명이 모두 동일하다
            df_ = gpd.read_file(f'{path}/{city}/{file_nm}.shp', encoding=encoding)
            df_ = pre_crs(df_)

            # 병합
            df = pd.concat([df, df_], ignore_index=True)
   
    # 파일 이름이 각각 다른 경우
    else:
        file_nms = [file for file in path_info if file.endswith('.shp')]
        for file in tqdm(file_nms):
            df_ = gpd.read_file(f'{path}/{file}', encoding=encoding)
            df_ = pre_crs(df_)
           
            # 병합
            df = pd.concat([df, df_], ignore_index=True)
           
    return df

## spatial data preprocessing
# 위경도를 geometry 타입으로 변경
def pre_trans_geometry(df,var1='Longitude',var2='Latitude'):
    from shapely.geometry import Point
    import geopandas as gpd
   
    # Point 객체를 생성하여 'geometry' 열에 저장
    geometry = [Point(xy) for xy in zip(df[var1], df[var2])]

    # GeoDataFrame 생성
    geo_df = gpd.GeoDataFrame(df, geometry=geometry)

    return geo_df

# pandasframe → geopandas 변환
def pre_trans_geodataframe(df, col: str):
    from shapely import wkt
    df_temp = df.rename(columns={col:'geometry'})
    df_temp['geometry'] = df_temp['geometry'].apply(lambda x: wkt.loads(str(x)) if pd.notnull(x) else x)
    df = gpd.GeoDataFrame(df_temp, geometry='geometry')
    df = pre_crs(df)
    return df

# 중복되는 부분을 제거하고 하나로 통합
def remove_duplicate_part(address):
    parts = address.split()
    unique_parts = []
    for part in parts:
        if part not in unique_parts:
            unique_parts.append(part)
    return ' '.join(unique_parts)

# 공간 내 카운트 함수
def pre_count(df1, df2, groupby_on = 'gid', nm = 'cnt', predicate='contains'):
   
    # 그리드에 포함된 출입구 개수를 알기 위한 sjoin
    temp = gpd.sjoin(df1,df2, how = 'inner', predicate = predicate)
   
    # 'gid' 열을 기준으로 그룹화하고 각 그룹의 행 수 세기
    grouped = temp.groupby(groupby_on).size().reset_index(name=nm)
   
    return pd.merge(df1,grouped, on ='gid',  how = 'left')

## geocoding
# vworld geocoding api
def geocoder(address):
    key = 'API KEY'
    params = {
        "service": "address",
        "request": "getcoord",
        "crs": "epsg:4326",
        "address": str(address),
        "format": "json",
        "type": "PARCEL",
        "key": f"{key}"
    }
    response = requests.get(apiurl, params=params)
    if response.status_code == 200:
        data = response.json()
        try:
            x = data['response']['result']['point']['x']
            y = data['response']['result']['point']['y']
        except:
            x = 0
            y = 0
    return pd.Series([y,x])

# naver geocode api
def sobang_naver_geo(data, client_id, client_pw) :
    import pandas as pd                                 # 데이터 프레임을 다루는 pandas 라이브러리
    from tqdm import tqdm                               # 진행 상황을 시각적으로 보여주는 tqdm 라이브러리
    import numpy as np                                  # 수학적 연산에 사용되는 numpy 라이브러리
    import urllib                                       # url 정보를 다루는 urllib 라이브러리
    import json                                         # json 파일을 다루는 json 라이브러리

    import warnings                                     # 파이썬 경고 관련 warnings 라이브러리
    warnings.filterwarnings('ignore')                   # 파이썬 경고 무시

    # 기본 url

    # 데이터 저장용 리스트
    geo_coordi = []
    geo_distance = []

    # 사업장주소 컬럼 위도/경도 변환
    for add in tqdm(data['주소1']):
        add_urlenc = urllib.parse.quote(add)

        # 주소 포함 url
        url = api_url + add_urlenc

        # url 요청용 주소 변환
        request = urllib.request.Request(url)
        # url 요청용 주소에 id 값 추가
        request.add_header("X-NCP-APIGW-API-KEY-ID", client_id)
        # url 요청용 주소에 Password 값 추가
        request.add_header("X-NCP-APIGW-API-KEY", client_pw)

        # url 요청
        try :
            response = urllib.request.urlopen(request)
        # 주소가 검색이 안되면 위도/경도 값 0으로 설정
        except urllib.error.HTTPError as e:
            latitude = 0
            longitude = 0
            geo_state = '확인불가'

        else :
            # 주소가 정상적으로 반환되면
            rescode = response.getcode()
            if rescode == 200 :
                # 반환된 정보 저장
                response_body = response.read().decode('utf-8')
                response_body = json.loads(response_body)
                if 'addresses' in response_body:
                    try:
                        latitude = response_body['addresses'][0]['y']
                        longitude = response_body['addresses'][0]['x']
                        geo_state = response_body['addresses'][0]['distance']
                       
                    except:
                        latitude = 0
                        longitude = 0
                        geo_state = '확인불가'
                # 반환된 정보에 내용이 없을 때 위도/경도 값 0으로 설정
                else :
                    latitude = 0
                    longitude = 0
                    geo_state = '확인불가'

        # 반환된 값 저장    
        geo_coordi.append([latitude, longitude])
        geo_distance.append(geo_state)
   
    # 위도/경도 데이터 추가
    df = pd.concat([data, pd.DataFrame(geo_coordi, columns = ['위도', '경도'])], axis = 1)

    # 좌표 거리 데이터 추가
    # print('geo_distance',geo_distance)
    df['좌표 변환 거리'] = geo_distance
    # # 데이터 저장
    # df.to_csv('./청주시 양봉농가 등록현황_위도 경도 추가.csv', index = False, encoding = 'cp949')
   
    return df

# kakao geocode api
def kakao_search(keyword2):
    from PyKakao import Local
    api = Local(service_key = "API KEY")

    df1 = api.search_address(keyword2, dataframe=True)
    if df1.empty:
        df2 =  api.search_keyword(keyword2, dataframe=True)
       
        if df2.empty:
            return None
        else:
            return [df2.loc[0]['y'],df2.loc[0]['x'], df2['address_name'][0]]
    else:
        return [df1.loc[0]['y'],df1.loc[0]['x'], df1['address_name'][0]]

# 다양한 주소 형태의 api 적용
def kakao_grid_search(keword1):
    words = keword1.split(" ")
    unique_words = list(OrderedDict.fromkeys(words))
   
    # 시,구,동, 번지
    keword_1 = " ".join(unique_words[:4])
    result_1 = kakao_search(keword_1)
    if result_1 is None:
        # 시,구,동, 건물
        try:
            keword_2 = " ".join(unique_words[:3] + [unique_words[4]])
        except:
            keword_2 = " ".join(unique_words[:3])
        result_2 = kakao_search(keword_2)
       
        if result_2 is None:
            # 시,구,동
            keword_3 = " ".join(unique_words[:3])
            result_3 = kakao_search(keword_3)
            if result_3 is None:
                return 0,0,np.nan
            else:
                return result_3[0], result_3[1], result_3[2]
        else:
            return result_2[0], result_2[1], result_2[2]
    else:
        return result_1[0], result_1[1], result_1[2]

## text data preprocessing
def process_commas(text):
    words = text.split(',')
    cleaned_words = [word.strip() for word in words if word.strip()]
    result = ','.join(cleaned_words)
    return result.replace(',,','')

## 시각화
def eda_viz_region_month(df,region='강원'):
    import seaborn as sns
    import matplotlib.pyplot as plt
    import matplotlib.font_manager as fm
    import matplotlib.patheffects as path_effects
    font_path = 'C:\Windows\Fonts\GULIM.ttc'  # 여기에 ttf 파일의 정확한 경로를 지정하세요.
    font_prop = fm.FontProperties(fname=font_path)
    plt.rcParams['font.family'] = font_prop.get_name()



    re_month = df.groupby(['지역','개월수']).count().reset_index()[['지역','개월수','환자연령_주민','구급보고서번호']]
    # 바 차트 그리기
    temp_gang = re_month[re_month['지역']==region][['개월수','구급보고서번호']]
    plt.figure(figsize=(30,15))
    plt.bar(temp_gang['개월수'], temp_gang['구급보고서번호'])

    # 각 막대에 값 레이블 추가
    bars = plt.bar(temp_gang['개월수'], temp_gang['구급보고서번호'])
    for bar in bars:
        yval = bar.get_height()
        plt.text(bar.get_x() + bar.get_width()/2, yval, '%d' % int(yval),
                va='bottom', ha='center', fontsize=12, #rotation=90,
                path_effects=[path_effects.withStroke(linewidth=3, foreground='white')])

    # 레이블 추가
    plt.xlabel('개월수',size=20)
    plt.ylabel('구급보고서번호',size=20)
    plt.title(f'전체 {region} 개월수 별 사고건수',size=20)

    # 그래프의 x축 눈금을 설정합니다.
    plt.xticks(temp_gang['개월수'].to_list(), size=15)

    # 바 차트 표시
    plt.show()
728x90

'데이터분석' 카테고리의 다른 글

사용자 사전 추가된 Mecab 형태소 분석 in Corab  (0) 2023.12.04
728x90

단어 통일화 및 사용자 사전 구축에 따른 mecab 형태소 분석을 Class로 정의하였습니다.

클래스 내 함수 def _morph는 변수명이 하드코딩으로 되어 있으므로 수정이 필요합니다.

또한, 해당 함수는 형태소분석기로 추출된 단어가 사고유형, 장소, 시간 등에 따라 분류가 되는 코드입니다.

만일 추출된 단어별로 재 분류가 필요 없다면 _ morph 함수를 수정해야 합니다.

 

# 사용자 사전 추가 및 형태소 분석 in corab
class Corab_morph():
    def __init__(self, df_path, ner_dic_path, user_dic_path, text_col = None, encoding = None):
        self.encoding = encoding
        self.df = self._read_file(df_path,encoding=self.encoding)
        self.ner_dic = self._read_file(ner_dic_path, sheet_name='전체')
        self.user_dic = self._read_file(user_dic_path, sheet_name='전체')
        self.text_col = text_col
        self.tokenizer = Mecab()

    def _read_file(self, path, sheet_name=None,  encoding = None):
        if '.csv' in path:
            file = pd.read_csv(path, encoding = encoding)
            return file
       
        if '.xlsx' in path or '.xls' in path:
            file = pd.read_excel(path)
            return file
        else:
            raise ImportError

    def _get_jongsung_TF(self, sample_text):
        sample_text_list = list(sample_text)
        last_word = sample_text_list[-1]
        last_word_jamo_list = list(j2hcj(h2j(last_word)))
        last_jamo = last_word_jamo_list[-1]

        jongsung_TF = "T"

        if last_jamo in ['ㅏ', 'ㅑ', 'ㅓ', 'ㅕ', 'ㅗ', 'ㅛ', 'ㅜ', 'ㅠ', 'ㅡ', 'ㅣ', 'ㅘ', 'ㅚ', 'ㅙ', 'ㅝ', 'ㅞ', 'ㅢ', 'ㅐ,ㅔ', 'ㅟ', 'ㅖ', 'ㅒ']:
            jongsung_TF = "F"

        return jongsung_TF
   
   
    # 사용자 사전 추가
    def _add_dic(self, word_list:list):
        with open("/tmp/mecab-ko-dic-2.1.1-20180720/user-dic/nnp.csv", 'r', encoding='utf-8') as f:
            file_data = f.readlines()

        # 단어추가
        for word in word_list:
            jongsung_TF = self._get_jongsung_TF(word)
            line = '{},,,,NNP,*,{},{},*,*,*,*,*\n'.format(word, jongsung_TF, word)
            file_data.append(line)

        # 덮어쓰기
        with open('/tmp/mecab-ko-dic-2.1.1-20180720/user-dic/nnp.csv','w', encoding='utf-8') as f:
            for line in file_data:
                print(line)
                f.write(line)
   
    def _replace_words(self, text):
        # word[0] : 본문내 단어, word[1] : 변경한 단어 e.g 제 주 -> 제주
        for idx, words in self.user_dic.iterrows():
            if len(words[0]) != 1:
                try:
                    text = text.replace(words[0], words[1])
                except:
                    pass
            else: # 한 글자는 변경x
                pass
        return text    
   
    # 형태소 분석
    def _morph(self, df, tokenizer):
        for idx,sent in tqdm(enumerate(df['정제_평가소견내용'])):
            spot_cls = []
            spot_word = []
            types_cls = []
            types_word = []
            cause_cls = []
            cause_word = []
            times_cls = []
            times_word = []
            dangers_cls = []
            dangers_word = []

            mecab_sent = tokenizer.morphs(sent)
            for word in mecab_sent: # 형태소 분리된 문장 sent

                if word in self.ner_dic['단어'].to_list():
                    ner = self.ner_dic[self.ner_dic['단어'] == word]['개체명'].to_list()[0]
                    cls = self.ner_dic[self.ner_dic['단어'] == word]['분류'].to_list()[0]

                    if ner == '사고유형':
                        if word not in types_word: # 중복 없애기 위함
                            types_cls.append(cls)
                            types_word.append(word)

                    elif ner == '장소':
                        if word not in spot_word:
                            spot_cls.append(cls)
                            spot_word.append(word)
                        # print('spot',spot)

                    elif ner == '위해품목':
                        if word not in dangers_word:
                            dangers_cls.append(cls)
                            dangers_word.append(word)

                    # print('cause',cause)
                    elif ner ==  '사고원인':
                        if word not in cause_word:
                            cause_cls.append(cls)
                            cause_word.append(word)

                    elif ner == '시간':
                        if word not in times_word:
                            times_cls.append(cls)
                            times_word.append(word)

                    # 위 세가지 경우말고는 다른건 제외
                    else:
                        pass

                # 단어가 사전에 없은 경우
                else:
                    continue

            # df에 넣기
            df.loc[idx,'types'] = ",".join(types_cls)
            df.loc[idx,'types_detail'] = ",".join(types_word)

            df.loc[idx,'spot'] =  ",".join(spot_cls)
            df.loc[idx,'spot_detail'] =  ",".join(spot_word)

            df.loc[idx,'cause']=  ",".join(cause_cls)
            df.loc[idx,'cause_detail']=  ",".join(cause_word)

            df.loc[idx,'object']=  ",".join(dangers_cls)
            df.loc[idx,'object_detail']=  ",".join(dangers_word)

            df.loc[idx,'time']=  ",".join(times_cls)
            df.loc[idx,'time_detail']=  ",".join(times_word)

        return df

    def run(self, word_rep = True):
       
        # 개체명 단어 추가
        self._add_dic(self.ner_dic['단어'].to_list())
       
        # 업데이트
        !bash /tmp/mecab-ko-dic-2.1.1-20180720/tools/add-userdic.sh # 해당코드를 실행시키면 자기가 관리하고 있는 모든 유저 딕셔너리를 실행, 업데이트
        !cd /tmp/mecab-ko-dic-2.1.1-20180720
        !make install
       
        # 데이터 정제
        if word_rep:
            self.df[f'정제_{self.text_col}'] = self.df[self.text_col].apply(lambda x: self._replace_words(x))
       
        # 형태소 분석
        df2 = self._morph(self.df, self.tokenizer)
       
        return df2
       
if __name__ == "__main__":
    # Mecab 설치
    !apt-get update
    !apt-get install g++ openjdk-8-jdk
    !pip3 install konlpy JPype1-py3
   
    # mecab-python의 버전 오류로 인해 아래 패키지를 설치하면 코랩에서 Mecab을 사용가능
    !pip install mecab-python3

    # 패키지 import
    from konlpy.tag import Mecab
    import pandas as pd
    from tqdm import tqdm
    try:
        from jamo import h2j, j2hcj
    except:
        ! pip install jamo
        from jamo import h2j, j2hcj

    files = Corab_morph(
        df_path='경로1',\
        ner_dic_path='경로2',\
        user_dic_path='경로3',\
        text_col = '평가소견내용' ,\
        encoding='cp949')
    df = files.run(word_rep=False)

 

728x90

'데이터분석' 카테고리의 다른 글

공간분석 프로젝트_함수정리  (0) 2023.12.04
728x90

선형회귀분석 중 조절변수, 매개변수, 스케일링에 따른 해석차이와 방법을 알고자 공부한 내용을 공유드려요

이번 기회를 통하여 여러분도 선형회귀분석의 새로운 관점을 얻고 가시길 바랍니다 :]

틀린내용이 있다면 댓글로 남겨주시면 감사하겠습니다

선형회귀_최종본.ipynb
1.07MB

 

 

728x90

+ Recent posts