aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--main.py165
1 files changed, 98 insertions, 67 deletions
diff --git a/main.py b/main.py
index 8d6357a..16b0300 100644
--- a/main.py
+++ b/main.py
@@ -1,22 +1,35 @@
import argparse
+import math
import sys
from io import StringIO
-from textwrap import dedent
+from typing import Any, Dict, List, Optional, Tuple
+import numpy as np
import pandas as pd
import requests
-from geopy.distance import geodesic
from geopy.geocoders import Nominatim
from geopy.location import Location
+from tabulate import tabulate
ENDPOINT = "https://www.fuel-finder.service.gov.uk/internal/v1.0.2/csv/get-latest-fuel-prices-csv"
+SORT_KV = {
+ "e10": "e10_price",
+ "e5": "e5_price",
+ "b7s": "diesel_price",
+ "distance": "distance",
+}
+
+HEADERS = {
+ "User-Agent": "Mozilla/5.0 (Macintosh; U; Intel Mac OS X 10_6_4; en-US) AppleWebKit/533.45 (KHTML, like Gecko) Chrome/48.0.2094.221 Safari/602"
+}
+
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("-a", "--address", type=str, required=True)
parser.add_argument("-r", "--radius", type=int, default=5)
- parser.add_argument("-s", "--sort", type=str, default="e10")
+ parser.add_argument("-s", "--sort", type=str, default="e10", choices=SORT_KV.keys())
return parser.parse_args()
@@ -24,84 +37,102 @@ def get_location(address: str) -> tuple[float, float]:
geolocator = Nominatim(user_agent="FuelNearMe")
result = geolocator.geocode(address)
if not isinstance(result, Location):
- print("[*] Failed to get location. Please check if the address is valid.")
- sys.exit(1)
+ raise ValueError(f"Failed to get location from address: '{address}'")
return (result.latitude, result.longitude)
-def get_latest_data():
- response = requests.get(ENDPOINT)
+def get_latest_data() -> tuple[pd.DataFrame, Optional[str]]:
+ response = requests.get(ENDPOINT, headers=HEADERS, timeout=10)
+ response.raise_for_status()
return pd.read_csv(StringIO(response.text)), response.headers.get("Last-Modified")
-def process_data(dframe):
- price_cols = [c for c in dframe.columns if "fuel_price" in c]
- dframe[price_cols] = dframe[price_cols].fillna(0.0)
- return dframe.fillna("N/A")
-
-
-def filter_df(dframe, arguments, loc):
- near_stations = []
- for station, latitude, longitude, e5_price, e10_price, diesel_price in zip(
- dframe["forecourts.trading_name"],
- dframe["forecourts.location.latitude"],
- dframe["forecourts.location.longitude"],
- dframe["forecourts.fuel_price.E5"],
- dframe["forecourts.fuel_price.E10"],
- dframe["forecourts.fuel_price.B7S"],
- ):
- distance_from_current_location = geodesic((latitude, longitude), loc).miles
- if distance_from_current_location < arguments.radius:
- station_dict = {
- "station_name": station,
- "distance": round(distance_from_current_location, 1),
- "e5_price": round(e5_price / 100, 2),
- "e10_price": round(e10_price / 100, 2),
- "diesel_price": round(diesel_price / 100, 2),
- }
- near_stations.append(station_dict)
- return near_stations
-
-
-def sort_list_of_stations(stations_list, arguments):
- match arguments.sort:
- case "e10":
- sort_by = "e10_price"
- return sorted(stations_list, key=lambda d: d[sort_by])
- case "e5":
- sort_by = "e5_price"
- return sorted(stations_list, key=lambda d: d[sort_by])
- case "b7s":
- sort_by = "diesel_price"
- return sorted(stations_list, key=lambda d: d[sort_by])
- case "distance":
- sort_by = "distance"
- return sorted(stations_list, key=lambda d: d[sort_by])
-
-
-def output_stations(stations):
- for number, row in enumerate(stations):
- output = dedent(f"""
- {number + 1}. {row["station_name"]}
- Distance: {row["distance"]} miles
- E5 Price: £{row["e5_price"]:.2f}/L
- E10 Price: £{row["e10_price"]:.2f}/L
- B7S (Standard Diesel) Price: £{row["diesel_price"]:.2f}/L""")
- print(output)
+def filter_df(
+ dframe: pd.DataFrame, arguments: argparse.Namespace, loc: Tuple[float, float]
+) -> List[Dict[str, Any]]:
+
+ def bounding_box() -> pd.DataFrame:
+ lat, lon = loc
+ deg_lat = arguments.radius / 69.0
+ deg_lon = arguments.radius / (69.0 * math.cos(math.radians(lat)))
+ return dframe[
+ dframe["forecourts.location.latitude"].between(lat - deg_lat, lat + deg_lat)
+ & dframe["forecourts.location.longitude"].between(
+ lon - deg_lon, lon + deg_lon
+ )
+ ]
+
+ def haversine_miles(lat2: np.ndarray, lon2: np.ndarray) -> np.ndarray:
+ R = 3958.8
+ lat1, lon1 = np.radians(loc[0]), np.radians(loc[1])
+ lat2, lon2 = np.radians(lat2), np.radians(lon2)
+ dlat = lat2 - lat1
+ dlon = lon2 - lon1
+ a = np.sin(dlat / 2) ** 2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon / 2) ** 2
+ return R * 2 * np.arcsin(np.sqrt(a))
+
+ def pence_to_pounds(col: pd.Series) -> pd.Series:
+ return (col / 100).round(2).where(col.notna(), other="N/A")
+
+ df = bounding_box().copy()
+
+ df["distance"] = haversine_miles(
+ df["forecourts.location.latitude"].to_numpy(),
+ df["forecourts.location.longitude"].to_numpy(),
+ ).round(1)
+
+ df = df[df["distance"] < arguments.radius]
+
+ df = df.assign(
+ e5_price=pence_to_pounds(df["forecourts.fuel_price.E5"]),
+ e10_price=pence_to_pounds(df["forecourts.fuel_price.E10"]),
+ diesel_price=pence_to_pounds(df["forecourts.fuel_price.B7S"]),
+ )
+
+ return df.rename(columns={"forecourts.trading_name": "station_name"})[
+ ["station_name", "distance", "e5_price", "e10_price", "diesel_price"]
+ ].to_dict(orient="records")
+
+
+def sort_stations(stations: list[dict], sort: str) -> list[dict]:
+ sort_key = SORT_KV[sort]
+ return sorted(stations, key=lambda d: d[sort_key] if d[sort_key] != "N/A" else 999)
+
+
+def output_stations(stations: List[Dict[str, Any]]) -> None:
+ if not stations:
+ print("[*] No stations found.")
+ return
+ print(
+ tabulate(
+ stations,
+ headers={
+ "station_name": "Station Name",
+ "distance": "Distance (miles)",
+ "e5_price": "E5 (£/L)",
+ "e10_price": "E10 (£/L)",
+ "diesel_price": "B7S (£/L)",
+ },
+ floatfmt=".2f",
+ )
+ )
def main():
args = parse_args()
- location = get_location(args.address)
- df, last_modified = get_latest_data()
- print(f"Last modified: {last_modified}")
+ try:
+ location = get_location(args.address)
+ except ValueError as e:
+ print(f"[*] {e}")
+ sys.exit(1)
+ df, last_modified = get_latest_data()
- df_processed = process_data(df)
+ print(f"Last updated: {last_modified}")
- df_filtered = filter_df(df_processed, args, location)
+ df_filtered = filter_df(df, args, location)
- sorted_stations_list = sort_list_of_stations(df_filtered, args)
+ sorted_stations_list = sort_stations(df_filtered, args.sort)
output_stations(sorted_stations_list)
git.ajschof.me — hosted by ajschofield — powered by cgit