競馬予測AIの作成⑯(レース結果取得プログラムのバージョンアップ)

Python

はじめに

以前作成したレース結果を取得するプログラムを改修しました。
主な改修のポイントは下記となります。

  1. 可読性と保守性を向上させるために下記を適用
    – 関数ベースからクラスベースに変更
    – 1つの関数、メソッドで実施する処理を簡素化
    – 関数、メソッドのコメントをreStructuredTextスタイルに変更
  2. スクレイピング時のマルチプロセスの実行エラーの回避
    chromedriverはマルチプロセスに対応していないので、排他制御を追加

参考にした書籍

¥3,190 (2024/01/23 07:45時点 | Yahooショッピング調べ)

プログラムの目的

本プログラムは、競馬のレース結果をnetkeiba.comからスクレイピングし、その情報整理して保存するためのプログラムです。本プログラムでは、指定した年の範囲でレース結果を収集します。

プログラムの動作概要

  1. 指定した年の範囲に基づいてスクレイピングを開始します。
  2. 指定した年のレース検索ページにアクセスします。
  3. レース詳細ページへのリンクを収集し、それぞれのリンクからレース結果を取得します。
  4. 取得したレース結果はDataFrameに整形し、CSVファイルに保存します。
  5. すべての指定された年に対してこのプロセスを繰り返します。

プログラムの主な機能

機能概要
レース結果のスクレイピングnetkeiba.com からレース結果の詳細情報をスクレイピングします。
データの整形と保存スクレイピングしたレース結果のデータを整形し、CSVファイルとして保存します。
動的Webページの処理JavaScriptによって動的に生成されるコンテンツを処理するためにSeleniumを使用します。
マルチプロセス対応複数のレース情報を同時に取得するためにマルチプロセスを使用します。

プログラムで取得する情報

本プロジェクトで取得するデータは下記となります。

カラム名
A列タイム01:27.6(分:秒.マイクロ秒)
B列出馬名ピリカメノコ
C列出馬の父親名マジェスティックウォリアー
D列出馬の母親名Dream Supreme
E列性齢牝2
F列騎手体重54
G列騎手名坂口裕一
H列オッズ1.7
I列人気順位1
J列馬体重454(-4)
K列距離1300
L列天候 曇
M列馬場ダート
N列馬場状態 重
O列レース日2023年12月31日
P列レース名2歳C2一組
Q列開催場所9回水沢5日目
R列レース番号1 R

<取得したデータの一部>

Pythonと主なライブラリのバージョン

ライブラリバージョン補足
Python3.12.1本プログラムの作成言語
BeautifulSoup4.12.3HTMLの解析とスクレイピングに使用
Selenium4.16.0動的コンテンツの処理に使用
Pandas2.2.0データの整形と分析に使用
multiprocessing標準ライブラリマルチプロセスの実装に使用

ユーザーインターフェース

本プログラムはコマンドラインベースであり、GUIは提供していません。
コマンドラインを通じて本プログラムを実行します。

プログラムの構成

以下の主なモジュールから構成されます。

モジュール概要
データ収集モジュールSeleniumとBeautifulSoupを使用して、指定されたWebページからデータを取得します。
データ処理モジュール取得したデータをPandasを使用して整形し、分析や記録に適した形式に変換します。
データ保存モジュール整形したデータをCSVファイルとして保存します。
マルチプロセス管理モジュールPythonのmultiprocessingを使用して、複数のプロセスを管理し、データ収集を並列化します。

プログラムのフローチャート

<補足>

  • 茶色:最初に実行される箇所
    マルチプロセスで動作するので最初に実行される箇所は2箇所となります。
  • 緑色:他の関数やメソッドを呼び出さない
  • 灰色:通常の関数、メソッド
  • 関数名、メソッド名の前の数字はプログラム内の行数

プログラムの関数とメソッドの概要

関数、メソッド行数概要
create_year_range(start_year, end_year, current_year):25指定された開始年と終了年の間の年のリストを生成します。開始年と終了年が現在の年より未来でないことを確認し、それに応じて年のリストを返します。
initialize_browser():55Selenium用のChromeブラウザインスタンスを初期化し、返します。オプション設定やWebDriverのインストールなどを行います。
save_race_results(race_results, race_results_year, race_results_month):85取得したレース結果(DataFrame形式)をCSVファイルとして保存します。ファイル名には年と月が含まれます。
wait_for_random_time(min, max):102指定された最小時間と最大時間の間でランダムな時間だけ待機します。
scroll_webpage(browser, pixel):112ChromeブラウザインスタンスでWebページを縦にスクロールします。スクロール量はピクセル単位で指定します。
init(self):124RaceResultsScraperクラスのコンストラクタです。初期設定として現在の日付や年、処理開始時間などの属性を設定します。
create_month_range(self, year_to_generate_month_range):140指定された年に対して、その年の月の範囲を生成します。現在の年によって、生成する月の範囲が異なります。
generate_year_month_pairs(self, year_to_generate_year_month_pairs):165指定された年について、年と月のペアのリストを生成します。これはスクレイピングするレースの年月を指定するのに使用します。
collect_race_links(self, browser):186レース詳細のWebページへのリンクを集めます。ブラウザインスタンスを使用してページをナビゲートし、リンクを収集します。
fetch_race_results(self, link_list, year, month):214指定されたリンクのリストを使用して、各レースの詳細結果を取得します。これらの結果はDataFrameに結合されて返します。
scrape_race_details(self, year, month):239指定された年と月のレース詳細情報をスクレイピングし、CSVファイルとして保存するメインの関数です。
search_race(self, browser, year, month):270指定された年と月に基づいてレースを検索します。この関数は検索条件を設定し、検索を実行します。
set_race_conditions(self, browser):287netkeiba.comでのレース検索条件(競争種別)を設定します。
set_search_period(self, browser, year, month):300netkeiba.comでのレース検索条件(期間)を設定します。
set_display_options(self, browser, num):327netkeiba.comでのレース検索の表示オプションを設定します。
submit_search(self, browser):339netkeiba.comでのレース検索を実行します。
make_race_url_list(self, browser, link_list):349取得したHTMLからレース結果のURLを抽出し、URLリストを作成します。
get_page_html(self, browser):377WebページのHTMLを取得します。
get_absolute_url(self, relative_url):386相対URLから絶対URLを生成します。
is_race_link(self, url):397URLがレース結果のリンクであるかどうかを判断します。
click_next_btn(self, browser, count_click_next):416「次へ」ボタンをクリックし、ページを進めます。
get_race_page_html(self, url):453指定されたURLのWebページからHTMLを取得します。
prepare_dataframe_to_save_race_results(self, soup):471スクレイピングしたHTMLからレース結果をデータフレームに整形します。
format_race_results_table(self, race_results_extracted_columns, soup):521スクレイピングしたHTMLから取得したレース結果データを整形し、保存用のデータフレームを作成します。
get_detail_race_results(self, url):563指定されたURLからレースの詳細結果を取得し、整形されたデータフレームを返します。
get_horse_parents_name(self, soup):585出走馬の血統情報(父馬と母馬の名前)を取得します。
get_horse_links(self, race_table_data):606レースの結果テーブルから出走馬の詳細Webページへのリンクを取得します。
get_blood_table(self, link):622指定されたリンクから馬の血統情報を含むHTMLテーブルを取得します。
get_horse_father_name(self, soup_blood_table):639BeautifulSoupオブジェクトから馬の父親の名前を取得します。
get_horse_mother_name(self, soup_blood_table):653BeautifulSoupオブジェクトから馬の母親の名前を取得します。
retry_request(self, url, max_retries=3, retry_delay=1):667指定されたURLに対してリクエストを行い、必要に応じてリトライを実行します。
get_race_name(self, soup):687指定されたBeautifulSoupオブジェクトからレース名を取得します。
get_race_round(self, soup):699指定されたBeautifulSoupオブジェクトからレースのラウンド数を取得します。
get_race_date(self, soup):713指定されたBeautifulSoupオブジェクトからレースの開催日を取得します。
get_race_place(self, soup):728指定されたBeautifulSoupオブジェクトからレースの開催場所を取得します。
get_race_distance(self, soup):742指定されたBeautifulSoupオブジェクトからレースの距離を取得します。
get_race_weather(self, soup):759指定されたBeautifulSoupオブジェクトからレースの開催時の天候を取得します。
get_race_condition(self, soup):774指定されたBeautifulSoupオブジェクトからレースの開催時の馬場状態を取得します。
combine_race_result(self, year, end_month, race_results_path):789指定された年と月の範囲におけるレース結果のCSVファイルを結合します。

プログラムの実行環境の作成

プログラムを実行するための環境を構築します。

chromeのインストール

chromeを用いてスクレイピングを実施します。プログラムを実行する環境にchromeの最新バージョンをインストールしてください。

Python3.12のインストール

Microsoft StoreからPython3.12を入手します。

  1. [スタート]→[Microsoft Store]を選択します。
  2. [Microsoft Store]の上部にある検索バーで[python]と入力して検索します。
  3. 「Python3.12」の「入手」をクリックします。

Pythonの仮想環境の作成

  1. Cドライブの直下に[keiba_ai]のフォルダを作成します。
  2. [keiba_ai]フォルダ内に本ブログの[get_raceResults.py]をコピーします。
  3. [スタート]→[Windows システム ツール]→[コマンド プロンプト]をクリックします。
  4. [コマンド プロンプト]で下記のコマンドを実行して[keiba_ai]フォルダに移動します。

    > cd C:\keiba_ai


  5. 下記のコマンドを実行してPythonの実行環境を構築します。

    > python -m venv venv


  6. 下記のコマンドを実行してPythonの実行環境を起動します。

    > venv\Scripts\activate.bat


  7. 下記のコマンドを実行してPythonの実行環境のパッケージのインストール状況を確認します。

    (venv)> pip list


    pipのパッケージのバージョンが古いと、下記のように[notice]が表示されることあります。



    Package    Version
    ---------- -------
    pip        22.2.2
    setuptools 63.2.0

    [notice] A new release of pip available: 22.2.2 -> 23.3.2
    [notice] To update, run: python.exe -m pip install --upgrade pip


  8. [notice]が表示されたら下記のコマンドを実行します。

    (venv)> python.exe -m pip install --upgrade pip


  9. 下記のコマンドを実行してPythonの実行環境にパッケージをインストールします。

    pip install pandas==2.2.0
    pip install bs4==0.0.2
    pip install lxml==5.1.0
    pip install requests==2.31.0
    pip install filelock==3.13.1
    pip install selenium==4.16.0
    pip install webdriver-manager==4.0.1
    pip install tqdm==4.66.1


以上でプログラムの実行環境の構築は完了です。

プログラムの実行

実行環境を[C:\keiba_ai\]とした場合の手順となります。
本プログラムはマルチプロセスで動作します。1年分の情報を取得するのに、プロセスを4つに設定した場合はおおよそ12時間ほど掛かります。

同時に実行するプロセス数と取得する期間の設定方法は後述します。

  1. [スタート]→[Windows システム ツール]→[コマンド プロンプト]をクリックします。
  2. 下記のコマンドを実行してプログラムを実行します。

    > cd C:\keiba_ai
    > venv\Scripts\activate.bat
    (venv)> python get_raceResults.py


  3. 実行するとchromeが自動起動してきます。
    また、コマンドプロンプトに下記のメッセージが表示されます。

    2023年1月の情報取得
    2023年4月の情報取得
    2023年2月の情報取得
    2023年3月の情報取得
    次のWebページ無し
    次のWebページ無し
    年: 2023 月: 3 のレース結果の詳細を取得します
    0%| | 0/1445 [00:00<?, ?it/s] 次のWebページ無し
    年: 2023 月: 1 のレース結果の詳細を取得します


  4. 情報取得が完了するとコマンドプロンプトに下記のメッセージが表示されます。

    2023 年 10 月のレース結果のCSVファイルを作成します。
    100%|███████████████████████████████████████████████| 1482/1482 [2:59:29<00:00, 7.27s/it]
    2023 年 11 月のレース結果のCSVファイルを作成します。
    100%|███████████████████████████████████████████████| 1477/1477 [3:00:51<00:00, 7.35s/it]
    2023 年 12 月のレース結果のCSVファイルを作成します。
    実行時間:38671.72秒


  5. [C:\keiba_ai\data\raceresults\]配下に取得した情報を保存したCSVファイルが作成されます。

同時に実行するプロセス数の設定

同時に実行するプロセス数は「class RaceResultsScraper」の「__init__」で設定しています。
下記の値を変更することでマルチプロセスの数を調整できます。

        # マルチプロセスのプール数を設定する
        self.pool_size = 4

マルチプロセスの数は、3あるいは4を設定すると安定してプログラムを実行できます。
5以上に設定すると、プログラムを実行する環境によっては、レース結果の取得に失敗が頻発することがあります。

取得する期間の設定

同時に実行するプロセス数は「class RaceResultsScraper」の「__init__」で設定しています。
下記の値を変更することで取得する期間を調整できます。

        # レース結果取得の始まりと終わりの年を設定する
        self.start_year = 2022
        self.end_year = 2024

プログラムの全コード

# ライブラリの読み込み
import pandas as pd
import urllib.parse
import requests
import re
import time
import os
import random
import datetime
from filelock import FileLock, Timeout

from multiprocessing import Pool
from selenium.webdriver.common.by import By
from selenium.webdriver.support.select import Select
from bs4 import BeautifulSoup

from selenium.webdriver.chrome.service import Service as ChromeService
from selenium import webdriver
from webdriver_manager.chrome import ChromeDriverManager

# プログレスバーを表示するためのライブラリを読み込む
from tqdm import tqdm


def create_year_range(start_year, end_year, current_year):
    """
    指定された開始年と終了年の範囲で年のリストを生成する。
    :param int start_year: 範囲の開始年。
    :param int end_year: 範囲の終了年。
    :param int current_year: 現在の年。
    :return: 開始年から終了年までの年のリスト。
    :rtype: list[int]
    """
    # 入力値の型チェック
    if not isinstance(start_year, int) or not isinstance(end_year, int):
        raise TypeError("start_yearとend_yearはint型である必要があります。")

    # start_yearがend_yearより大きい場合のエラーチェック
    if start_year > end_year:
        raise ValueError("start_yearはend_yearより大きくてはいけません。")

    # start_yearが現在の年より未来である場合のエラーチェック
    if start_year > current_year:
        raise ValueError("start_yearは現在の年より未来であってはいけません。")

    # end_yearが現在の年より未来である場合のエラーチェック
    if end_year > current_year:
        raise ValueError("end_yearは現在の年より未来であってはいけません。")

    # 年のリストを作成
    make_year_list = list(range(start_year, end_year + 1))
    return make_year_list


def initialize_browser():
    """
    Chromeブラウザを初期化し、browserにインスタンスを設定する。
    ヘッドレスモードでの起動、不要なログ出力を抑制するオプションを追加する。
    :return: browserインスタンス
    :rtype: webdriver.Chrome
    """

    # ロックファイルのパス
    lock_file_path = './data/raceresults/chromedriver_install.lock'

    # FileLockオブジェクトを作成
    lock = FileLock(lock_file_path, timeout=120)    # 120秒のタイムアウトを設定

    try:
        # ロックを取得(ロックファイルが存在しない場合は作成、存在する場合は解放されるのを待つ)
        with lock.acquire():
            # ロックを取得できたら、Chromedriverをインストールする
            chrome_options = webdriver.ChromeOptions()
            # chrome_options.add_argument('--headless')
            chrome_options.add_experimental_option('excludeSwitches', ['enable-logging'])
            browser = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)

            return browser
    except Timeout:
        # ロック取得のタイムアウトの場合
        print('ロックの取得に失敗しました。')
        return None


def save_race_results(race_results, race_results_year, race_results_month):
    """
    取得したレース結果をCSVファイルとして保存する。
    引数として与えられたレース結果データフレーム(race_result)を、指定された年(race_results_year)と月(race_results_month)を
    ファイル名に含めてCSVファイルとして保存する。保存先のディレクトリが存在しない場合は、ディレクトリを新規に作成する。
    :param pd.DataFrame race_results: 保存するレース結果のデータフレーム。
    :param int race_results_year: レース結果の年。
    :param str race_results_month: レース結果の月。
    :return: 無し。CSVファイルとしての保存を行う。
    """
    print(race_results_year, '年', race_results_month, '月のレース結果のCSVファイルを作成します。')
    os.makedirs('./data/raceresults', exist_ok=True)
    file_name = f'{race_results_year}_{race_results_month}_raceresults.csv'
    file_path = f'./data/raceresults/{file_name}'
    race_results.to_csv(file_path, encoding='cp932', header=False, index=False, errors="ignore")


def wait_for_random_time(min, max):
    """
    minとmaxの範囲内でランダムな時間だけ待機する。
    :param int min: 最小待機時間(秒)
    :param int max: 最大待機時間(秒)
    :return: 無し。
    """
    time.sleep(random.uniform(min, max))


def scroll_webpage(browser, pixel):
    """
    Webページを指定されたピクセル分だけ縦方向にスクロールする。
    :param webdriver.Chrome browser: スクロール対象のブラウザオブジェクト。
    :param int pixel: スクロールするピクセル数(正の整数でWebページ下部へ、負の整数でWebページ上部へスクロール)
    :return: 無し。
    """
    # 画面を下にスクロールする
    browser.execute_script(f'window.scrollTo(0, {pixel});')


class RaceResultsScraper:
    def __init__(self):
        # 現在の日付、月、年を取得
        self.current_date = datetime.date.today()
        self.current_year = self.current_date.year
        self.current_month = self.current_date.month

        # マルチプロセスのプール数を設定する
        self.pool_size = 4

        # プログラムの開始時刻を取得する
        self.start_time = time.time()

        # レース結果取得の始まりと終わりの年を設定する
        self.start_year = 2022
        self.end_year = 2024

    def create_month_range(self, year_to_generate_month_range):
        """
        指定された年における月の範囲を生成する。
        指定された年が現在の年より未来であったり、現在の年で1月の場合には処理を行わない。
        :param int year_to_generate_month_range: 月の範囲を生成する年。
        :return: 月のリスト, 終了月。
                現在の年で1月の場合は、戻り値は「None,None」を返す。
                現在の年と同じ年の場合は、戻り値は「現在の月までのリスト」と「その前の月」となる。
                現在の年以外の場合は、戻り値は「1月から12月までのリスト」と「12月」となる。
        :rtype: tuple
        """
        if year_to_generate_month_range == self.current_year and self.current_month == 1:
            # 現在の年で1月の場合は、戻り値は「None,None」を返す
            return None, None
        elif year_to_generate_month_range == self.current_year and self.current_month > 1:
            # 現在の年と同じ年の場合は、戻り値は「現在の月までのリスト」と「その前の月」となる。
            month_list = range(1, self.current_month)
            end_month = self.current_month - 1
        else:
            # 現在の年以外の場合は、戻り値は「1月から12月までのリスト」と「12月」となる
            month_list = range(1, 13)
            end_month = 12

        return month_list, end_month

    def generate_year_month_pairs(self, year_to_generate_year_month_pairs):
        """
        指定された年に対して、その年の月のペアを生成する。
        指定された年に対して対応する月のリストを生成し、その年の月のペアのリストを生成する。
        現在の年で1月の場合は、その年に対する月のペアを生成しない。
        :param int year_to_generate_year_month_pairs: 年と月のペアを生成する年。
        :return: 指定された年とそれに対応する月のペアのリスト。各ペアは(year, month)の形式で、monthは文字列。
        :rtype: list[tuple]
        """
        # 月のリストを作成する
        month_list, end_month = self.create_month_range(year_to_generate_year_month_pairs)

        # month_listがNoneの場合、処理をスキップする
        if month_list is None:
            return None

        # 年と月のペアを作成する
        year_month_pairs = [(year_to_generate_year_month_pairs, str(month)) for month in month_list]

        return year_month_pairs, end_month

    def collect_race_links(self, browser):
        """
        レース詳細Webページのリンクを集める。
        :param webdriver.Chrome browser: レース結果を集めるためのブラウザオブジェクト。
        :return: レース詳細Webページへのリンクが含まれるリスト
        :rtype: list
        """
        link_list = []  # リンクWebページのURLリスト
        no_click_next = 0  # 0:次のWebページ無し、1:次のWebページ有り
        count_click_next = 0  # 0:検索結果の1ページ目、1以上:検索結果の2ページ目以上

        # count = 0
        while no_click_next == 0:
            # count += 1
            # if count > 1:
            #     browser.quit()
            #     break
            # レース結果のリンクWebページのURLを取得する
            link_list = self.make_race_url_list(browser, link_list)

            # 「次」をクリックし、「次」の有無とクリックした回数を返す
            no_click_next, count_click_next = self.click_next_btn(browser, count_click_next)

        # chromeを閉じる
        browser.quit()

        return link_list

    def fetch_race_results(self, link_list, year, month):
        """
        与えられたURLのリスト(link_list)を使用して、各レースの詳細結果を取得する。
        取得した結果はDataFrameに結合され、最終的にこの結合されたDataFrameが戻り値として返される。
        ただし、日本国外のレース(URLにアルファベットが含まれているもの)は無視される。
        :param list link_list: レース結果の詳細WebページへのURLを含むリスト。
        :param int year: 取得対象の年。
        :param str month: 取得対象の月。
        :return: 取得したレース結果のDataFrame。各レースの詳細情報を含む。
        :rtype: pd.DataFrame
        """
        # レース結果を保存するデータフレームを用意する
        race_results = pd.DataFrame()

        print('年:', year, '月:', month, 'のレース結果の詳細を取得します')
        for url in tqdm(link_list):
            if url.split('/')[-2].isdecimal():
                detail_race_results = self.get_detail_race_results(url)
                if isinstance(detail_race_results, str):
                    print('年:', year, '月:', month, 'のレース情報の取得に失敗しました:', url)
                else:
                    race_results = pd.concat([race_results, detail_race_results], axis=0)

        return race_results

    def scrape_race_details(self, year, month):
        """
        指定された年と月のレース詳細情報をスクレイピングする。
        netkeiba.comから指定された年と月のレース結果の詳細情報を取得し、CSVファイルとして保存する。
        レース結果のリンクWebページにアクセスし、日本のレース結果のみを取得する。
        :param int year: スクレイピングするレースの年。
        :param str month: スクレイピングするレースの月
        :return: None 処理を正常に完了するとNoneを返す。
        """
        # ランダム待ち
        # wait_for_random_time(1, 10)

        # chromeを起動する
        browser = initialize_browser()

        # 競馬データベースを開く
        browser.get('https://db.netkeiba.com/?pid=race_search_detail')
        browser.implicitly_wait(10)  # 指定した要素が見つかるまでの待ち時間を10秒と設定する

        print(f'{year}年{month}月の情報取得')
        self.search_race(browser, year, month)  # 検索条件を設定して検索する

        # レース詳細Webページのリンクを集める
        link_list = self.collect_race_links(browser)

        # レース結果のリンクWebページにアクセスして、レース結果を取得する
        race_results = self.fetch_race_results(link_list, year, month)

        # CSVにレース結果を保存する
        save_race_results(race_results, year, month)

    def search_race(self, browser, year, month):
        """
        指定された年と月に基づいてレースの検索を行う。
        netkeiba.comのレース検索Webページにアクセスし、与えられた年(year)と月(month)に該当するレース情報を検索する。
        検索条件を設定し、検索結果のWebページへ遷移する処理を担当する。
        :param webdriver.Chrome browser: 検索を行うためのブラウザオブジェクト。
        :param int year: 検索するレースの年。
        :param str month: 検索するレースの月。
        :return: 無し。
        """
        wait_for_random_time(2, 5)
        self.set_race_conditions(browser)
        self.set_search_period(browser, str(year), str(month))
        scroll_webpage(browser, 400)
        self.set_display_options(browser, str(100))
        self.submit_search(browser)

    def set_race_conditions(self, browser):
        """
        netkeiba.comのレース検索条件の競争種別で「芝」と「ダート」にチェックを入れる。
        :param webdriver.Chrome browser: 検索を行うためのブラウザオブジェクト。
        :return: 無し。
        """
        # 競争種別で「芝」と「ダート」にチェックを入れる
        elem_check_track_1 = browser.find_element(By.ID, value='check_track_1')
        elem_check_track_2 = browser.find_element(By.ID, value='check_track_2')

        elem_check_track_1.click()
        elem_check_track_2.click()

    def set_search_period(self, browser, year, month):
        """
        netkeiba.comのレース検索条件の期間で「年」と「月」にyearとmonthで指定した値を入れる。
        :param webdriver.Chrome browser: 検索を行うためのブラウザオブジェクト。
        :param str year: 検索するレースの年。
        :param str month: 検索するレースの月。
        :return: 無し。
        """
        # 期間を設定する
        elem_start_year = browser.find_element(By.NAME, value='start_year')
        elem_start_year_select = Select(elem_start_year)
        elem_start_year_select.select_by_value(year)

        # 月を指定する場合は、select_by_valueで月数を指定する
        elem_start_month = browser.find_element(By.NAME, value='start_mon')
        elem_start_month_select = Select(elem_start_month)
        elem_start_month_select.select_by_value(month)

        elem_end_year = browser.find_element(By.NAME, value='end_year')
        elem_end_year_select = Select(elem_end_year)
        elem_end_year_select.select_by_value(year)

        # 月を指定する場合は、select_by_valueで月数を指定する
        elem_end_month = browser.find_element(By.NAME, value='end_mon')
        elem_end_month_select = Select(elem_end_month)
        elem_end_month_select.select_by_value(month)

    def set_display_options(self, browser, num):
        """
        netkeiba.comのレース検索条件の表示件数にnumで指定した値を入れる。
        指定できる値は、20、50、100。
        :param webdriver.Chrome browser: 検索を行うためのブラウザオブジェクト。
        :param str num: 表示する項目の数。
        :return: 無し。
        """
        elem_list = browser.find_element(By.NAME, value='list')
        elem_list_select = Select(elem_list)
        elem_list_select.select_by_value(num)

    def submit_search(self, browser):
        """
        netkeiba.comのレース検索条件で検索をクリックする。
        :param webdriver.Chrome browser: 検索を行うためのブラウザオブジェクト。
        :return: 無し。
        """
        elem_search = browser.find_element(By.CLASS_NAME, value='search_detail_submit')
        wait_for_random_time(2, 5)
        elem_search.submit()

    def make_race_url_list(self, browser, link_list):
        """
        取得したHTMLからレース結果のURLを抽出し、URLリストを作成する。
        :param webdriver.Chrome browser: HTMLを取得するためのブラウザオブジェクト。
        :param list link_list: 収集したレース結果のリンクのURLリスト
        :return: 更新されたリンクのURLリスト
        :rtype: list
        """
        html_content = self.get_page_html(browser)
        soup = BeautifulSoup(html_content, 'html.parser')

        try:
            table_data = soup.find(class_='nk_tb_common')
            if not table_data:
                raise ValueError("テーブルデータが見つかりません")

            for link in table_data.find_all('a'):
                href = link.get('href')
                if self.is_race_link(href):
                    absolute_url = self.get_absolute_url(href)
                    link_list.append(absolute_url)

            return link_list

        except Exception as e:
            print(f"エラーが発生しました: {e}")
            return []

    def get_page_html(self, browser):
        """
        WebページのHTMLを取得する。
        :param webdriver.Chrome browser: HTMLを取得するためのブラウザオブジェクト。
        :return: 取得したページのHTMLコンテンツ
        :rtype: str
        """
        return browser.page_source.encode('utf-8')

    def get_absolute_url(self, relative_url):
        """
        相対URLから絶対URLを生成する。
        スクレイピング中に取得される相対URL(例: '/race/202010010811/')を
        完全なURL形式(例: 'https://db.netkeiba.com/race/202010010811/')に変換するために使用する。
        :param str relative_url: 変換する相対URL。
        :return: 生成された絶対URL。
        :rtype: str
        """
        return urllib.parse.urljoin('https://db.netkeiba.com', relative_url)

    def is_race_link(self, url):
        """
        与えられたURLがレース結果のリンクであるかどうかを判断する。
        スクレイピング中に取得されるURLがレース詳細のWebページに関連するものか、
        それとも他のWebページ(例: 馬や騎手のプロフィールWebページ)に関連するものかを識別するために使用される。

        URLに特定のキーワード(例:「horse」や「jockey」など)が含まれていないことを確認し、
        さらにURLがJavaScriptリンク(javascriptを含む)でないことも確認する。
        このチェックを通過したURLは、レース結果に関連するリンクとして扱われる。
        :param str url: 判定するURL。
        :return: URLがレース結果のリンクであればTrue、そうでなければFalse
        :rtype: bool
        """
        if 'javascript' in url:
            return False

        excluded_words = ['horse', 'jockey', 'result', 'sum', 'list', 'movie']
        return not any(word in url for word in excluded_words)

    def click_next_btn(self, browser, count_click_next):
        """
        画面下にスクロールして「次」をクリックする。
        :param webdriver.Chrome browser: 「次」をクリックするためのブラウザオブジェクト。
        :param int count_click_next: 「次へ」をクリックした回数をカウントする。
        :return: 次のWebページが存在しない場合は1、存在する場合は0と、次のWebページに遷移した後の「次へ」ボタンのクリック回数をtupleで返す。
        :rtype: tuple
        """
        # 画面を下にスクロールする
        wait_for_random_time(1, 3)
        scroll_webpage(browser, 2000)
        wait_for_random_time(1, 3)

        # 次をクリックする
        # 検索1Webページ目のxpathは2Webページ以降とは異なるため、count_click_nextで検索1Webページ目なのかを判定している
        if count_click_next == 0:
            xpath = '//*[@id="contents_liquid"]/div[2]/ul[1]/li[14]/a/span/span'
            elem_search = browser.find_element(By.XPATH, value=xpath)
            elem_search.click()
            no_click_next = 0
        else:
            # 検索最後のWebページで次をクリックしようとすると例外処理が発生する
            # exceptで例外処理を取得し、no_click_nextに1を代入する
            try:
                xpath = '//*[@id="contents_liquid"]/div[2]/ul[1]/li[14]/a/span/span'
                elem_search = browser.find_element(By.XPATH, value=xpath)
                wait_for_random_time(2, 5)
                elem_search.click()
                no_click_next = 0
            except:
                print('次のWebページ無し')
                no_click_next = 1

        count_click_next += 1  # Webページ数を判別するためのフラグに1を加算する

        return no_click_next, count_click_next

    def get_race_page_html(self, url):
        """
        指定されたURLのWebページからHTMLを取得する。
        :param str url: HTMLを取得するWebページのURL。
        :return: WebページのHTML。
        :rtype: BeautifulSoup or None
        """
        # 指定したURLからデータを取得する
        res = self.retry_request(url, max_retries=5, retry_delay=2)
        if not res:
            print("リクエスト失敗:", url)
            return None

        res.encoding = res.apparent_encoding  # resに含まれるテキストから文字コードを自動設定する
        soup = BeautifulSoup(res.text, 'lxml')  # content形式で取得したデータをhtml形式で分割する

        return soup

    def prepare_dataframe_to_save_race_results(self, soup):
        """
        スクレイピングしたHTMLからレース結果をデータフレームに整形する。
        :param BeautifulSoup soup: レース結果のWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レース結果が整形されたデータフレーム。
        :rtype: pd.DataFrame
        """
        # tableデータを抽出する
        tables = soup.find('table', attrs={'class': 'race_table_01'})
        tables = tables.find_all('tr')

        # レース情報/結果を取得する
        race_results = pd.DataFrame()

        for table in tables[1:]:
            race_results_row = table.text.split('\n')
            race_results_row = pd.Series(race_results_row)
            race_results = pd.concat([race_results, race_results_row], axis=1)

        # 学習に必要な情報のみを抽出する
        # 着順:要素No.1、馬名:要素No.5、性齢:要素No.7、斤量:要素No.8、騎手:要素No.10、
        # タイム:要素No.12、上り:要素No.21、単勝:要素No.23、人気:要素No.24、馬体重:要素No.25の情報を抽出する
        race_results_extracted_columns = pd.DataFrame()

        # レース結果のテーブルが異なることがあるので、必要なカラム列の番号をそれぞれで用意する
        columns_list1 = [1, 5, 7, 8, 10, 12, 21, 23, 24, 25]
        columns_list2 = [1, 5, 7, 8, 10, 12, 18, 20, 21, 22]

        # テーブルの状態次第で必要なカラム列の番号を変える
        # 17列は「通過」の情報が記録されているが、テーブルによって'**'となっている場合となっていない場合の2種類のテーブルが存在している
        # 17列が'**'となっている場合は、カラム列はcolumns_list1を使用する
        speed_index = race_results.iloc[17]
        if speed_index.values[0] == '**':
            for i in columns_list1:
                extracted_columns = race_results.iloc[i]
                race_results_extracted_columns = pd.concat([race_results_extracted_columns,
                                                            extracted_columns], axis=1)
        else:
            for i in columns_list2:
                extracted_columns = race_results.iloc[i]
                race_results_extracted_columns = pd.concat([race_results_extracted_columns,
                                                            extracted_columns], axis=1)

        # カラム名を設定する
        columns_list = ['着順', '馬名', '性齢', '斤量', '騎手', 'タイム', '上り', '単勝', '人気', '馬体重']
        columns = pd.Series(columns_list)
        race_results_extracted_columns.columns = columns  # index名を設定する

        return race_results_extracted_columns

    def format_race_results_table(self, race_results_extracted_columns, soup):
        """
        スクレイピングしたHTMLから取得したレース結果データを整形し、保存用のデータフレームを作成する。
        抽出されたレース結果データ(pandas.DataFrame)と、レース情報が含まれるBeautifulSoupオブジェクトを受け取り、
        必要な追加情報(距離、天候、馬場状態など)をデータフレームに組み込んで最終的な形式を整える。
        :param pd.DataFrame race_results_extracted_columns: 抽出されたレース結果のデータフレーム
        :param BeautifulSoup soup: レースWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト
        :return: 整形されたレース結果のデータフレーム
        :rtype: pd.DataFrame
        """
        # 整形したレース情報を保存するデータフレームを用意する
        formatted_race_results = pd.DataFrame()

        try:
            male_horse_pedigree_list, female_horse_pedigree_list = self.get_horse_parents_name(soup)
            # 距離、天候、馬場、状態、開催日、レース名、開催場所、ラウンド、父親、母親の列を追加する
            race_results_extracted_columns['距離'] = self.get_race_distance(soup)
            race_results_extracted_columns['天候'] = self.get_race_weather(soup)[1]
            race_results_extracted_columns['馬場'] = self.get_race_condition(soup)[0]
            race_results_extracted_columns['状態'] = self.get_race_condition(soup)[1]
            race_results_extracted_columns['開催日'] = self.get_race_date(soup)
            race_results_extracted_columns['レース名'] = self.get_race_name(soup)
            race_results_extracted_columns['開催場所'] = self.get_race_place(soup)
            race_results_extracted_columns['ラウンド'] = self.get_race_round(soup)
            race_results_extracted_columns['父親'] = male_horse_pedigree_list
            race_results_extracted_columns['母親'] = female_horse_pedigree_list

            # 説明変数として使用しない列を削除する
            race_results_extracted_columns.drop(['着順', '上り'], axis=1, inplace=True)

            # 目的変数にするタイムを1列に変更する
            formatted_race_results = race_results_extracted_columns.reindex(columns=['タイム', '馬名', '父親', '母親',
                                                                                     '性齢', '斤量', '騎手', '単勝',
                                                                                     '人気', '馬体重', '距離', '天候',
                                                                                     '馬場', '状態', '開催日',
                                                                                     'レース名',
                                                                                     '開催場所', 'ラウンド'])
        except:
            print('レース情報を取得できませんでした')

        return formatted_race_results

    def get_detail_race_results(self, url):
        """
        指定されたURLからレースの詳細結果を取得し、整形されたデータフレームを返す。
        レース詳細のWebページのURLを受け取り、そのWebページからレース結果のデータをスクレイピングする。
        スクレイピングされたデータはまず `prepare_dataframe_to_save_race_results`によって整形され、
        その後 `format_race_results_table`でさらに詳細な情報が追加される。
        最終的に整形されたデータフレームが戻り値として返される。
        :param str url: レースの詳細WebページのURL。
        :return: 整形されたレース結果のデータフレーム。
        :rtype: pd.DataFrame
        """
        # 指定したurlのWebページからHTMLを取得する
        soup = self.get_race_page_html(url)

        # スクレイピングしたHTMLからレース結果をデータフレームに整形する
        race_results_extracted_columns = self.prepare_dataframe_to_save_race_results(soup)

        # スクレイピングしたHTMLから取得したレース結果データを整形し、保存用のデータフレームを作成する。
        formatted_race_results_table = self.format_race_results_table(race_results_extracted_columns, soup)

        return formatted_race_results_table

    def get_horse_parents_name(self, soup):
        """
        出走馬の血統情報(父馬と母馬の名前)を取得する。
        BeautifulSoupオブジェクトとして渡されたHTMLから、出走馬の血統情報を抽出するために使用する。
        各出走馬の詳細ページへのリンクをたどり、それぞれの馬の父馬と母馬の名前を取得してリストに格納する。
        :param BeautifulSoup soup:レース情報のHTMLを解析したBeautifulSoupオブジェクト
        :return: 各出走馬の父馬と母馬の名前のリストを含むタプル((父馬のリスト, 母馬のリスト))
        :rtype: tuple
        """
        race_table_data = soup.find(class_='race_table_01 nk_tb_common')  # 検索結果のテーブルを取得する
        male_horse_pedigree_list = []  # 各出走馬の父親の名前を保持するリスト
        female_horse_pedigree_list = []  # 各出走馬の母親の名前を保持するリスト

        link_url = self.get_horse_links(race_table_data)
        for link in link_url:
            soup_blood_table = self.get_blood_table(link)
            male_horse_pedigree_list.append(self.get_horse_father_name(soup_blood_table))
            female_horse_pedigree_list.append(self.get_horse_mother_name(soup_blood_table))

        return male_horse_pedigree_list, female_horse_pedigree_list

    def get_horse_links(self, race_table_data):
        """
        レースの結果テーブルから出走馬の詳細Webページへのリンクを取得する。
        :param BeautifulSoup race_table_data: レース結果テーブルのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: 出走馬の詳細Webページへのリンクのリスト。
        :rtype: list
        """
        link_url = []
        href_list = race_table_data.find_all('a')
        for href_link in href_list:
            if '/horse/' in href_link.get('href'):
                # 出走馬情報のリンクWebページを作成する
                link_url.append(self.get_absolute_url(href_link.get('href')))

        return link_url

    def get_blood_table(self, link):
        """
        指定されたリンクから馬の血統情報を含むHTMLテーブルを取得する。
        :param str link: 馬の詳細ページへのURLリンク。
        :return: 馬の血統情報を含むHTMLテーブルのBeautifulSoupオブジェクト。
        :rtype: BeautifulSoup or None
        """
        res = self.retry_request(link)
        if res is not None:
            res.encoding = 'EUC-JP'
            soup_blood_table = BeautifulSoup(res.text, 'lxml')
        else:
            print('リクエスト失敗:', link)
            return None

        return soup_blood_table

    def get_horse_father_name(self, soup_blood_table):
        """
        BeautifulSoupオブジェクトから馬の父親の名前を取得する。
        :param BeautifulSoup soup_blood_table: 血統情報を含むHTMLテーブルのBeautifulSoupオブジェクト。
        :return: 馬の父親の名前。
        :rtype: str or None
        """
        elements_b_ml = soup_blood_table.find_all(class_='b_ml')
        if len(elements_b_ml) == 0:
            print('父親の名前の取得に失敗しました。')
            return None

        return elements_b_ml[0].text.replace('\n', '')

    def get_horse_mother_name(self, soup_blood_table):
        """
        BeautifulSoupオブジェクトから馬の母親の名前を取得する。
        :param BeautifulSoup soup_blood_table: 血統情報を含むHTMLテーブルのBeautifulSoupオブジェクト
        :return: 馬の母親の名前。
        :rtype: str or None
        """
        elements_b_fml = soup_blood_table.find_all(class_='b_fml')
        if len(elements_b_fml) == 0:
            print('父親の名前の取得に失敗しました。')
            return None

        return elements_b_fml[0].text.replace('\n', '')

    def retry_request(self, url, max_retries=3, retry_delay=1):
        """
        指定されたURLに対してリクエストを行い、必要に応じてリトライを実行する。
        :param str url: リクエストを送信するURL。
        :param int max_retries: リクエストの最大リトライ回数。デフォルトは3回。
        :param int retry_delay: リトライの間隔(秒)。デフォルトは1秒。
        :return: 成功した場合はリクエストのレスポンス、失敗した場合はNoneを返す。
        :rtype: requests.Response or None
        """
        retries = 0
        while retries < max_retries:
            try:
                res = requests.get(url)
                return res
            except requests.exceptions.RequestException:
                # リクエストエラーが発生した場合はリトライする
                retries += 1
                time.sleep(retry_delay)
        return None

    def get_race_name(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレース名を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レース名を文字列で返す。レース名が見つからない場合は空文字を返す。
        :rtype: str
        """
        race_name = soup.find_all('h1')
        race_name = race_name[1].text

        return race_name

    def get_race_round(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレースのラウンド数を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レースのラウンド数を文字列で返す。ラウンド数が見つからない場合は空文字を返す。
        :rtype: str
        """
        race_round = soup.find('dl', attrs={'class': 'racedata fc'})
        race_round = race_round.find_all('dt')
        race_round = race_round[0].text
        race_round = race_round.replace('\n', '')

        return race_round

    def get_race_date(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレースの開催日を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レースの開催日を文字列で返す。開催日が見つからない場合は空文字を返す。
        :rtype: str
        """
        # 開催日を取得する
        race_base_info = soup.find('p', attrs={'class': 'smalltxt'})
        rase_base_info_text = race_base_info.text.replace(u'\xa0', u' ')
        words = rase_base_info_text.split(' ')
        race_date = words[0]

        return race_date

    def get_race_place(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレースの開催場所を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レースの開催場所を文字列で返す。開催場所が見つからない場合は空文字を返す。
        :rtype: str
        """
        race_base_info = soup.find('p', attrs={'class': 'smalltxt'})
        rase_base_info_text = race_base_info.text.replace(u'\xa0', u' ')
        words = rase_base_info_text.split(' ')
        race_place = words[1]

        return race_place

    def get_race_distance(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレースの距離を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レースの距離を整数値で返す。距離が見つからない場合はNoneを返す。
        :rtype: int or None
        """
        race_info = soup.find('diary_snap_cut')
        race_info_text = race_info.text.replace(u'\xa0', u' ')  # ノーブレークスペースを削除
        race_info_text = race_info.text.replace(u'\xa5', u' ')  # Webページの文字参照を削除
        words = race_info_text.split('/')

        words[0] = re.sub(r'2周', '', words[0])  # レース情報の'2周'を空文字に置き換える
        race_distance = int(re.sub(r'\D', '', words[0]))  # レース距離だけを取り出してint型で保存する

        return race_distance

    def get_race_weather(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレースの開催時の天候を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レースの開催時の天候を文字列で返す。天候が見つからない場合は空文字を返す。
        :rtype: str
        """
        race_info = soup.find('diary_snap_cut')
        race_info_text = race_info.text.replace(u'\xa0', u' ')  # ノーブレークスペースを削除
        race_info_text = race_info.text.replace(u'\xa5', u' ')  # Webページの文字参照を削除
        words = race_info_text.split('/')
        race_weather = words[1].split(':')

        return race_weather

    def get_race_condition(self, soup):
        """
        指定されたBeautifulSoupオブジェクトからレースの開催時の馬場状態を取得する。
        :param BeautifulSoup soup: レース情報が含まれているWebページのHTMLコンテンツを解析したBeautifulSoupオブジェクト。
        :return: レースの開催時の馬場状態を文字列で返す。馬場状態が見つからない場合は空文字を返す。
        :rtype: str
        """
        race_info = soup.find('diary_snap_cut')
        race_info_text = race_info.text.replace(u'\xa0', u' ')  # ノーブレークスペースを削除
        race_info_text = race_info.text.replace(u'\xa5', u' ')  # Webページの文字参照を削除
        words = race_info_text.split('/')
        race_condition = words[2].split(':')

        return race_condition

    def combine_race_result(self, year, end_month, race_results_path):
        """
        指定された年と月の範囲におけるレース結果のCSVファイルを結合する。
        :param int year: 結合するレース結果の年。
        :param int end_month: 結合するレース結果の終了月。
        :param str race_results_path: レース結果のCSVファイルが保存されているパス。
        :return: 結合されたレース結果のデータフレーム。
        :rtype: pandas.DataFrame
        """
        # 取得したレース結果を結合するためのデータフレーム
        combined_data = pd.DataFrame()
        for i in range(end_month, 0, -1):
            race_results_csv_path = f"{race_results_path}{year}_{i}_raceresults.csv"
            is_csv_file = os.path.isfile(race_results_csv_path)
            if is_csv_file:
                # df_read_csv = pd.read_csv(race_results_csv_path, header=None, encoding='shift-jis')
                df_read_csv = pd.read_csv(race_results_csv_path, header=None, encoding='cp932')
                combined_data = pd.concat([combined_data, df_read_csv], ignore_index=True)

            # 結合したCSVファイルは削除する
            os.remove(race_results_csv_path)

        return combined_data


if __name__ == "__main__":
    # RaceResultsScraperクラスのインスタンスを作成する
    scraper = RaceResultsScraper()

    # 指定された開始年と終了年に基づいて年のリストを生成する
    year_list = create_year_range(scraper.start_year, scraper.end_year, scraper.current_year)

    # 各年の各月のレース結果を取得する
    for year in year_list:
        year_month_pairs_result = scraper.generate_year_month_pairs(year)

        # 戻り値がNoneでないことを確認
        if year_month_pairs_result is None:
            print(f"年月のペアを生成できませんでした: {year}")
            break
        else:
            year_month_pairs, end_month = year_month_pairs_result

        with Pool(scraper.pool_size) as pool:
            pool.starmap(scraper.scrape_race_details, year_month_pairs)

        # 取得したレース結果のパス
        race_results_path = './data/raceresults/'

        # 取得したレース結果を年ごとに結合する
        combined_data = scraper.combine_race_result(year, end_month, race_results_path)

        # 結合したデータをCSVファイルに保存する
        csv_combined_data_path = race_results_path + str(year) + '_1_' + str(year) + '_12_raceresults.csv'
        combined_data.to_csv(csv_combined_data_path, encoding='cp932', header=False, index=False, errors='ignore')

        # プログラムの実行時間を表示する
        run_time = time.time() - scraper.start_time
        print('実行時間:{:.2f}秒'.format(run_time))

コメント

タイトルとURLをコピーしました