1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
import argparse
import sys
from io import StringIO
from textwrap import dedent
import pandas as pd
import requests
from colorama import Back, Fore, Style, just_fix_windows_console
from geopy.distance import geodesic
from geopy.geocoders import Nominatim
just_fix_windows_console()
ENDPOINT = "https://www.fuel-finder.service.gov.uk/internal/v1.0.2/csv/get-latest-fuel-prices-csv"
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument("-a", "--address", type=str, required=True)
parser.add_argument("-r", "--radius", type=int, default=5)
parser.add_argument("-s", "--sort", type=str, default="e10")
return parser.parse_args()
def get_location(address):
loc = Nominatim(user_agent="FuelNearMe")
getLoc = loc.geocode(address)
if not getLoc:
print("[*] Failed to get location. Please check if the address is valid.")
sys.exit(1)
latitude = getLoc.latitude
longitude = getLoc.longitude
return (latitude, longitude)
def get_latest_data():
response = requests.get(ENDPOINT)
return pd.read_csv(StringIO(response.text)), response.headers.get("Last-Modified")
def process_data(dframe):
price_cols = [c for c in dframe.columns if "fuel_price" in c]
dframe[price_cols] = dframe[price_cols].fillna(0.0)
return dframe.fillna("N/A")
def filter_df(dframe, arguments, loc):
near_stations = []
for station, latitude, longitude, e5_price, e10_price, diesel_price in zip(
dframe["forecourts.trading_name"],
dframe["forecourts.location.latitude"],
dframe["forecourts.location.longitude"],
dframe["forecourts.fuel_price.E5"],
dframe["forecourts.fuel_price.E10"],
dframe["forecourts.fuel_price.B7S"],
):
distance_from_current_location = geodesic((latitude, longitude), loc).miles
if distance_from_current_location < arguments.radius:
station_dict = {
"station_name": station,
"distance": round(distance_from_current_location, 1),
"e5_price": round(e5_price / 100, 2),
"e10_price": round(e10_price / 100, 2),
"diesel_price": round(diesel_price / 100, 2),
}
near_stations.append(station_dict)
return near_stations
def sort_list_of_stations(stations_list, arguments):
match arguments.sort:
case "e10":
sort_by = "e10_price"
return sorted(stations_list, key=lambda d: d[sort_by])
case "e5":
sort_by = "e5_price"
return sorted(stations_list, key=lambda d: d[sort_by])
case "b7s":
sort_by = "diesel_price"
return sorted(stations_list, key=lambda d: d[sort_by])
case "distance":
sort_by = "distance"
return sorted(stations_list, key=lambda d: d[sort_by])
def output_stations(stations):
for number, row in enumerate(stations):
output = dedent(f"""
{number + 1}. {row["station_name"]}
Distance: {row["distance"]} miles
E5 Price: £{row["e5_price"]:.2f}/L
E10 Price: £{row["e10_price"]:.2f}/L
B7S (Standard Diesel) Price: £{row["diesel_price"]:.2f}/L""")
print(output)
def main():
args = parse_args()
location = get_location(args.address)
df, last_modified = get_latest_data()
print(f"Last modified: {last_modified}")
df_processed = process_data(df)
print(f"\n{Fore.MAGENTA}Stations: " + Style.RESET_ALL + str(len(df_processed)))
df_filtered = filter_df(df_processed, args, location)
sorted_stations_list = sort_list_of_stations(df_filtered, args)
output_stations(sorted_stations_list)
if __name__ == "__main__":
main()
|