#!/usr/bin/env python
"""Download stats for all NHL players on a season's team rosters.

For every team returned by the NHL stats API, the roster of the requested
season is fetched; then, for every rostered player, the player info,
year-by-year stats and per-season game logs are downloaded.  Each result is
cached as a msgpack file under ``data/players/<player_id>/`` and re-used on
later runs instead of hitting the API again.
"""
import argparse
import asyncio
from pathlib import Path

import aiofiles
import aiofiles.os
import backoff
import httpx
import msgpack
import toolz
from tqdm.asyncio import tqdm

# Callable that turns an endpoint path (e.g. "/teams") into a full API URL.
api_url = "https://statsapi.web.nhl.com/api/v1{}".format


def fetch(url: str):
    """Build an async fetcher for the endpoint template *url*.

    The returned coroutine function takes an ``httpx.AsyncClient`` plus the
    keyword arguments needed to fill the ``{placeholders}`` in *url*, performs
    a GET request and returns the decoded JSON body.  Connection errors and
    connection timeouts are retried with exponential backoff for up to 30
    seconds.

    Raises:
        httpx.HTTPStatusError: if the server answers with a 4xx/5xx status.
    """

    @backoff.on_exception(
        backoff.expo,
        (httpx.ConnectTimeout, httpx.ConnectError),
        max_time=30,
    )
    async def _(c: httpx.AsyncClient, **kwargs):
        rsp = await c.get(api_url(url.format(**kwargs)))
        # Fail loudly on error responses; otherwise the error payload would be
        # parsed as data and end up in the on-disk cache.
        rsp.raise_for_status()
        return rsp.json()

    return _


fetch_teams = fetch("/teams")
fetch_roster = fetch("/teams/{team_id}/roster?season={season}")
fetch_player_info = fetch("/people/{player_id}")
fetch_player_stats = fetch("/people/{player_id}/stats?stats={stats_type}")


def player_datadir(player_id: int) -> Path:
    """Return the cache directory for *player_id*."""
    return Path(f"data/players/{player_id}")


def player_data(player_id: int, what: str) -> Path:
    """Return the path of the cached *what* file for *player_id*."""
    return player_datadir(player_id) / f"{what}.msgpack"


async def save_player_data(player_id: int, what: str, data):
    """Serialize *data* with msgpack into the player's *what* cache file."""
    target = player_data(player_id, what)
    # Create missing parents too (e.g. on the very first run) and tolerate a
    # directory that already exists.
    await aiofiles.os.makedirs(target.parent, exist_ok=True)
    async with aiofiles.open(target, "wb") as f:
        await f.write(msgpack.dumps(data))


async def load_player_data(player_id: int, what: str):
    """Read and decode the player's cached *what* file."""
    async with aiofiles.open(player_data(player_id, what), "rb") as f:
        return msgpack.unpackb(await f.read())


async def get_player_stats(c: httpx.AsyncClient, player_id: int, stats_type: str):
    """Return ``(player_id, splits)`` for *stats_type*, using the cache if present.

    On a cache miss the stats are fetched, the ``stats[0].splits`` entry of
    the response is extracted (``None`` if absent) and stored in the cache.
    """
    if player_data(player_id, stats_type).exists():
        stats = await load_player_data(player_id, stats_type)
        return player_id, stats

    rsp = await fetch_player_stats(c, player_id=player_id, stats_type=stats_type)
    stats = toolz.get_in(["stats", 0, "splits"], rsp)
    await save_player_data(player_id, stats_type, stats)
    return player_id, stats


def seasons_in_nhl(yoy_stats) -> list[str]:
    """Return the seasons of the year-by-year entries played in the NHL."""
    return [
        y["season"]
        for y in yoy_stats
        if y["league"]["name"] == "National Hockey League"
    ]


async def get_player_gamelog(c: httpx.AsyncClient, player_id: int, yoy_stats):
    """Return ``(player_id, game_log)`` covering all of the player's NHL seasons.

    The combined log is cached under ``gameLog``.  On a cache miss one request
    per NHL season found in *yoy_stats* is issued concurrently and the
    per-season splits are concatenated in season order.
    """
    if player_data(player_id, "gameLog").exists():
        total_game_log = await load_player_data(player_id, "gameLog")
        return player_id, total_game_log

    game_logs = await asyncio.gather(
        *[
            # The season filter is appended through the stats_type placeholder
            # of the endpoint template.
            fetch_player_stats(
                c, player_id=player_id, stats_type=f"gameLog&season={season}"
            )
            for season in seasons_in_nhl(yoy_stats)
        ]
    )
    total_game_log = [
        split
        for log in game_logs
        for split in toolz.get_in(["stats", 0, "splits"], log)
    ]
    await save_player_data(player_id, "gameLog", total_game_log)
    return player_id, total_game_log


async def get_player_info(c: httpx.AsyncClient, player_id: int):
    """Return ``(player_id, info)``, using the cache if present.

    On a cache miss the ``people[0]`` entry of the response is extracted
    (``None`` if absent) and stored in the cache under ``info``.
    """
    if player_data(player_id, "info").exists():
        info = await load_player_data(player_id, "info")
        # Same return shape as the freshly-fetched path below.
        return player_id, info

    rsp = await fetch_player_info(c, player_id=player_id)
    info = toolz.get_in(["people", 0], rsp)
    await save_player_data(player_id, "info", info)
    return player_id, info


async def main(season: int, force: bool = False):
    """Download and cache data for every player rostered in *season*.

    Args:
        season: Starting year of the season; the API season id is built as
            ``f"{season}{season + 1}"`` (e.g. 2021 -> "20212022").
        force: Accepted for interface compatibility; not currently consulted,
            so existing cache files are always re-used.
    """
    season_id = f"{season}{season + 1}"
    c = httpx.AsyncClient(
        limits=httpx.Limits(max_keepalive_connections=5, max_connections=10)
    )
    async with c:
        teams = await fetch_teams(c)
        teams = teams["teams"]

        # -- Team rosters
        futs = tqdm.as_completed(
            [fetch_roster(c, team_id=team["id"], season=season_id) for team in teams],
            desc="Rosters",
        )
        rosters = []
        for r in futs:
            r = await r
            rosters.append(r["roster"])
        player_ids = [p["person"]["id"] for p in toolz.concat(rosters)]

        # -- Player info
        futs = tqdm.as_completed(
            [get_player_info(c, player_id=pid) for pid in player_ids],
            desc="Player info",
        )
        for r in futs:
            await r

        # -- Year-on-year stats
        futs = tqdm.as_completed(
            [
                get_player_stats(c, player_id=pid, stats_type="yearByYear")
                for pid in player_ids
            ],
            desc="Year-on-Year stats",
        )
        yoy_stats = []
        for r in futs:
            yoy_stats.append(await r)

        # -- Game-by-game stats
        futs = tqdm.as_completed(
            [
                get_player_gamelog(c, player_id=pid, yoy_stats=stats)
                for pid, stats in yoy_stats
            ],
            desc="Game-by-Game stats",
        )
        for r in futs:
            await r


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Download stats for all NHL players")
    parser.add_argument("season", type=int, help="The season's roster to include")
    args = parser.parse_args()
    asyncio.run(main(args.season))