競馬予測AIの作成⑲(順位予測処理の改修)

Python

はじめに

競馬予測システムにおける順位予測を行うプログラムを改修しました。
主な変更点は、保守性と可読性を向上させるために下記を実施しました。

  1. 関数ベースのプログラムからクラスベースに変更しました。
  2. 名前から用途が判るようにメソッド名と変数名を変更しました。
  3. コメントのスタイルをreStructuredTextスタイルに変更しました。
  4. 同じような処理はメソッドにまとめました。

参考にした書籍

bookfan 1号店 楽天市場店
¥3,190 (2024/09/07 23:02時点 | 楽天市場調べ)

プログラムの動作概要

競馬レースの情報を取得して予測モデルを使用してレースの順位を予測し、その結果をCSVファイルに保存するとともに、Twitterに上位3位の情報をツイートすることを目的としています。

プログラムの主な機能

プログラムの主な機能は以下の通りです。

No. 機能 説明
1 ロガー設定 ロガーの設定を行い、ログファイルを生成します。
2 レース情報の読み込み 指定されたCSVファイルからレース情報を読み込みます。
3 レース情報のエンコード レース情報をエンコードし、予測モデル用のデータ形式に変換します。
4 予測タイムの生成 予測モデルを使用して、各レースの予測タイムを生成します。
5 予測順位の追加 予測タイムに基づいて、各レースの予測順位を追加します。
6 ツイッターメッセージの保存 予測結果のトップ3をツイッターメッセージとして保存します。
7 CSVファイルへの出力 予測結果をCSVファイルに出力します。
8 天気と馬場状態の取得 指定されたURLから天気と馬場状態を取得します。
9 レース情報の保存 天気と馬場状態を追加したレース情報をCSVファイルとして保存します。
10 ツイート送信 ツイッターメッセージを読み込み、ツイートを送信します。
11 レース予測処理の実行 指定されたレース情報に対して予測処理を実行します。

プログラムで生成されるデータ

生成されるデータは以下の通りです。

No. データの種類 内容 ファイルパス例
1 ログファイル プログラムの実行ステータス、エラーメッセージ、処理時間など ./log/data/debug.log
2 レース予測結果のCSVファイル 各レースの予測順位と予測タイムを含むレース情報 ./data/predicted/predict_result_<location>_<timestamp>.csv
3 ツイートメッセージのテキストファイル 予測されたレースの上位3位の情報を含むツイート用メッセージ ./data/twitter_message/twitter_message_<location>.txt
4 レース情報のCSVファイル 天気と馬場状態を含むレース情報 ./data/races_info_<location_jp>.csv

プログラムのユーザインターフェイス

GUIのユーザインターフェイスは用意していません。

プログラムの構成

以下の主なモジュールから構成されます。

No. モジュール 概要
1 LoggerSetupクラス ログ設定を行うためのクラス
2 RacePredictionクラス レース予測を行うクラス
3 RacePredictionProcessingクラス レース予測処理全体を管理するクラス

プログラムのフローチャート

<補足>

  • 茶色:最初に実行される箇所
  • マルチプロセスで動作するので最初に実行される箇所は2箇所となります。
  • 緑色:他の関数やメソッドを呼び出さない
  • 灰色:通常の関数、メソッド
  • 関数名、メソッド名の前の数字はプログラム内の行数

プログラムの関数とメソッドの概要

No. クラス 関数/メソッド名 行数 概要
1 LoggerSetup setup_logger 36 ロガーを設定する。ログ設定ファイルのパスとログファイルのパスを指定。
2 RacePrediction init 53 RacePredictionクラスの初期化。モデルやエンコーダの読み込み、保存パスの設定などを行う。
3 load_race_info 88 CSVファイルからレース情報を読み込み、データフレームに変換する。
4 encode_race_info 99 レース情報をエンコードし、DMatrixに変換する。
5 predict_time_from_race_info 116 レース情報からタイムを予測し、予測結果をデータフレームに追加する。
6 labelencode_race_location_round 130 レース名、開催場所、ラウンドをラベルエンコーディングする。
7 predict_ranking 143 レースの順位を予測する。
8 set_path_for_twitter_message 155 開催場所ごとのツイッターメッセージ保存パスを設定する。
9 assign_prediction_ranks 168 タイムでソートして予測順位を追加する。
10 save_twitter_message 180 トップ3の予測結果をツイッターメッセージとして保存する。
11 predict_top3_ranks 194 順位予測を行い1~3位を抽出する。
12 remove_unnecessary_columns 232 不要なカラムの削除を行う。
13 sort_columns 245 カラムのソートを行う。
14 process_race_predictions 257 レース予測の処理を行う。
15 append_predicted_top3_ranks_to_csv 274 予測順位をCSVファイルに追記する。
16 run 290 レース予測処理を実行する。
17 RacePredictionProcessing init 308 RacePredictionProcessingクラスの初期化。
18 load_config 344 設定ファイルを読み込む。
19 load_today_cell_time_info 355 当日の出走時刻を読み込む。
20 preparing_raceinfo_data 367 レース情報のHTMLデータを取得し、パースする。
21 extract_list_from_df 390 データフレームから特定のパターンに一致するリストを抽出する。
22 extract_text 418 HTML要素からテキストを抽出する。
23 get_race_info 458 レース情報を取得する。
24 get_weather_condition_info 533 天気と馬場の情報を取得する。
25 set_weather_condition 579 天気と馬場の情報をレース情報のデータフレームに設定する。
26 reindex_df 601 データフレームの列を並び替える。
27 save_race_info 621 レース情報を保存する。
28 run_prediction_processing 643 レース予測処理を実行する。
29 tweet_predicted_rankings 678 予測した順位をツイートする。
30 tweet 699 ツイートを投稿する。
31 process_race_row 729 各レース行に対する処理を実行する。
32 run_prediction 763 レース予測処理を実行する。
33 main (エントリーポイント) main 処理 787 RacePredictionProcessingクラスのインスタンスを作成し、run_predictionメソッドを呼び出す。

プログラムの実行環境

本プログラムは、Pythonのバージョン3.12.2で実行できることを確認しています。

本プログラムを実行するために必要なパッケージをインストールします。
Pythonの仮想環境を起動し、pipコマンドで下記のパッケージをインストールしてください。

(venv)> pip install pandas
(venv)> pip install configparser
(venv)> pip install requests
(venv)> pip install tweepy
(venv)> pip install joblib
(venv)> pip install xgboost
(venv)> pip install beautifulsoup4
(venv)> pip install scikit-learn

プログラムの実行

本プログラムは競馬予測AIのmain.pyから呼び出されます。main.pyからの呼び出し方法は次回以降の記事で掲載する予定です。

プログラムの全コード

# 競馬レースの情報を取得し、予測モデルを使用してレースの順位を予測し、その結果をツイートする
# 呼び出し元:
#   main.py
# 実行前の設定:
#   configに必須情報を登録しておく
# 実行方法:
#   RacePredictionProcessingクラスのインスタンスを作成し、run_predictionメソッドを呼び出す
# 出力内容:
#   下記の3種類のファイルを出力する
#   1.ログファイル: プログラムの実行ステータス、エラーメッセージ、処理時間を記録
#   2.レース予測結果のCSVファイル:各レースの予測順位と予測タイムを含むレース情報を記録
#   3.ツイートメッセージ:ツイートするメッセージを保尊する。メッセージには予測されたレースの上位3位の情報が含まれる


import os
import datetime
import time
import pandas as pd
import configparser
import logging.config
import requests
import re
import urllib
import tweepy
import joblib
import xgboost as xgb
import itertools

from bs4 import BeautifulSoup
from datetime import datetime as dt

# ラベルエンコードのライブラリ
from sklearn.preprocessing import LabelEncoder


class LoggerSetup:
    @staticmethod
    def setup_logger(log_config_path, log_file_path):
        """
        loggerを設定する
        :param str log_config_path: ログ設定ファイルのパス
        :param str log_file_path: ログファイルのパス
        :return: 設定されたlogger
        :rtype: logging.Logger
        """
        os.makedirs(os.path.dirname(log_file_path), exist_ok=True)
        logging.config.fileConfig(log_config_path)

        return logging.getLogger(__name__)


class RacePrediction:
    def __init__(self, location, csv_file, config, logger):
        """
        RacePredictionクラスの初期化
        :param str location:  開催場所
        :param str csv_file:  レース情報のCSVファイルパス
        :param configparser.ConfigParser config: 設定情報
        :param logging.Logger logger:  logger
        """
        self.location = location
        self.csv_file = csv_file
        self.config = config
        self.logger = logger

        # モデルのパスを初期化
        self.model_dir = self.config['settings']['model_path']
        self.model_path = os.path.join(self.model_dir, 'gbm.pkl')
        self.enc_model_path = os.path.join(self.model_dir, 'oe_x.pkl')

        # 作成したモデルを読み込む
        self.ranking_model = joblib.load(self.model_path)

        # エンコーダモデルを読み込む
        self.encoder_model = joblib.load(self.enc_model_path)

        # ラベルエンコーダを初期化
        self.label_encoder = LabelEncoder()

        # 予測結果の保存パスを初期化
        self.predicted_path = self.config['settings']['predicted_path']
        os.makedirs(self.predicted_path, exist_ok=True)

        # ツイッターメッセージの保存パスを初期化
        self.twitter_message_dir = self.config['twitter']['message_path']
        os.makedirs(self.twitter_message_dir, exist_ok=True)

    def load_race_info(self):
        """
        レース情報をCSVファイルから読み込む
        :return: 読み込んだレース情報のデータフレーム
        :rtype: pd.DataFrame
        """
        df_race_info = pd.read_csv(self.csv_file, encoding='cp932')
        df_race_info['date'] = pd.to_datetime(df_race_info['date'], format='%Y年%m月%d日')

        return df_race_info

    def encode_race_info(self, df_race_info_add_predict):
        """
        レース情報をエンコードし、DMatrixに変換する
        :param pd.DataFrame df_race_info_add_predict: エンコード対象のレース情報のデータフレーム
        :return: DMatrixに変換されたレース情報
        :rtype: xgb.DMatrix
        """
        # 順位予測を行うためにレース情報をエンコードする
        encoded_race_info = self.encoder_model.fit_transform(df_race_info_add_predict)  # Dataframeからndarrayに変わるとカラムがなくなり、
        encoded_race_info = pd.DataFrame(encoded_race_info)  # 特徴量分析で特徴量名がf0,f1,f2,,となるのでndarrayからDataframeに戻す
        encoded_race_info.columns = list(df_race_info_add_predict.columns.values)  # カラムを付ける

        # データをdmatrixに変換する
        dmatrix_race_info = xgb.DMatrix(encoded_race_info)

        return dmatrix_race_info

    def predict_time_from_race_info(self, dmatrix_race_info, df_race_info_add_predict):
        """
        レース情報からタイムを予測する
        :param xgb.DMatrix dmatrix_race_info: DMatrixに変換したレース情報
        :param pd.DataFrame df_race_info_add_predict: 予測対象のレース情報のデータフレーム
        :return: 予測したタイムを追加したレース情報のデータフレーム
        :rtype: pd.DataFrame
        """
        # タイムを予測し、予測結果を予測対象レース情報に追加する
        predict_time = self.ranking_model.predict(dmatrix_race_info)
        df_race_info_add_predict['predict_time'] = predict_time

        return df_race_info_add_predict

    def labelencode_race_location_round(self, df):
        """
        レース名と開催場所、ラウンドのラベルエンコーディングを実施する
        :param pd.DataFrame df: ラベルエンコーディング対象のレース情報のデータフレーム
        :return: ラベルエンコードしたレース情報のデータフレーム
        :rtype: pd.DataFrame
        """
        df['race_name_encoded'] = self.label_encoder.fit_transform(df['race_name'].values)
        df['location_encoded'] = self.label_encoder.fit_transform(df['location'].values)
        df['round_encoded'] = self.label_encoder.fit_transform(df['round'].values)

        return df

    def predict_ranking(self, df_race_info):
        """
        レースの順位を予測する
        :param pd.DataFrame df_race_info: 予測対象のレース情報のデータフレーム
        :return: 予測結果を含むレース情報のデータフレーム
        :rtype: pd.DataFrame
        """
        dmatrix_race_info = self.encode_race_info(df_race_info)
        df_race_info = self.predict_time_from_race_info(dmatrix_race_info, df_race_info)

        return self.labelencode_race_location_round(df_race_info)

    def set_path_for_twitter_message(self, location):
        """
        開催場所ごとのツイッター用メッセージを保存するパスを設定する
        :param str location: 開催場所
        :return: 開催場所ごとのツイッター用メッセージのパス
        :rtype: str
        """
        twitter_message_path = os.path.join(self.twitter_message_dir, f'twitter_message_{location}.txt')
        if os.path.isfile(twitter_message_path):
            os.remove(twitter_message_path)

        return twitter_message_path

    def assign_prediction_ranks(self, df):
        """
        タイムでソートして予測順位を追加する
        :param pd.DataFrame df: 予測対象のレース情報のデータフレーム
        :return: 予測順位を追加したレース情報のデータフレーム
        :rtype: pd.DataFrame
        """
        df_sort_predict = df.sort_values('predict_time')
        df_sort_predict['predict_rank'] = range(1, len(df_sort_predict.index) + 1)

        return df_sort_predict

    def save_twitter_message(self, twitter_message_path, df_top3):
        """
        ツイッター用メッセージを保存する
        :param str twitter_message_path: 開催場所ごとのメッセージ保存テキストのパス
        :param pd.DataFrame df_top3: トップ3の予測結果を含むデータフレーム
        :return: 無し
        :rtype: None
        """
        with open(twitter_message_path, mode='a') as f:
            f.write('■' + str(df_top3.iloc[0, 15]) + ' ' + str(df_top3.iloc[0, 14]) + ' '
                    + str(df_top3.iloc[0, 16]) + '\n')
            for rank in range(3):
                f.write(f'{rank + 1}. {df_top3.iloc[rank, 0]}\n')

    def predict_top3_ranks(self, race_encoded_unique_list, df, twitter_message_path):
        """
        順位予測を行い1~3位を抽出する
        :param list race_encoded_unique_list: ユニークなエンコード済みレース名のリスト
        :param pd.DataFrame df: エンコード済みのレース情報のデータフレーム
        :param str twitter_message_path: ツイッター用メッセージの保存パス
        :return: トップ3の予測結果を含むデータフレーム
        :rtype: pd.DataFrame
        """
        # 予測順位を保存するデータフレームを用意する
        df_predict_eval = pd.DataFrame()

        # ユニークな開催場所、ラウンドを抽出する
        location_encoded_list = df['location_encoded'].unique()
        round_encoded_list = df['round_encoded'].unique()

        for i, j, k in itertools.product(race_encoded_unique_list, location_encoded_list, round_encoded_list):
            df_race = df[df['race_name_encoded'] == i]
            df_race_location = df_race[df_race['location_encoded'] == j]
            df_race_location_round = df_race_location[df_race_location['round_encoded'] == k]

            # 該当するデータの有無を確認し、該当データがなければ予測順位の追加処理を飛ばす
            if df_race_location_round.empty:
                continue

            # 予測順位を追加する
            df_race_location_round_assign_rank = self.assign_prediction_ranks(df_race_location_round)

            # 予測した1位,2位,3位を抽出する
            df_top3 = df_race_location_round_assign_rank[df_race_location_round_assign_rank['predict_rank'] < 4]
            df_predict_eval = pd.concat([df_predict_eval, df_top3])

            # 予測した1位,2位,3位をツイッター用メッセージとして保存する
            if twitter_message_path != 'no_message_needed':
                self.save_twitter_message(twitter_message_path, df_top3)

        return df_predict_eval

    def remove_unnecessary_columns(self, df_predict_eval):
        """
        不要なカラムの削除とソートを行う
        :param pd.DataFrame df_predict_eval: 予測順位のデータフレーム
        :return: 不要なカラムを削除した予測順位のデータフレーム
        :rtype: pd.DataFrame
        """
        # 列の並び替えと不要な情報を削除する
        unnecessary_columns = ['race_name_encoded', 'location_encoded', 'round_encoded', 'age', 'rider_weight',
                               'rider', 'horse_weight', 'weather', 'ground', 'condition']

        return df_predict_eval.drop(unnecessary_columns, axis=1)

    def sort_columns(self, df_predict_eval):
        """
        ソートを行う
        :param pd.DataFrame df_predict_eval: 不要なカラムを削除した予測順位のデータフレーム
        :return: ソートした予測順位のデータフレーム
        :rtype: pd.DataFrame
        """
        sorted_columns = ['predict_rank', 'predict_time', 'horse', 'odds', 'popular',
                          'distance', 'date', 'race_name', 'location', 'round']

        return df_predict_eval.reindex(columns=sorted_columns)

    def process_race_predictions(self, df_race_info):
        """
        レース予測の処理を行う
        :param pd.DataFrame df_race_info: レース情報のデータフレーム
        :return: 予測結果を含むデータフレーム
        :rtype: pd.DataFrame
        """
        df_race_info = self.predict_ranking(df_race_info).dropna()
        race_encoded_unique_list = df_race_info['race_name_encoded'].unique()
        self.logger.info(f'race_encoded_unique_list: {" ".join(map(str, race_encoded_unique_list))}')

        twitter_message_path = self.set_path_for_twitter_message(self.location)
        df_predict_eval = self.predict_top3_ranks(race_encoded_unique_list, df_race_info, twitter_message_path)
        df_predict_eval = self.remove_unnecessary_columns(df_predict_eval)

        return self.sort_columns(df_predict_eval)

    def append_predicted_top3_ranks_to_csv(self, df_predict_eval):
        """
        ラウンドごとの1~3位の予測順位をCSVファイルに追記する
        :param pd.DataFrame df_predict_eval: 予測結果のデータフレーム
        :return: 無し
        :rtype: None
        """
        # CSVのファイル名に実行日時を付与する
        timestamp_today_predict_result = datetime.datetime.now().strftime('%Y%m%d')

        # 1~3位の順位予測の結果をCSVに追記する
        today_raceresult_file = os.path.join(self.predicted_path,
                                             f'predict_result_{self.location}_{timestamp_today_predict_result}.csv')
        df_predict_eval[:3].to_csv(today_raceresult_file, mode='a', encoding='cp932',
                                   index=False, header=False, errors='ignore')

    def run(self):
        """
        レース予測処理を実行する
        :return: 無し
        :rtype: None
        """
        start_time = time.time()
        self.logger.info(f'start_time: {start_time}')

        df_race_info = self.load_race_info()
        df_race_info = self.process_race_predictions(df_race_info)
        self.append_predicted_top3_ranks_to_csv(df_race_info)

        run_time = time.time() - start_time
        self.logger.info(f'{os.path.basename(__file__)} {self.location} 実行時間: {run_time:.2f}秒')


class RacePredictionProcessing:
    def __init__(self, location):
        """
        RacePredictionProcessingクラスの初期化
        :param str location: 開催場所
        """
        self.config_ini = r'./config.ini'
        self.log_config_path = r'./log/logging.conf'
        self.log_file_path = r'./log/data/debug.log'
        self.location = location
        self.time_buffer = 1400

        # 設定の読み込み
        self.config = self.load_config()

        # twitterに投稿するメッセージを保存するディレクトリ
        self.twitter_message_dir = self.config['twitter']['message_path']

        # 開催場所の日本語名を取得する
        self.location_jp = dict(self.config['loc_romaji_jp'])[location]

        # loggerを設定する
        self.logger = LoggerSetup.setup_logger(self.log_config_path, self.log_file_path)

        # 処理で使用するCSVファイルのパス設定
        self.location_cell_time_info_csv = dict(self.config['location_cell_time_info_csv'])
        self.location_races_info_csv = dict(self.config['location_races_info_csv'])
        self.race_info_path = self.config['settings']['race_info_path']

        # 当日の出走時刻の読み込み
        self.today_cell_time_info = self.load_today_cell_time_info()

        # 天気と馬場状態の初期化
        self.weather = ''
        self.shiba_condition = ''
        self.dart_condition = ''

    def load_config(self):
        """
        設定を読み込む
        :return: 読み込んだ設定
        :rtype: configparser.ConfigParser
        """
        config = configparser.ConfigParser(interpolation=None)
        config.read(self.config_ini, 'UTF-8')

        return config

    def load_today_cell_time_info(self):
        """
        当日の出走時刻を読み込む
        :return: 読み込んだ出走時刻のデータフレーム
        :rtype: pd.DataFrame
        """
        cell_time_info = pd.read_csv(self.config['location_cell_time_info_csv'][self.location], encoding='CP932')
        cell_time_info['date_time'] = cell_time_info['race_date'].str.cat(cell_time_info['cell_time'], sep=' ')
        today = datetime.datetime.now().strftime('%Y年%#m月%#d日')

        return cell_time_info[cell_time_info['race_date'] == today]

    def preparing_raceinfo_data(self, url):
        """
        レース情報のHTMLデータを取得し、パースする
        指定されたURLからレース情報のHTMLデータを取得し、BeautifulSoupを使用してパースする
        また、HTML内の特定の要素(ブリンカー着用のクラスおよびタグ)を削除し、不要な改行を取り除く
        :param str url: レース情報のURL
        :return: パースされたHTMLデータ
        :rtype: BeautifulSoup
        """
        soup_response = requests.get(url)
        soup_response.encoding = soup_response.apparent_encoding
        soup = BeautifulSoup(soup_response.text, 'html5lib')

        # ブリンカー着用のクラスを削除する
        for tag in soup.find_all('span', class_='horse_icon blinker'):
            tag.decompose()

        # ブリンカー着用のタグを削除しても、不要な改行が残るので、改行を空文字で置換する
        for tag in soup.find_all('td', class_='num'):
            tag.string = tag.get_text(strip=True)

        return soup

    def extract_list_from_df(self, df, row_index, pattern, extract_method=None, default_value='', post_process=None):
        """
        データフレームから特定のパターンに一致するリストを抽出する
        指定されたデータフレームの特定の行から正規表現パターンに一致する要素を抽出し、リストとして返す
        オプションで、抽出方法や抽出後の処理、デフォルト値を指定することができる
        :param pd.DataFrame df: データフレーム
        :param int row_index: 行インデックス
        :param str pattern: 抽出パターン(省略可能)
        :param callable extract_method: 抽出方法(省略可能)
        :param str default_value: デフォルト値(省略可能)
        :param callable post_process: 後処理方法(省略可能)
        :return: 抽出したリスト
        :rtype: list
        """
        extracted_list = []
        for item in df.iloc[row_index]:
            item = item.replace(' ', '')
            match = re.search(pattern, item)
            if match:
                result = match.group(1) if extract_method is None else extract_method(match.group(1))
            else:
                result = default_value
            if post_process:
                result = post_process(result)
            extracted_list.append(result)

        return extracted_list

    def extract_text(self, soup, tag, attrs, extract_array_index=None, next_elem=None, split_text=None,
                     split_array_index=None, replace=None, regex=None):
        """
        指定されたHTML要素からテキストを抽出する
        オプションで、次の要素の属性値を抽出したり、テキストを分割して特定の部分を取り出したり、
        置換や正規表現による変換を行うことができる
        :param BeautifulSoup soup: パースされたHTMLデータ
        :param str tag: 抽出対象のタグ
        :param dict attrs: タグの属性
        :param int extract_array_index: 抽出する要素のインデックス(省略可能)
        :param str next_elem: 次の要素(省略可能)
        :param str split_text: 分割テキスト(省略可能)
        :param int split_array_index: 分割したテキストのインデックス(省略可能)
        :param tuple replace: 置換対象と置換後のテキスト(省略可能)
        :param tuple regex: 正規表現パターンと置換テキスト(省略可能)
        :return: 抽出されたテキスト
        :rtype: str
        """
        elements = soup.find_all(tag, attrs=attrs)
        if next_elem is not None:
            element = elements[extract_array_index].next_element.attrs[next_elem]
        else:
            element = elements[extract_array_index]

        if split_array_index is not None:
            element = element.text.split(split_text)[split_array_index]

        if replace is not None:
            old, new = replace
            element = element.replace(old, new)

        if regex is not None:
            pattern, repl = regex
            element = re.sub(pattern, repl, element.text)

        if not isinstance(element, str):
            element = element.text

        return element

    def get_race_info(self, soup):
        """
        レース情報を取得するメソッド
        :param BeautifulSoup soup: パースされたHTMLデータ
        :return: レース情報のデータフレーム
        :rtype: pd.DataFrame

        レース情報のHTMLデータから必要な情報を抽出し、データフレームに変換する
        抽出項目
        - 人気,馬名,オッズ,馬体重,父馬,母馬,年齢,騎手体重,騎手名,ラウンド,レース日,開催場所,レース名,距離,コース(芝・ダート)
        """
        self.logger.info('レース情報を取得する')
        race_info = pd.DataFrame()

        tbody = soup.find('tbody').find_all('tr')
        df = pd.DataFrame()

        for tb in tbody[0:]:
            tmp = tb.text.split('\n')
            contains_exclude = False
            for item in tmp[:19]:
                if '除外' in item or '取消' in item:
                    contains_exclude = True
                    break

            if contains_exclude:
                continue

            df_row = pd.Series(tmp)
            df = pd.concat([df, df_row], axis=1)

        popular_list = self.extract_list_from_df(df, 5, r'\((\d+)番人気\)', post_process=str)
        horse_list = self.extract_list_from_df(df, 5, r'([\u30A1-\u30FF]+)')
        odds_list = self.extract_list_from_df(df, 5, r'([\d.]+)\(', post_process=float)
        horse_weight_list = self.extract_list_from_df(df, 7, r'(\d+)kg', post_process=int, default_value=0)
        father_list = self.extract_list_from_df(df, 9, r':(.+)')
        mother_list = self.extract_list_from_df(df, 10, r':(.+)\(', default_value='不明')
        age_list = self.extract_list_from_df(df, 13, r'(.+)/', post_process=lambda x: x.replace('せん', 'セ'))
        rider_weight_list = self.extract_list_from_df(df, 15, r'(\d+\.?\d*)kg', post_process=str)
        rider_list = self.extract_list_from_df(df, 17, r'([^\s▲△☆◇★]+)', default_value='不明')

        round = self.extract_text(soup=soup, tag='div', attrs={'class': 'race_number'}, extract_array_index=0,
                                  next_elem='alt', replace=('レース', ' R'))

        race_date = self.extract_text(soup=soup, tag='div', attrs={'class': 'cell date'}, extract_array_index=0,
                                      split_text='(', split_array_index=0)
        race_location = self.extract_text(soup=soup, tag='div', attrs={'class': 'cell date'}, extract_array_index=0,
                                          split_text=' ', split_array_index=1, replace=('日', '日目'))

        race_name = self.extract_text(soup=soup, tag='span', attrs={'class': 'race_name'}, extract_array_index=0)
        race_distance = self.extract_text(soup=soup, tag='div', attrs={'class': 'cell course'}, extract_array_index=0,
                                          regex=(r'\D', ''))
        ground = self.extract_text(soup=soup, tag='div', attrs={'class': 'cell course'}, extract_array_index=0)
        ground = 'ダート' if 'ダート' in ground else '芝' if '芝' in ground else ground

        race_info = pd.DataFrame({
            'horse': horse_list,
            'father': father_list,
            'mother': mother_list,
            'age': age_list,
            'rider_weight': rider_weight_list,
            'rider': rider_list,
            'odds': odds_list,
            'popular': popular_list,
            'horse_weight': horse_weight_list,
            'distance': race_distance,
            'ground': ground,
            'date': race_date,
            'race_name': race_name,
            'location': race_location,
            'round': round
        })

        return race_info

    def get_weather_condition_info(self):
        self.logger.info('天気と馬場の情報を取得する')
        url = 'https://www.jra.go.jp/keiba/baba/'
        res = requests.get(url)
        res.encoding = res.apparent_encoding
        soup = BeautifulSoup(res.text, 'lxml')

        class_cell_txt = soup.find('div', attrs={'class': 'cell txt'})
        self.weather = class_cell_txt.text.replace('天候:', '')
        self.logger.info(f'天気:{self.weather}')

        location_list = []
        location_url = []
        class_nav_tab = soup.find_all('div', attrs={'class': 'nav tab'})
        tag_a = class_nav_tab[0].find_all('a')

        for i in tag_a:
            location_list.append(i.text)
            url = urllib.parse.urljoin(url, i.get('href'))
            location_url.append(url)

        location_list = [s.replace('競馬場', '') for s in location_list]
        index_num = location_list.index(self.location_jp)
        url = location_url[index_num]

        res = requests.get(url)
        res.encoding = res.apparent_encoding
        soup = BeautifulSoup(res.text, 'lxml')

        class_data_list_unit = soup.find_all('div', attrs={'class': 'data_list_unit'})
        for i in class_data_list_unit:
            tag_h4 = i.find_all('h4')

            if len(tag_h4) == 0:
                continue

            if tag_h4[0].text == '芝':
                tag_p = i.find_all('p')
                self.shiba_condition = tag_p[0].text
                self.logger.info(f'芝:{self.shiba_condition}')

            if tag_h4[0].text == 'ダート':
                tag_p = i.find_all('p')
                self.dart_condition = tag_p[0].text
                self.logger.info(f'ダート:{self.dart_condition}')

    def set_weather_condition(self, df):
        """
        天気と馬場の情報を設定する
        :param pd.DataFrame df: レース情報のデータフレーム
        :return: 天気と馬場の情報を設定したレース情報のデータフレーム
        :rtype: pd.DataFrame

        指定されたレース情報のデータフレームに天気と馬場の情報を追加する
        - `self.weather` の情報を天気列に設定する
        - 芝コースに関する情報を `self.shiba_condition` から取得し、該当するレコードの馬場状態に設定する
        - ダートコースに関する情報を `self.dart_condition` から取得し、該当するレコードの馬場状態に設定する
        """
        self.logger.info('天気と馬場の情報を設定する')
        df = df.reset_index(drop=True)
        df.loc[df['location'].str.contains(self.location_jp), 'weather'] = self.weather
        df.loc[(df['location'].str.contains(self.location_jp)) &
               (df['ground'].str.contains('芝')), 'condition'] = self.shiba_condition
        df.loc[(df['location'].str.contains(self.location_jp)) &
               (df['ground'].str.contains('ダート')), 'condition'] = self.dart_condition

        return df

    def reindex_df(self, df):
        """
        データフレームの列を並び替えるメソッド
        :param pd.DataFrame df: データフレーム
        :return: 列を並び替えたデータフレーム
        :rtype: pd.DataFrame

        指定されたデータフレームの列を特定の順序に並び替える
        以下の順序で列が並び替える
        - horse,father,mother,age,rider_weight,rider,odds,popular,horse_weight,
        distance,weather,ground,condition,date,race_name,location,round
        列の並び替えを行ったデータフレームを返す
        """
        self.logger.info('列を並び替える')
        df = df.reindex(columns=['horse', 'father', 'mother', 'age', 'rider_weight', 'rider', 'odds', 'popular',
                                 'horse_weight', 'distance', 'weather', 'ground', 'condition', 'date', 'race_name',
                                 'location', 'round'])

        return df

    def save_race_info(self, races_info):
        """
        レース情報を保存する
        :param pd.DataFrame races_info: レース情報のデータフレーム
        :return: 無し
        :rtype: None

        以下の手順でレース情報を保存する
        1. JRAの公式サイトから天気と馬場の情報を取得し、インスタンス変数に格納する
        2. 取得した天気と馬場の情報をレース情報のデータフレームに設定する
        3. データフレームの列を特定の順序に並び替える
        4. 保存先ディレクトリを作成し、開催場所に一致するレース情報をCSVファイルとして保存する
        """
        self.get_weather_condition_info()
        races_info = self.set_weather_condition(races_info)
        races_info = self.reindex_df(races_info)
        os.makedirs(self.race_info_path, exist_ok=True)
        races_info_loc = races_info.loc[races_info['location'].str.contains(self.location_jp)]
        if len(races_info_loc) > 0:
            csv_file = f'{self.race_info_path}races_info_{self.location_jp}.csv'
            races_info_loc.to_csv(csv_file, encoding='cp932', index=False, errors='ignore')

    def run_prediction_processing(self, race_info):
        """
        レース予測処理を実行する
        :param pd.Series race_info: レース情報のシリーズ
        :return: 無し
        :rtype: None

        以下の手順でレース予測処理を実行する:
        1. 指定されたレース情報のURLからHTMLデータを取得し、パースする
        2. パースされたHTMLデータからレース情報を抽出する
        3. 抽出したレース情報を保存する
        4. レース予測を行うために `RacePrediction` クラスを初期化し、予測処理を実行する
        5. 予測結果をツイートするためのメッセージを作成し、ツイートする
        """
        self.logger.info(f'{self.location_jp} {race_info["date_time"]} レースの情報を取得する')
        self.logger.info(f'取得先URL:{race_info["race_info_url"]}')

        soup = self.preparing_raceinfo_data(race_info['race_info_url'])
        races_info = pd.DataFrame()

        try:
            races_info = self.get_race_info(soup)
        except BaseException as err:
            self.logger.exception(f'Raise Exception:{err}')
            self.logger.info('予測に必要なデータが掲載されていません')

        self.save_race_info(races_info)

        race_prediction = RacePrediction(self.location, self.location_races_info_csv[self.location],
                                         self.config, self.logger)
        race_prediction.run()

        message_file = os.path.join(self.twitter_message_dir, f'twitter_message_{self.location}.txt')
        self.tweet_predicted_rankings(message_file)

    def tweet_predicted_rankings(self, message_file):
        """
        予測した順位をツイートする
        :param str message_file: ツイートメッセージが含まれるファイルのパス
        :return: 無し
        :rtype: None

        指定されたファイルからツイートメッセージを読み込み、ツイートする
        メッセージには予測したレースの上位3位の情報が含まれており、ハッシュタグとリンクを追加してツイートする
        ファイルが存在しない場合は何も行わない
        """
        if os.path.isfile(message_file):
            with open(message_file) as f:
                lines = f.readlines()

            lines.append('#競馬 #予測 #AI\n競馬AIの作成過程\n')
            lines.append('https://relaxing-living-life.com/')
            message = lines[0] + lines[1] + lines[2] + lines[3] + lines[-2] + lines[-1]
            self.logger.info(f'Xに投稿するメッセージ:{message}')
            self.tweet(message)

    def tweet(self, message):
        """
        ツイートを投稿する
        :param str message: 投稿するツイートメッセージ
        :return: 無し
        :rtype: None

        指定したメッセージをツイートする
        Tweepyを使用してTwitter APIにアクセスし、ツイートを投稿する
        ツイートの投稿に成功した場合、その旨をログに記録し、失敗した場合はエラーメッセージをログに記録する
        """
        self.logger.info('Xに投稿する')
        try:
            client = tweepy.Client(
                consumer_key=self.config['twitter']['API_KEY'],
                consumer_secret=self.config['twitter']['API_SECRET'],
                access_token=self.config['twitter']['ACCESS_TOKEN'],
                access_token_secret=self.config['twitter']['ACCESS_TOKEN_SECRET'],
            )

            response = client.create_tweet(text=message)
            if response:
                self.logger.info(f'Tweet posted successfully: {response}')
            else:
                self.logger.warning('Tweet posting failed without response')
        except tweepy.TweepyException as e:
            self.logger.error(f'Tweepy error occurred: {e}')
        except Exception as e:
            self.logger.error(f'Unexpected error occurred: {e}')

    def process_race_row(self, row):
        """
        各レース行に対する処理を実行する
        :param pd.Series row: レース情報のシリーズ
        :return: 無し
        :rtype: None

        以下の手順で各レース行に対する処理を実行する
        1. 現在時刻を取得する
        2. レースの出走時刻を取得する
        3. 現在時刻が出走時刻を過ぎている場合、ログに記録し処理を終了する
        4. 出走時刻までの時間差を計算し、設定されたバッファ時間を差し引いた後の秒数を取得する
        5. その秒数だけ待機し、待機後に `run_prediction_processing` メソッドを呼び出して予測処理を実行する
        6. もし待機時間が0以下の場合は、すぐに予測処理を実行する
        """
        self.logger.info('各レース行に対する処理。データの取得、レース情報の保存、順位予測、Xへの投稿、の処理を開始する')
        now = datetime.datetime.now()
        cell_date_time = dt.strptime(row['date_time'], '%Y年%m月%d日 %H:%M')

        if now > cell_date_time:
            self.logger.info('出走済み')
            return

        diff_time = cell_date_time - now
        seconds_to_run_prediction = diff_time.seconds - self.time_buffer

        if seconds_to_run_prediction > 0:
            for wait_time in range(seconds_to_run_prediction, 0, -1):
                print(f"\r\033[K#次の実行まで残り {wait_time} 秒 {self.location} ", end="")
                time.sleep(1)
            self.run_prediction_processing(row)
        else:
            self.run_prediction_processing(row)

    def run_prediction(self):
        """
        レース予測処理を実行する
        :return: 無し
        :rtype: None

        予測処理の起点になるメソッド
        当日の出走レースに対して予測処理を実行する
        以下の手順で処理を行う:
        1. 処理開始時刻をログに記録する
        2. 当日の各レース情報を順次処理する
        3. 各レース情報に対して `process_race_row` メソッドを呼び出し、予測処理を実行する
        4. 処理終了時刻をログに記録し、実行時間を計測してログに記録する
        """
        start_time = time.time()
        self.logger.info(f'start_time:{start_time}')

        for _, row in self.today_cell_time_info.iterrows():
            self.process_race_row(row)

        run_time = time.time() - start_time
        self.logger.info(f'{os.path.basename(__file__)}, {self.location}, 実行時間:{run_time:.2f}秒')


if __name__ == "__main__":
    # 検証用
    race_prediction_processing = RacePredictionProcessing('hanshin')
    race_prediction_processing.run_prediction()

以上です。

コメント

タイトルとURLをコピーしました