When calling a module from the same Python package and using schedule


from selenium import webdriver
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.support.wait import WebDriverWait
from webdriver_manager.chrome import ChromeDriverManager
from datetime import date, time, timedelta
import datetime
from bs4 import BeautifulSoup
from 메신저.kakao import kakao
import schedule
import time
import requests
import json
import os.path
from random import randint
from time import sleep

class gmarket_sales():

def __init__(self):
           
    self.url = 'https://minishop.gmarket.co.kr/meritblog'
    self.now = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S (%a)')
    self.hour = datetime.datetime.now().strftime('%H시_%M_분')
    self.today = date.today()
    self.folder = None
    self.today_file = None
    self.kakao_talk = kakao()
        
def chrome(self,url):
    
    chrome_driver = Service(ChromeDriverManager().install())
    options = Options()
    options.add_experimental_option('detach',True)
    options.add_experimental_option('excludeSwitches',['enable-logging'])
    # options.add_argument('--headless')
    # options.add_argument('--window-size = x, y')
    # options.add_argument('--start-maximized')
    # options.add_argument('--start-fullscreen')
    # options.add_argument('--mute-audio')
    # self.driver = webdriver.Chrome(options=options,service=chrome_driver)
    self.driver = webdriver.Chrome(options=options, service=chrome_driver)
    # url = 'http://minishop.gmarket.co.kr/hanvitis'
    self.driver.get(url)
    
    # return url

def shopping_mall(self):
    
    self.chrome('https://minishop.gmarket.co.kr/meritblog')

    mall_name = self.driver.find_element(By.CSS_SELECTOR,'a.shop_title_ui_txt').text

    self.folder = f'./메리트몰_데이터베이스/지마켓'
    self.today_file = f'{self.today}_{mall_name}_지마켓.json'
    
    return mall_name

def soup(self,url_param):
    
    # headers = {'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/108.0.0.0 Safari/537.36'}
    try:
        response = requests.get(url_param)
        sp = None   # stays None when the status code is not 200
        if response.status_code == 200:
            print(f'응답코드는 {response.status_code} 입니다.')
            sp = BeautifulSoup(response.text, 'html.parser')

        return sp

    except requests.packages.urllib3.exceptions.MaxRetryError as e:
        print(str(e))

def total_product(self):
    """G마켓 미니샵 카테고리별 상품 갯수 파악하여
    총 상품수 구하기"""
            
    total_items = 0
    
    products = self.driver.find_element(By.ID,'ulCategory').find_elements(By.CSS_SELECTOR,'span.data_num')
    for product in products:
        items = int(product.text.replace('(',"").replace(')',""))
        total_items += items
            
    # 391 items
    return total_items

def paging(self,total_items):
    """매개변수로 전달받은 총 상품 갯수와
    한 페이지에 보여 줄 상품 갯수를 이용하여 총 페이지 수 구해서
    페이지 리스트 반환"""
           
    page_list = []

    # Click "View all products"
    self.driver.execute_script('arguments[0].click();',self.driver.find_element(By.CSS_SELECTOR,'.allList_view > a'))
    time.sleep(2)

    # Number of items per page
    view_limit = int(self.driver.find_element(By.CSS_SELECTOR,'div.limit').text.replace("개씩",""))
    
    # Compute the number of pages
    if total_items % view_limit == 0:
        page = total_items // view_limit
    else:
        page = total_items // view_limit + 1
    
    # Page URL list
    for cnt in range(page):
        page_url = f'{self.url}/List?CategoryType=General&SortType=FocusRank&DisplayType=SmallImage&Page={cnt + 1}&PageSize=60'
        page_list.append(page_url)

    return page_list

# ==========================================================================================================================
# Data crawling is split into two stages
# [1] data_one --> product URLs and price information
# [2] data_two --> general product information
# [3] processing --> compare the two sets of product data and merge them
# ==========================================================================================================================

def data_one(self,page_list):
    """상품 url 리스트
    정상가/할인가/할인율 딕셔너리"""
    
    url_list = []
    price_dic = {}

    for i_page,page in enumerate(page_list):           
        
        print(f'{i_page + 1} 페이지의 정보를 크롤링 합니다.')
        html = self.soup(page)

        for items in html.find('ul',class_='type2').find_all('li'):

            # url
            item_url = items.find('a')['href']
            
            # Product code
            item_code = item_url[-10:]
            
            # Price and discount rate
            if items.find('p',class_='prd_price').find('span',class_='del_important'):
                original_price = items.find('p',class_='prd_price').find('span',class_='del_important').text.replace("원","")
                discount_price = items.find('p',class_='prd_price').find('strong').text.replace("원","")
                sale_rate = items.find('p',class_='prd_price').find('span',class_='splt_ico usr_clr').text
            
            else: 
                original_price = items.find('p',class_='prd_price').find('strong').text.replace("원","")
                discount_price = "없음"
                sale_rate = "없음"
            
            url_list.append(item_url)

            price_dic[item_code]={"정상가":original_price,"할인가":discount_price,"할인율":sale_rate}
        
        sleep(randint(1,10))
                           
    return url_list , price_dic 

def data_two(self,url_list):
    """상품코드/품목/상품명/재고수량/리뷰수 리스트"""
    data_list = []
   
    for index, url in enumerate(url_list):
        
        item_html = self.soup(url)
       
        try:
            # Extract the text of the script tag -> split it to keep only the needed data
            data = item_html.find('div',class_="vip-tabcontentwrap").find('script',{'type':'text/javascript'}).text.split(';')[0]
            indexing = data.find('{')
            script = json.loads(data[indexing:])
        except Exception as e:
            print(str(e))
            continue
                                
        # Crawl the required fields
        code = script['GoodsCode'] # product code
        subject = script['GdscName'] # category
        title = script['GoodsName'] # product name
        stock = script['MinSellAmount'] # stock quantity
        review = script['ReviewCount'] # review count
        
        item_dic = {'시간':self.hour,'순번':index + 1,'상품코드':code,'품목':subject,
        "상품명":title,"재고수량":stock,"리뷰수":review,"url":url}
        
        data_list.append(item_dic)  
        
    return data_list    

def most_recent_date(self):
    """지정 폴더 안에서 [오늘]을 제외한 가장 최근의 날짜를 찾는 조건문"""
    
    # Find the file with the latest date in the Gmarket folder
    all_files = sorted(list(os.listdir(self.folder)))

    if len(all_files) == 0:
        print(f'{self.folder} 경로의 파일이 없습니다.')
        last_day_file = None
        # last_day_file = all_files[0]
    elif len(all_files) == 1:
        print(f'{self.folder} 경로의 파일은 1개 입니다.')
        last_day_file = all_files[0]
    else:
        if self.today_file in all_files:
            last_day_file = all_files[-2]
        else:
            last_day_file = all_files[-1]

    return last_day_file   

def write_data(self,price_dic,data_list):
    """금일자 파일 유무 검사 후
    파일이 있으면 덮어쓰기"""

    if os.path.exists(f'{self.folder}/{self.today_file}'):
        with open(f'{self.folder}/{self.today_file}','r',encoding='utf-8-sig') as f:
            item_dic = json.load(f)     
    else:
        item_dic = {}
            
    item_dic[self.now]=[]

    for data in data_list:

        if data['상품코드'] in price_dic.keys():
            data.update(price_dic[data['상품코드']])
            item_dic[self.now].append(data)
        else:
            continue
    
    with open(f'{self.folder}/{self.today_file}','w',encoding='utf-8-sig') as f:
        json.dump(item_dic,f,indent=4,ensure_ascii=False)

    return item_dic

def get_stock_data(self,data_list):

    stock_dic = {}

    for data in data_list:
        try:
            code = data['상품코드']
            num = data['순번']
            subject = data['품목']
            title = data['상품명']
            stock = data['재고수량']
            review = data['리뷰수']
            original_price = data['정상가']
            discount_price = data['할인가']
            sale_rate = data['할인율']
            url = data['url']
        except Exception as e:
            print('오류사항 :',str(e))
            continue
    
        stock_dic[code]={'시간':self.hour,'순번':num,'품목':subject,'상품명':title,'재고수량':stock,'리뷰수':review,'정상가':original_price,
        '할인가':discount_price,'할인율':sale_rate,"url":url}

    return stock_dic

def before_after_data(self,item_dic,last_day_file):
    """before_data = 금일 또는 가장 최근의 데이터 중 직전 데이터
    after_data = 현 시각 크롤링 한 데이터"""

    key_list = sorted(item_dic.keys())
    
    recent_key = key_list[-2:]

    if last_day_file == None:
        print("해당 폴더에 저장된 데이터가 없으므로 현재 데이터만 저장 하겠습니다.")
        before_dic = None

    elif len(recent_key) <= 1:
        print("금일자 데이터 1개 이므로, 최근 일자 데이터 중 마지막 타임 데이터를 가져오겠습니다.")
        with open (f'{self.folder}/{last_day_file}','r',encoding='utf-8-sig') as f:
            last_day = json.load(f)
        
        last_time = sorted(last_day.keys())[-1]

        recent_key.append(last_time)
        recent_key = sorted(recent_key)

        before_dic = self.get_stock_data(last_day[last_time])
        
    else:
        print("금일자 데이터 2개 이상입니다.")
        before_dic = self.get_stock_data(item_dic[recent_key[0]])
    
    after_dic = self.get_stock_data(item_dic[recent_key[-1]])

    return before_dic , after_dic , recent_key
       
def real_time_sales(self,before_dic,after_dic,mall_name,recent_key):
    """before_dic 데이터와 after_dic 데이터 비교"""

    if before_dic == None:
        return after_dic
    
    else:
        print(f"{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')} ----- item_counting ----- {len(after_dic.keys())}")

        for key in after_dic.keys():

            item_code = key
            try :
                before_stock = int(before_dic[item_code]['재고수량'])
                print(f"\nBefore_Time -- [{recent_key[0]}]\n상품코드 -- [{key}]\n재고수량 -- {before_stock}개")
                
                after_stock = int(after_dic[item_code]['재고수량'])
                print(f"\nAfter_Time -- [{recent_key[-1]}]\n상품코드 -- [{key}]\n재고수량 -- {after_stock}개")

            except Exception as e:
                print('오류사항 =',str(e))
                continue
        
            # No data
            if not before_stock or not after_stock:
                # print(f"\nBefore_Time 또는 After_Time 데이터가 없습니다.\n")
                pass
            # Same stock
            elif before_stock == after_stock:
                # print(f'\nBefore_Time == After_Time 재고수량이 동일합니다.\n')
                pass
            # Cancellation
            elif before_stock < after_stock:
                cancell = after_stock - before_stock
                print(f"\nBefore_Time에 판매된 상품이 취소되었습니다.\n취소된 수량은 {cancell}개 입니다.")
                                    
                self.kakao_talk.send_message(f'[지마켓] [{mall_name}] 취소 알림톡' '\n'
                 f'[{datetime.datetime.now().replace(second=0,microsecond=0)}]' '\n\n'
                 f'상품코드 : [{item_code}]' '\n'
                 f'상품명 : {after_dic[key]["상품명"][:16]}' '\n'
                 f'정상가 : {after_dic[key]["정상가"]}' '\n'
                 f'할인가 : {after_dic[key]["할인가"]}' '\n'
                 f'할인율 : {after_dic[key]["할인율"]}' '\n'
                 f'취소수량 : {cancell}' '\n\n'
                )            
            # Sale
            else:
                real_time_sale = before_stock - after_stock
                self.kakao_talk.send_message(f'[지마켓] [{mall_name}] 판매 알림톡' '\n'
                 f'[{datetime.datetime.now().replace(second=0,microsecond=0)}]' '\n\n'
                 f'상품코드 : [{item_code}]' '\n'
                 f'상품명 : {after_dic[key]["상품명"][:16]}' '\n'
                 f'정상가 : {after_dic[key]["정상가"]}' '\n'
                 f'할인가 : {after_dic[key]["할인가"]}' '\n'
                 f'할인율 : {after_dic[key]["할인율"]}' '\n'
                 f'판매수량 : {real_time_sale}' '\n\n'
                #  "="*23
                )      

# ===================================================================================================
# Check for new items / deleted items
# ===================================================================================================

def text_processing(self,item_list,text):

    start_pos = 0
    end_pos = len(item_list)
    step = 2
    count = 1
    
    for idx in range(start_pos, end_pos + step, step):
        out = item_list[start_pos : start_pos + step]
        
        if out != []:
            text_1 = f'[{count}] - {text} 상품 리스트\n{self.now}\n\n'

            if text == '신규등록':
                for data in out:
                    key = list(data.keys())[0]
                                            
                    elem1 = f'상품코드 : {key}'
                    elem2 = f'url : {data[key]["url"]}'
                                            
                    text_1 += elem1 + '\n'
                    text_1 += elem2 + '\n'
            else:
                for data in out:
                    key = list(data.keys())[0]
                                            
                    elem1 = f'상품코드 : {key}'
                    elem2 = f'상품명 : {data[key]["상품명"]}'
                                            
                    text_1 += elem1 + '\n'
                    text_1 += elem2 + '\n'
                                            
            count += 1

            self.kakao_talk.send_message(text_1)
        start_pos += step

def new_item(self,before_dic,after_dic):

    if before_dic != None :
    
        new_item_list = []
        text = '신규등록'

        for code in after_dic.keys():
        
            if code not in before_dic.keys():
                
                new = {code:after_dic[code]}
                new_item_list.append(new)
        
        if new_item_list != []:
            print(f'신규등록 된 상품은 총 {len(new_item_list)}개 입니다.')
            self.text_processing(new_item_list,text)
        else:
            print('신규등록 된 상품이 없습니다.')
    
def delete_item(self,before_dic,after_dic):

    if before_dic != None:
    
        delete_item_list = []
        text = '삭제처리'

        for code in before_dic.keys():
        
            if code not in after_dic.keys():
                
                delete = {code:before_dic[code]}
                delete_item_list.append(delete)
        
        if delete_item_list != []:
            print(f'삭제처리 된 상품은 총 {len(delete_item_list)}개 입니다.')
            self.text_processing(delete_item_list,text)
        else:
            print('삭제처리 된 상품이 없습니다.')
                
    else:

        return after_dic 

def check_start(self):
    
    # url = self.connect()
    mall_name = self.shopping_mall()
    total_items = self.total_product()
    page_list = self.paging(total_items)
    url_list,price_dic = self.data_one(page_list)
    data_list = self.data_two(url_list)
    last_day_file = self.most_recent_date()
    item_dic = self.write_data(price_dic,data_list)
    before_dic,after_dic,recent_key = self.before_after_data(item_dic,last_day_file)
    self.real_time_sales(before_dic,after_dic,mall_name,recent_key)
    self.new_item(before_dic,after_dic)
    self.delete_item(before_dic,after_dic)
    self.driver.quit()

if __name__ == "__main__":
    
    g_market = gmarket_sales()
    # g_market.check_start()
    # schedule.every(15).minutes.do(g_market.check_start)
    schedule.every().hour.at(":11").do(g_market.check_start)
    # schedule.every().hour.at(":30").do(g_market.check_start)
    # schedule.every().hour.at(":40").do(g_market.check_start)
    
    while True:
        schedule.run_pending()
        time.sleep(1)

Questions

[1] schedule.every(15).minutes.do(g_market.check_start)

When the job is scheduled with the line above, the URL connection does not work. Conversely, with schedule.every().hour.at(":11").do(g_market.check_start), the connection is made at the scheduled time, but I do not understand why the behaviour differs.

[2] When the json data is written out, only the timestamp of the first crawl, taken with datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S (%a)'), is recorded in the json file.

[3] Intermittently I get a MaxRetryError: HTTPConnectionPool(host='127.0.0.1', port=58408): Max retries exceeded with url error. It seems to be a problem with self.driver.quit(), but I can't quite figure out how to fix it.

CodePudding user response:

The code you've provided appears to run at the expected interval. I specifically tested with schedule.every(15).minutes.do(g_market.check_start), and confirmed it ran every 15 minutes.

There are multiple ways to add a basic timer to the main loop to show that it is running. Here is one example, which you can modify as needed, that simply confirms the program is in fact running:

    start_time = datetime.datetime.now()   # Get time before entering main loop
    print(f"Program starting at {start_time}")
    while True:
        schedule.run_pending()
        time.sleep(1)
        current_time = datetime.datetime.now()   # Get time for now and check time passed
        seconds_passed = (current_time - start_time).total_seconds() 
        if seconds_passed > 180:  # Display a message every 3 minutes (adjust as desired).
            print(f"Main loop running: {current_time}")
            start_time = current_time  # Reset start_time for comparison. 
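
If you only want to confirm that a job was registered and see when it is due, the schedule module also provides schedule.next_run() and schedule.idle_seconds(); a minimal sketch (using a dummy heartbeat job in place of g_market.check_start) might look like this:

    import datetime
    import time

    import schedule

    def heartbeat():
        # Dummy job standing in for g_market.check_start
        print(f"job ran at {datetime.datetime.now()}")

    schedule.every(15).minutes.do(heartbeat)

    # next_run() reports when the next job is due, idle_seconds() how long until then
    print(f"next run due at: {schedule.next_run()}")
    print(f"seconds until then: {schedule.idle_seconds():.0f}")

    while True:
        schedule.run_pending()
        time.sleep(1)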

I would also suggest adding the following line to the check_start() method so you know a scheduled job has started:

    def check_start(self):
        print(f"Scheduled job starting @{datetime.datetime.now()}")
        # url = self.connect()
        mall_name = self.shopping_mall()
        ...
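
Regarding question [3], one pattern worth trying (a sketch only, not a confirmed fix for the MaxRetryError) is to wrap the body of check_start() in try/finally so the driver is always quit, even when one of the intermediate steps raises, and the next scheduled run starts from a fresh browser session:

    def check_start(self):
        print(f"Scheduled job starting @{datetime.datetime.now()}")
        try:
            mall_name = self.shopping_mall()
            total_items = self.total_product()
            page_list = self.paging(total_items)
            url_list, price_dic = self.data_one(page_list)
            data_list = self.data_two(url_list)
            last_day_file = self.most_recent_date()
            item_dic = self.write_data(price_dic, data_list)
            before_dic, after_dic, recent_key = self.before_after_data(item_dic, last_day_file)
            self.real_time_sales(before_dic, after_dic, mall_name, recent_key)
            self.new_item(before_dic, after_dic)
            self.delete_item(before_dic, after_dic)
        finally:
            # Only quit if the driver was actually created in shopping_mall()
            if getattr(self, 'driver', None):
                self.driver.quit()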

As for the exception handling, I think that within the soup() method you may need to handle more exception types, such as requests.exceptions.ConnectTimeout and urllib3.exceptions.ConnectTimeoutError. Then you need to decide how you want to handle these exceptions: do you want to wait and try again, or continue to the next item to be processed? The soup() method currently returns None when an exception is thrown, even if you catch it correctly, so the caller needs to handle that None return value. Example:

    def data_one(self,page_list):
        """상품 url 리스트
        정상가/할인가/할인율 딕셔너리"""
        
        url_list = []
        price_dic = {}
        
        for i_page,page in enumerate(page_list):           
            
            print(f'{i_page + 1} 페이지의 정보를 크롤링 합니다.')
            html = self.soup(page)
            if html is None:
                continue   # soup failed to get a response, so go to the next page.

            for items in html.find('ul',class_='type2').find_all('li'):
                 ...

    def data_two(self,url_list):
        """상품코드/품목/상품명/재고수량/리뷰수 리스트"""
        data_list = []
   
        for index, url in enumerate(url_list):
            
            item_html = self.soup(url)
            if item_html is None:
                continue  # soup got no response, so go to the next item.
            try:
                # Extract the text of the script tag -> with the split function
                ...
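
If you decide to wait and retry instead of skipping, a minimal sketch of a retrying soup() could look like the following (the max_attempts and delay parameters are arbitrary additions, not values from your code); requests.exceptions.RequestException covers ConnectTimeout and the connection errors that wrap urllib3's MaxRetryError:

    def soup(self, url_param, max_attempts=3, delay=5):
        """Return a BeautifulSoup object, or None if every attempt fails."""
        for attempt in range(1, max_attempts + 1):
            try:
                response = requests.get(url_param, timeout=10)
                if response.status_code == 200:
                    return BeautifulSoup(response.text, 'html.parser')
                print(f'attempt {attempt}: unexpected status code {response.status_code}')
            except requests.exceptions.RequestException as e:
                print(f'attempt {attempt}: {e}')
            time.sleep(delay)   # wait before the next attempt
        return None             # the caller should still check for None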
