• [티스토리 게시글 추천 시스템 만들기] #3 클롤링 데이터 TF-IDF 계산하기

    2021. 3. 25. 20:16

    by. 위지원

    2021.03.24 - [✎ 21.上/Data] - [티스토리 게시글 추천 시스템 만들기] #1 계획

    2021.03.24 - [✎ 21.上/Data] - [티스토리 게시글 추천 시스템 만들기] #2 크롤링해서 HDFS에 Parquet으로 저장

     

    tfidf 계산

    계산 결과를 계속 도출하는게 시간이 걸리기때문에 parquet 형태로 또 저장해두고 이후에는 그냥 hdfs에서 가져다 썼다.

     

    어.. 근데 뭔가 그냥 판다스로 계산한 것같아서... 어... 스파크를 파케이때문에 쓴 느낌이라.. 어.... 사실 스팍없어도 됬을거라는 생각이.. 👽

    def cal_tf_idf(spark, docs_df: dataframe.DataFrame, stop_word_df: dataframe.DataFrame):
        docs_info = {str(row['post_num']): row['contents'] for row in docs_df.collect()}
        stop_words = [row['stop_word'] for row in stop_word_df.select("stop_word").collect()]
    
        docs = docs_info.values()
        vect = TfidfVectorizer(max_features=2, stop_words=stop_words)
        countvect = vect.fit_transform(docs)
    
        countvect_df = pd.DataFrame(countvect.toarray(), columns=sorted(vect.vocabulary_))
        countvect_df.index = docs_info.keys()
    
        cosine_matrix = cosine_similarity(countvect_df, countvect_df)
    
        post2id = {i: c for i, c in enumerate(docs_info.keys())}
        id2post = {c: i for i, c in post2id.items()}
    
        # Toy Story의 id 추출
        idx = id2post['2']  # Toy Story : 0번 인덱스
        sim_scores = [(i, c) for i, c in enumerate(cosine_matrix[idx]) if i != idx]  # 자기 자신을 제외한 영화들의 유사도 및 인덱스를 추출
        sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)  # 유사도가 높은 순서대로 정렬
    
        sim_scores = [(post2id[i], score) for i, score in sim_scores[0:10]]
        print(sim_scores)
    

     

    ㅎ   ㅎ

     

    저장은 위와같은 구조로했다.

    추천 결과

    추천 결과는 아래와 같다.

    코테문제를 넣었더니 코테 문제를 추천해준다 오 ? 생각보다 잘된다.

     

    스프링 글을 넣어봤다. 오 스프링글 추천해준당 ㅎㅎㅎ잘된다.

     

    * 에공 제목을 잘 못긁어온 경우가 있다. 

    출처 url 없에느라 영어를 정규에서 빼버린게 문제같다. 영어도 넣고 url을 정규표현에서 없에는 쪽으로 수정해야할 것같다.

    --수정 후--

     

    내 블로그에서 가장 핫한 글 중 하나인 정처기도 해봤다. 역시 잘된다. 뿌듯하당 

     

     

    속도를 좀 개선해야할 것 같다 한번 request할 때마다 matrix를 계산해서 그런 것 같다. 이부분을 저장해서 바로 사용할 수 있도록 해야겠다.


    전체 코드

    코드 전문은 다음과 같다.

    import argparse
    import logging
    import re
    from typing import List, Optional
    
    import pandas as pd
    import pyspark
    import requests
    from bs4 import BeautifulSoup
    from konlpy.tag import Okt
    from pyspark.sql import SparkSession, dataframe
    from sklearn.feature_extraction.text import TfidfVectorizer
    from sklearn.metrics.pairwise import cosine_similarity
    from tqdm import tqdm
    
    logger = logging.getLogger()
    
    
    def cal_tf_idf(docs_df: dataframe.DataFrame, stop_word_df: dataframe.DataFrame, post_num: str) -> List:
        post_name: dict = {str(row['post_num']): row['post_name'] for row in docs_df.collect()}
        post_info: dict = {str(row['post_num']): re.sub(' +', " ", row['contents']) for row in docs_df.collect()}
        stop_words: List = [row['stop_word'] for row in stop_word_df.select("stop_word").collect()]
    
        docs = post_info.values()
    
        vect = TfidfVectorizer(stop_words=stop_words)
        tfvect = vect.fit(docs)
    
        countvect_df = pd.DataFrame(tfvect.transform(docs).toarray(), columns=sorted(vect.vocabulary_))
        countvect_df.index = post_info.keys()
    
        cosine_matrix = cosine_similarity(countvect_df, countvect_df)
    
        post2id: dict = {i: c for i, c in enumerate(post_info.keys())}
        id2post: dict = {c: i for i, c in post2id.items()}
    
        idx: int = id2post[post_num]
        sim_scores: List = [(i, c) for i, c in enumerate(cosine_matrix[idx]) if i != idx]
        sim_scores: List = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    
        sim_scores: List = [(post2id[i], post_name[post2id[i]]) for i, score in sim_scores[0:10]]
    
        return sim_scores
    
    
    def make_stop_word(texts: List) -> List:
        okt = Okt()
        article_pos_list = []
    
        article_pos = okt.pos(texts)
        if article_pos:
            for pos in article_pos:
                if pos[1] != "Noun" or len(pos[0]) == 1:
                    article_pos_list.append({"stop_word": pos[0]})
    
        return article_pos_list
    
    
    def crawler_main(post_num: int) -> (List, List):
        contents: List = [""] * post_num
    
        logger.debug("크롤링을 시작합니다.")
        docs = []
        for post_num in tqdm(range(1, post_num)):
            post_name, result = crawler(post_num)
            if result:
                docs.append({"post_name": post_name, "post_num": post_num, "contents": result})
                contents[post_num]: Optional[List] = result
    
            else:
                logger.warning("존재하지 않는 게시글입니다.")
    
        # 불용어 생성
        logger.debug("불용어 생성을 시작합니다.")
        stop_words = []
        for post_num, content in enumerate(contents):
            if content:
                stop_word: list = make_stop_word(content)
                stop_words.extend(stop_word)
    
        return docs, stop_words
    
    
    def crawler(post_num: int) -> (str, Optional[List]):
        url: str = "https://weejw.tistory.com/%d" % post_num
        content = requests.get(url).content
        logger.debug("URL:%s" % url)
    
        soup = BeautifulSoup(content, "html.parser")
        exists_post = soup.select_one("div.absent_post")
    
        if exists_post is not None:
            return None, None
    
        contents = soup.select_one("div.tt_article_useless_p_margin")
        post_name = soup.select_one("div.box_article_tit")
    
        if contents is None:
            return None, None
    
        patterns = {
            '[^ 가-힣]+': "",
            '출처': "",
            " +": " "
        }
    
        results = str(contents.findAll("p"))
        post_name = str(post_name.find("p", class_="txt_sub_tit"))
    
        for pattern, replace in patterns.items():
            results = re.sub(pattern, replace, results)
        
        
        post_name = re.sub('(<([^>]+)>)', "", post_name) #<< #수#정
    
            results = re.sub(" +", " ", results)
    
        return post_name, results.strip() if results else None
    
    
    def get_spark_session():
        app_name = "Python"
        master = "local"
        spark = SparkSession.builder \
            .appName(app_name) \
            .master(master) \
            .getOrCreate()
    
        return spark
    
    
    def log_init():
        logger.setLevel(logging.INFO)
        formatter = logging.Formatter("%(asctime)s [%(levelname)s]: %(message)s")
        stream_handler = logging.StreamHandler()
        stream_handler.setFormatter(formatter)
        logger.addHandler(stream_handler)
    
        s_logger = logging.getLogger('py4j.java_gateway')
        s_logger.setLevel(logging.FATAL)
    
    
    def get_parser():
        parser = argparse.ArgumentParser(description="Process some integers.")
    
        parser.add_argument("--crawler-flag", default="False",
                            help="crawling and save?")
    
        parser.add_argument("--total-post-num", default=5, type=int,
                            help="Total post num")
    
        parser.add_argument("--post-num", default="2",
                            help="post num to recommendation")
    
        return parser.parse_args()
    
    
    def main(args):
        log_init()
        spark = get_spark_session()
        docs_path = "hdfs://localhost:9000/user/jiwonwee/crawler_data/docs.parquet"
        stop_word_path = "hdfs://localhost:9000/user/jiwonwee/crawler_data/stop_docs.parquet"
    
        # HDFS에 파일 저장
        if args.crawler_flag == "True":
            logger.debug("크롤링 및 저장을 시작합니다.")
            docs, stop_words = crawler_main(post_num=args.total_post_num)
    
            docs_df = spark.createDataFrame(docs)
            stop_words_df = spark.createDataFrame(stop_words)
    
            logger.debug("결과를 hdfs에 저장합니다.")
            docs_df.write.mode("overwrite").parquet(docs_path)
            stop_words_df.write.mode("overwrite").parquet(stop_word_path)
    
        # HDFS에서 파일 읽음
        else:
            logger.debug("데이터를 불러오는 중입니다.")
            try:
                docs_df = spark.read.parquet(docs_path)
                stop_words_df = spark.read.parquet(stop_word_path)
    
                logger.debug("tfidf 계산을 시작합니다.")
                recommend_result = cal_tf_idf(docs_df, stop_words_df, args.post_num)
    
                logger.debug("추천 결과를 출력합니다.")
                post_name, _ = crawler(int(args.post_num))
                logger.info("요청 페이지: https://weejw.tistory.com/%s [%s]" % (args.post_num, post_name))
                logger.info("========================================")
                for recommend in recommend_result:
                    logger.info("추천 페이지: https://weejw.tistory.com/%s [%s]" % (recommend[0], recommend[1]))
    
            except pyspark.sql.utils.AnalysisException:
                logger.error("파일을 찾을 수 없습니다!")
    
    
    if __name__ == "__main__":
        args = get_parser()
        main(args)
    

     

    REFERENCE

     

    02. 컨텐츠 기반 추천시스템 - TF-IDF를 이용한 추천시스템

    해당 글은 T-아카데미에서 발표한 추천시스템 - 입문하기의 자료에 딥러닝을 이용한 추천시스템과 추천시스템 대회를 분석한 내용을 추가한 글입니다. 해당 자료보다 더욱더 좋은 자료들이 페

    eda-ai-lab.tistory.com

     

    대화의 장 💬