diff options
| -rw-r--r-- | main.py | 165 |
1 files changed, 98 insertions, 67 deletions
@@ -1,22 +1,35 @@ import argparse +import math import sys from io import StringIO -from textwrap import dedent +from typing import Any, Dict, List, Optional, Tuple +import numpy as np import pandas as pd import requests -from geopy.distance import geodesic from geopy.geocoders import Nominatim from geopy.location import Location +from tabulate import tabulate ENDPOINT = "https://www.fuel-finder.service.gov.uk/internal/v1.0.2/csv/get-latest-fuel-prices-csv" +SORT_KV = { + "e10": "e10_price", + "e5": "e5_price", + "b7s": "diesel_price", + "distance": "distance", +} + +HEADERS = { + "User-Agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_4; en-US) AppleWebKit/533.45 (KHTML, like Gecko) Chrome/48.0.2094.221 Safari/602" +} + def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser() parser.add_argument("-a", "--address", type=str, required=True) parser.add_argument("-r", "--radius", type=int, default=5) - parser.add_argument("-s", "--sort", type=str, default="e10") + parser.add_argument("-s", "--sort", type=str, default="e10", choices=SORT_KV.keys()) return parser.parse_args() @@ -24,84 +37,102 @@ def get_location(address: str) -> tuple[float, float]: geolocator = Nominatim(user_agent="FuelNearMe") result = geolocator.geocode(address) if not isinstance(result, Location): - print("[*] Failed to get location. Please check if the address is valid.") - sys.exit(1) + raise ValueError(f"Failed to get location from address: '{address}'") return (result.latitude, result.longitude) -def get_latest_data(): - response = requests.get(ENDPOINT) +def get_latest_data() -> tuple[pd.DataFrame, Optional[str]]: + response = requests.get(ENDPOINT, headers=HEADERS, timeout=10) + response.raise_for_status() return pd.read_csv(StringIO(response.text)), response.headers.get("Last-Modified") -def process_data(dframe): - price_cols = [c for c in dframe.columns if "fuel_price" in c] - dframe[price_cols] = dframe[price_cols].fillna(0.0) - return dframe.fillna("N/A") - - -def filter_df(dframe, arguments, loc): - near_stations = [] - for station, latitude, longitude, e5_price, e10_price, diesel_price in zip( - dframe["forecourts.trading_name"], - dframe["forecourts.location.latitude"], - dframe["forecourts.location.longitude"], - dframe["forecourts.fuel_price.E5"], - dframe["forecourts.fuel_price.E10"], - dframe["forecourts.fuel_price.B7S"], - ): - distance_from_current_location = geodesic((latitude, longitude), loc).miles - if distance_from_current_location < arguments.radius: - station_dict = { - "station_name": station, - "distance": round(distance_from_current_location, 1), - "e5_price": round(e5_price / 100, 2), - "e10_price": round(e10_price / 100, 2), - "diesel_price": round(diesel_price / 100, 2), - } - near_stations.append(station_dict) - return near_stations - - -def sort_list_of_stations(stations_list, arguments): - match arguments.sort: - case "e10": - sort_by = "e10_price" - return sorted(stations_list, key=lambda d: d[sort_by]) - case "e5": - sort_by = "e5_price" - return sorted(stations_list, key=lambda d: d[sort_by]) - case "b7s": - sort_by = "diesel_price" - return sorted(stations_list, key=lambda d: d[sort_by]) - case "distance": - sort_by = "distance" - return sorted(stations_list, key=lambda d: d[sort_by]) - - -def output_stations(stations): - for number, row in enumerate(stations): - output = dedent(f""" - {number + 1}. {row["station_name"]} - Distance: {row["distance"]} miles - E5 Price: £{row["e5_price"]:.2f}/L - E10 Price: £{row["e10_price"]:.2f}/L - B7S (Standard Diesel) Price: £{row["diesel_price"]:.2f}/L""") - print(output) +def filter_df( + dframe: pd.DataFrame, arguments: argparse.Namespace, loc: Tuple[float, float] +) -> List[Dict[str, Any]]: + + def bounding_box() -> pd.DataFrame: + lat, lon = loc + deg_lat = arguments.radius / 69.0 + deg_lon = arguments.radius / (69.0 * math.cos(math.radians(lat))) + return dframe[ + dframe["forecourts.location.latitude"].between(lat - deg_lat, lat + deg_lat) + & dframe["forecourts.location.longitude"].between( + lon - deg_lon, lon + deg_lon + ) + ] + + def haversine_miles(lat2: np.ndarray, lon2: np.ndarray) -> np.ndarray: + R = 3958.8 + lat1, lon1 = np.radians(loc[0]), np.radians(loc[1]) + lat2, lon2 = np.radians(lat2), np.radians(lon2) + dlat = lat2 - lat1 + dlon = lon2 - lon1 + a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2 + return R * 2 * np.arcsin(np.sqrt(a)) + + def pence_to_pounds(col: pd.Series) -> pd.Series: + return (col / 100).round(2).where(col.notna(), other="N/A") + + df = bounding_box().copy() + + df["distance"] = haversine_miles( + df["forecourts.location.latitude"].to_numpy(), + df["forecourts.location.longitude"].to_numpy(), + ).round(1) + + df = df[df["distance"] < arguments.radius] + + df = df.assign( + e5_price=pence_to_pounds(df["forecourts.fuel_price.E5"]), + e10_price=pence_to_pounds(df["forecourts.fuel_price.E10"]), + diesel_price=pence_to_pounds(df["forecourts.fuel_price.B7S"]), + ) + + return df.rename(columns={"forecourts.trading_name": "station_name"})[ + ["station_name", "distance", "e5_price", "e10_price", "diesel_price"] + ].to_dict(orient="records") + + +def sort_stations(stations: list[dict], sort: str) -> list[dict]: + sort_key = SORT_KV[sort] + return sorted(stations, key=lambda d: d[sort_key] if d[sort_key] != "N/A" else 999) + + +def output_stations(stations: List[Dict[str, Any]]) -> None: + if not stations: + print("[*] No stations found.") + return + print( + tabulate( + stations, + headers={ + "station_name": "Station Name", + "distance": "Distance (miles)", + "e5_price": "E5 (£/L)", + "e10_price": "E10 (£/L)", + "diesel_price": "B7S (£/L)", + }, + floatfmt=".2f", + ) + ) def main(): args = parse_args() - location = get_location(args.address) - df, last_modified = get_latest_data() - print(f"Last modified: {last_modified}") + try: + location = get_location(args.address) + except ValueError as e: + print(f"[*] {e}") + sys.exit(1) + df, last_modified = get_latest_data() - df_processed = process_data(df) + print(f"Last updated: {last_modified}") - df_filtered = filter_df(df_processed, args, location) + df_filtered = filter_df(df, args, location) - sorted_stations_list = sort_list_of_stations(df_filtered, args) + sorted_stations_list = sort_stations(df_filtered, args.sort) output_stations(sorted_stations_list) |
