Process AIS Dataset#

Setup#

import pandas as pd
import geopandas as gpd
from os.path import join, expanduser
from geopy import distance
import boto3

pd.set_option("display.max_columns", None)
import sys

gn_path = join(expanduser("~"), "Repos", "GOSTnets")
sys.path.append(gn_path)
import GOSTnets as gn
import osmnx as ox
import pickle
%load_ext autoreload
%autoreload 2
from pandarallel import pandarallel

pandarallel.initialize(progress_bar=False, nb_workers=60)
INFO: Pandarallel will run on 60 workers.
INFO: Pandarallel will use Memory file system to transfer data between the main process and workers.

Read Data#

s3_client = boto3.client("s3")
objects = s3_client.list_objects_v2(
    Bucket="wbgdecinternal-ntl", Prefix="Andres_Temp/AIS/red-sea"
)
s3_files = []
for obj in objects["Contents"]:
    # print(obj["Key"])
    s3_files.append(obj["Key"])
# s3_files = s3_files[0:6] + s3_files[9:12] + s3_files[6:9] + s3_files[12:]
# s3_files = s3_files[0:6] + s3_files[9:12] + s3_files[6:9] + s3_files[15:] + s3_files[13:15]
s3_files = (
    s3_files[0:6] + s3_files[9:12] + s3_files[6:9] + s3_files[19:] + s3_files[13:19]
)
s3_files
['Andres_Temp/AIS/red-sea/portcalls_gdf_2023_01.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_02.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_03.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_04.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_05.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_06.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_7.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_8.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_9.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_10.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_11.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2023_12.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_1.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_2.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_3.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_04.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_05.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_06.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_07.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_08.csv',
 'Andres_Temp/AIS/red-sea/portcalls_gdf_2024_09.csv']
dfs = [pd.read_csv(f"s3://wbgdecinternal-ntl/{s3_file}") for s3_file in s3_files]
[len(df) for df in dfs]
[733973,
 754343,
 870976,
 843526,
 887066,
 899082,
 934011,
 929877,
 921107,
 932575,
 867774,
 884053,
 864070,
 678337,
 757018,
 758891,
 759204,
 732224,
 821611,
 931175,
 913017]
df_all = pd.concat(dfs)
len(df_all)
17673910
group_by_cols = ["mmsi", "imo", "vessel_name"]  # route_group
del dfs
df = df_all.copy()
del df_all

Data Processing#

# df = pd.read_csv(f"s3://wbgdecinternal-ntl/{s3_files[0]}")
# group_by_cols = ["mmsi", "imo", "vessel_name"]  # route_group

Calculate Travel Time#

# df = df.head(1000)
df["geometry"] = gpd.points_from_xy(df.longitude, df.latitude)
df = gpd.GeoDataFrame(df, geometry="geometry", crs="EPSG:4326")
df.loc[:, "arrival_dt_pos_utc"] = pd.to_datetime(df.loc[:, "arrival_dt_pos_utc"])
df.loc[:, "departure_dt_pos_utc"] = pd.to_datetime(df.loc[:, "departure_dt_pos_utc"])
df["prev_departure_dt_pos_utc"] = df.groupby(group_by_cols)[
    "departure_dt_pos_utc"
].shift(1)
df["time_spent"] = df["departure_dt_pos_utc"] - df["arrival_dt_pos_utc"]
df["time_travel"] = df["arrival_dt_pos_utc"] - df["prev_departure_dt_pos_utc"]
df["time_spent"] = pd.to_timedelta(df["time_spent"])
df["time_travel"] = pd.to_timedelta(df["time_travel"])
df.loc[:, "days_spent"] = df["time_spent"].dt.days
df.loc[:, "days_travel"] = df["time_travel"].dt.days
# Convert to hours
df["time_spent"] = df["time_spent"].dt.total_seconds() / 3600
df["time_travel"] = df["time_travel"].dt.total_seconds() / 3600
df.loc[:, "unique_id"] = df.apply(
    lambda x: f"{x['mmsi']}_{x['imo']}_{x['vessel_name']}", axis=1
)
df["prev_geometry"] = df.groupby(group_by_cols)["geometry"].shift(1)
df["prev_port"] = df.groupby(group_by_cols)["Port"].shift(1)
df["prev_country"] = df.groupby(group_by_cols)["Country"].shift(1)

Remove False Positives#

df_filt = df.copy()
df_filt = df_filt.loc[df_filt.prev_departure_dt_pos_utc.notnull(), :].copy()
days_threshold = 60
df_filt = df_filt.loc[df_filt["days_travel"] <= days_threshold].copy()
print(f"Reduced from {len(df)} to {len(df_filt)}, {len(df_filt)/len(df)} pct")
Reduced from 17673910 to 16029171, 0.9069397207522274 pct
def calc_geodesic_distance(row):
    prev_coords = (row.prev_geometry.y, row.prev_geometry.x)
    next_coords = (row.geometry.y, row.geometry.x)
    return distance.distance(prev_coords, next_coords).km
df_filt.loc[:, "travel_distance"] = df_filt.parallel_apply(
    lambda x: calc_geodesic_distance(x), axis=1
)
df_filt.travel_distance.isna().sum()
0
time_threshold = 1
distance_threshold = 1000
print(
    f"Drop unlikely observations where travel time is less than {time_threshold} hour and travel distance is greater than {distance_threshold} km"
)
df2 = df_filt.loc[
    ~(
        (df_filt["time_travel"] < time_threshold)
        & (df_filt["travel_distance"] > distance_threshold)
    )
].copy()
Drop unlikely observations where travel time is less than 1 hour and travel distance is greater than 1000 km
print(f"Reduced from {len(df_filt)} to {len(df2)}")
Reduced from 16029171 to 15312307
# df2.to_csv(join(expanduser("~"), 'tmp', 'ais', 'portcalls_v2.csv'), index=False)
len(df2)
15312307
del df
del df_filt
# df2.loc[df2.Port==df2.prev_port].iloc[0]
# len(df2.loc[df2.Port==df2.prev_port])

Load Sea Routes Graph#

routes_dir = join(expanduser("~"), "tmp", "sea_routes")
with open(join(routes_dir, "G_sea_routes_fix.gpickle"), "rb") as f:
    G = pickle.load(f)
df2.reset_index(inplace=True, drop=True)
azimuthal = "ESRI:54032"
wgs = "EPSG:4326"
G.graph["crs"] = wgs  # wgs
fig, ax = ox.plot_graph(
    G,
    figsize=(12, 14),
    node_size=0.5,
    edge_linewidth=0.1,
    save=False,
    # filepath="./sea_routes.png",
    dpi=300,
)
# plt.savefig('./sea_routes.png', dpi=300)
../../_images/274744c95527d31fe2f7435b46eed1c685fb9f2c7a469d86a7632014511c17e3.png

Snap to Graph#

df_snapped = gn.pandana_snap_c(
    G, df2, source_crs=wgs, target_crs=azimuthal, add_dist_to_node_col=True
)
df_snapped.rename(
    columns={
        "NN": "end_id",
        "NN_dist": "end_port_dist",
    },
    inplace=True,
)
df_snapped.set_geometry("prev_geometry", inplace=True)
df_snapped.geometry
0              POINT (1.08364 49.43863)
1              POINT (4.36273 51.20349)
2              POINT (1.08392 49.43855)
3              POINT (4.94167 52.38167)
4              POINT (3.81155 51.34228)
                       ...             
15312302     POINT (-80.72000 -0.93000)
15312303     POINT (-79.91500 -2.27833)
15312304    POINT (-77.15557 -12.04702)
15312305      POINT (-79.64306 0.99516)
15312306    POINT (-77.14255 -12.03974)
Name: prev_geometry, Length: 15312307, dtype: geometry
df_snapped = gn.pandana_snap_c(
    G, df_snapped, source_crs=wgs, target_crs=azimuthal, add_dist_to_node_col=True
)
df_snapped.rename(
    columns={
        "NN": "start_id",
        "NN_dist": "start_port_dist",
    },
    inplace=True,
)
df3 = df_snapped.loc[df_snapped.start_id != df_snapped.end_id].copy()
print(f"Reduced from {len(df2)} to {len(df3)}")
Reduced from 15312307 to 11171321
del df2
del df_snapped
len(df3)
11171321

Check if route is already in distances#

distances = ox.graph_to_gdfs(G, nodes=False, edges=True)
distances.reset_index(inplace=True)
distances.head()
u v key geometry trip_count distance frequency n_components land_intersect index n_vertices
0 4410 3658 0 LINESTRING (-87.05329 45.76483, -87.03822 45.6... 1984063 460.638148 1.000000 1.0 1.0 7682 43
1 3658 7083 0 LINESTRING (-87.49139 41.67484, -87.47907 41.8... 1948666 372.753814 1.000000 1.0 1.0 7062 57
2 3658 2814 0 LINESTRING (-87.49139 41.67484, -87.44465 41.7... 1950240 362.893147 1.000000 1.0 1.0 7133 61
3 3658 7074 0 LINESTRING (-87.49139 41.67484, -87.44249 41.9... 2872419 357.698743 1.000000 1.0 1.0 9998 65
4 3658 7069 0 LINESTRING (-87.49139 41.67484, -87.45370 41.7... 1907240 518.318929 0.428571 1.0 1.0 6380 75
df4 = df3.copy()
# df4.loc[:, "sea_route"] = False
def check_route(row):
    find_route = distances.loc[
        (distances.u == row.start_id) & (distances.v == row.end_id)
    ].copy()
    if len(find_route) > 0:
        return True
    else:
        return False
df4.loc[:, "sea_route"] = df4.parallel_apply(lambda x: check_route(x), axis=1)
df4["sea_route"].isna().sum()
0
# for idx, row in tqdm(df4.iterrows()):
#     find_route = distances.loc[(distances.u==row.start_id) & (distances.v==row.end_id)].copy()
#     if len(find_route) > 0:
#         df4.loc[idx, "sea_route"] = True
df4.sea_route.value_counts()
sea_route
False    8455581
True     2715740
Name: count, dtype: int64
df4.to_parquet(join(expanduser("~"), "tmp", "ais", "portcalls_v3.parquet"), index=False)
# df4.to_csv(join(expanduser("~"), 'tmp', 'ais', 'portcalls_v3.csv'), index=False)
# del(df3)

Reload#

# df4 = pd.read_csv(join(expanduser("~"), 'tmp', 'ais', 'portcalls_v3.csv'))
# df4.loc[:, "geometry"] = df4.apply(lambda x: loads(x.geometry), axis=1)
# df4.loc[:, "prev_geometry"] = df4.apply(lambda x: loads(x.prev_geometry), axis=1)

Routing#

df_found = df4.loc[df4.sea_route == True].copy()
df_search = df4.loc[df4.sea_route == False].copy().reset_index(drop=True)

Get Distance From Existing Routes#

len(df_found), len(df_search)
(2715740, 8455581)
distances.sort_values(["u", "v", "distance"], inplace=True)
distances.head()
u v key geometry trip_count distance frequency n_components land_intersect index n_vertices
33466 26 1940 0 MULTILINESTRING ((-123.32715 48.40278, -123.31... 703167 10030.106970 0.200000 2.0 0.000181 12002821 544
33467 26 1940 1 MULTILINESTRING ((-123.32715 48.40278, -123.53... 1806381 13940.290558 0.800000 2.0 0.002746 12073163 725
33464 26 1973 0 MULTILINESTRING ((-123.32715 48.40278, -123.46... 1121874 7091.474099 0.100000 2.0 0.000337 11027902 742
33459 26 1975 1 MULTILINESTRING ((-123.32715 48.40278, -123.33... 2274155 14513.972521 0.666667 2.0 0.000000 6125524 882
33458 26 1975 0 MULTILINESTRING ((-123.32715 48.40278, -123.33... 120059 20701.123726 0.333333 2.0 0.000000 5954058 827
def get_distance(row):
    distance = distances.loc[
        (distances.u == row.start_id) & (distances.v == row.end_id)
    ].iloc[0]["distance"]
    return distance
df_found.loc[:, "distance"] = df_found.parallel_apply(lambda x: get_distance(x), axis=1)
# df_found.loc[:, "distance"] = df_found.apply(lambda x: get_distance(x), axis=1)
df_found["distance"].isna().sum()
0

Get Distance from Graph Routing#

import multiprocessing

multiprocessing.cpu_count()
256
# df_sub = df_search.head(100000).copy().reset_index(drop=True)
len(df_search)  # , len(df_sub)
8455581

4 minutes without cpu

%%time
routes = ox.shortest_path(
    G, list(df_search.start_id), list(df_search.end_id), weight="distance", cpus=12
)
CPU times: user 1min 39s, sys: 23.2 s, total: 2min 2s
Wall time: 22min 57s
len(routes)
8455581
df_search.loc[:, "route"] = routes
df_search.iloc[0].route
[2815, 2560, 3945]
# route = ox.utils_graph.route_to_gdf(G, routes[0], "distance")
def get_distance_graph(route):
    if route is not None:
        route_distance = ox.utils_graph.route_to_gdf(G, route, "distance")
        return route_distance["distance"].sum()
    else:
        return None
df_search.loc[:, "distance"] = df_search.parallel_apply(
    lambda x: get_distance_graph(x["route"]), axis=1
)
df_search["distance"].isna().sum()
1293001
len(df_search.loc[~df_search["distance"].isna()])
7162580
len(df_search.loc[df_search["distance"].isna()]) / len(df_search)
0.15291687230008205
row = df_search.loc[df_search["distance"].isna()].iloc[0].copy()
# row
route = ox.shortest_path(G, int(row.start_id), int(row.end_id), weight="distance")
route
df_new = pd.concat([df_found, df_search])
df_new.reset_index(inplace=True, drop=True)
# df_new.to_csv(join(expanduser("~"), 'tmp', 'ais', 'portcalls_v4.csv'), index=False)
# df_new.to_parquet(
#     join(expanduser("~"), "tmp", "ais", "portcalls_v4.parquet"), index=False
# )

Filter and Aggregate#

# df_new.loc[~df_new['distance'].isna()]['year']
# df_new.arrival_dt_pos_utc = pd.to_datetime(df_new.arrival_dt_pos_utc)
# df_new.loc[:, "year-month"] = df_new.loc[:, "arrival_dt_pos_utc"].dt.strftime("%Y-%m")
df_new.columns
Index(['mmsi', 'imo', 'vessel_name', 'route_group', 'length', 'width',
       'longitude', 'latitude', 'departure_longitude', 'departure_latitude',
       'vessel_type', 'arrival_dt_pos_utc', 'departure_dt_pos_utc',
       'arrival_draught', 'arrival_destination', 'departure_draught',
       'departure_destination', 'mean_sog', 'max_sog', 'min_sog', 'count_ais',
       'year', 'month', 'Country', 'Port', 'geometry',
       'prev_departure_dt_pos_utc', 'time_spent', 'time_travel', 'days_spent',
       'days_travel', 'unique_id', 'prev_geometry', 'prev_port',
       'prev_country', 'travel_distance', 'end_id', 'end_port_dist',
       'start_id', 'start_port_dist', 'sea_route', 'distance', 'route'],
      dtype='object')
group_cols = [
    "year",
    "month",
    "vessel_type",
    "Country",
    "Port",
    "prev_country",
    "prev_port",
]
data_cols = ["time_travel", "distance"]
df_filt = df_new.loc[~df_new["distance"].isna()].copy()
len(df_filt) / len(df_new)
0.8842571080000298
aois = ["Bab el-Mandeb Strait", "Cape of Good Hope", "Suez Canal"]
df_filt.loc[df_filt.Port == "Suez Canal", "Country"] = "Chokepoint Suez Canal"
df_filt.loc[
    df_filt.Port == "Cape of Good Hope", "Country"
] = "Chokepoint Cape of Good Hope"
df_filt.loc[
    df_filt.Port == "Bab el-Mandeb Strait", "Country"
] = "Chokepoint Bab el-Mandeb Strait"
df_filt.loc[df_filt.prev_port == "Suez Canal", "prev_country"] = "Chokepoint Suez Canal"
df_filt.loc[
    df_filt.prev_port == "Cape of Good Hope", "prev_country"
] = "Chokepoint Cape of Good Hope"
df_filt.loc[
    df_filt.prev_port == "Bab el-Mandeb Strait", "prev_country"
] = "Chokepoint Bab el-Mandeb Strait"
df_filt = df_filt.loc[df_filt["Country"] != df_filt["prev_country"]].copy()
len(df_filt) / len(df_new)
0.22451525652158774
len(df_filt)
2508132
# df_filt.loc[410609]
df_filt2 = df_filt.loc[
    (df_filt.Port.isin(aois)) | (df_filt.prev_port.isin(aois))
].copy()
len(df_filt), len(df_filt2)
(413, 2)
G = ox.distance.add_edge_lengths(G)
# route = df_filt.loc[410609]['route']
# route = df_filt2.iloc[1]["route"]
trip = df_filt2.iloc[1]
trip
mmsi                                                                 218553000
imo                                                                    9197478
vessel_name                                                             WIEBKE
route_group                                                                  3
length                                                                   152.0
width                                                                     20.0
longitude                                                            32.582492
latitude                                                             29.954372
departure_longitude                                                  32.563333
departure_latitude                                                   29.931667
vessel_type                                                              Cargo
arrival_dt_pos_utc                                         2023-01-29 19:49:59
departure_dt_pos_utc                                       2023-01-29 20:04:27
arrival_draught                                                            6.8
arrival_destination                                                      SGSIN
departure_draught                                                          6.8
departure_destination                                                    SGSIN
mean_sog                                                                  6.94
max_sog                                                                    7.0
min_sog                                                                    6.7
count_ais                                                                    5
year                                                                      2023
month                                                                        1
Country                                                  Chokepoint Suez Canal
Port                                                                Suez Canal
geometry                                       POINT (32.58249167 29.95437167)
prev_departure_dt_pos_utc                                  2023-01-19 17:29:10
time_spent                                                            0.241111
time_travel                                                         242.346944
days_spent                                                                   0
days_travel                                                               10.0
unique_id                                             218553000_9197478_WIEBKE
prev_geometry                                      POINT (4.22651 51.35933167)
prev_port                                                              Antwerp
prev_country                                                           Belgium
travel_distance                                                    3336.686391
end_id                                                              Suez Canal
end_port_dist                                                      2346.835598
start_id                                                                  2866
start_port_dist                                                   35081.829369
sea_route                                                                False
distance                                                           2815.173635
route                        [2866, Cape of Good Hope, 2053, 27090, 2111, B...
Name: 439, dtype: object
trip.start_id, trip.end_id
(2866, 'Suez Canal')
# r = ox.shortest_path(
#     G, 2866, 2214, weight="distance"
# )
trip.end_id
'Suez Canal'
r = ox.shortest_path(G, trip.start_id, trip.end_id, weight="distance")
---------------------------------------------------------------------------
ValueError                                Traceback (most recent call last)
Cell In[154], line 1
----> 1 r = ox.shortest_path(
      2     G, trip.start_id, trip.end_id, weight="distance"
      3 )

File ~/.conda/envs/ox2/lib/python3.12/site-packages/osmnx/routing.py:58, in shortest_path(G, orig, dest, weight, cpus)
     56 if not (hasattr(orig, "__iter__") and hasattr(dest, "__iter__")):
     57     msg = "orig and dest must either both be iterable or neither must be iterable"
---> 58     raise ValueError(msg)
     60 # if both orig and dest are iterable, ensure they have same lengths
     61 if len(orig) != len(dest):  # pragma: no cover

ValueError: orig and dest must either both be iterable or neither must be iterable
r
[[2866,
  'Cape of Good Hope',
  2053,
  27090,
  2111,
  'Bab el-Mandeb Strait',
  5322,
  6972,
  'Suez Canal']]
# route = [trip.start_id, trip.end_id]
route = trip.route
route
[2866,
 'Cape of Good Hope',
 2053,
 27090,
 2111,
 'Bab el-Mandeb Strait',
 5322,
 6972,
 'Suez Canal']
route
[2866,
 'Cape of Good Hope',
 2053,
 27090,
 2111,
 'Bab el-Mandeb Strait',
 5322,
 6972,
 'Suez Canal']
# edges = ox.graph_to_gdfs(G, nodes=False, edges=True)
# edges.reset_index(inplace=True)
# edges.head()
# edges.loc[(edges.u==2866) & (edges.v==842)]#.iloc[[0]].explore() #.explore()
# edges.loc[2866, 842].explore()
# ox.utils_graph.route_to_gdf(G, route, "distance")
route
[2866,
 'Cape of Good Hope',
 2053,
 27090,
 2111,
 'Bab el-Mandeb Strait',
 5322,
 6972,
 'Suez Canal']
ox.plot_graph_route(
    G,
    route,
    figsize=(12, 14),
    route_linewidth=6,
    node_size=0.5,
    edge_linewidth=0.1,
    bgcolor="k",
    route_color="r",
    save=False,
    # filepath="./route.png",
    dpi=300,
)
---------------------------------------------------------------------------
NotImplementedError                       Traceback (most recent call last)
Cell In[146], line 1
----> 1 ox.plot_graph_route(
      2     G,
      3     route,
      4     figsize=(12, 14),
      5     route_linewidth=6,
      6     node_size=0.5,
      7     edge_linewidth=0.1,
      8     bgcolor="k",
      9     route_color="r",
     10     save=False,
     11     # filepath="./route.png",
     12     dpi=300,
     13 )

File ~/.conda/envs/ox2/lib/python3.12/site-packages/osmnx/plot.py:319, in plot_graph_route(G, route, route_color, route_linewidth, route_alpha, orig_dest_size, ax, **pg_kwargs)
    316 data = min(G.get_edge_data(u, v).values(), key=lambda d: d["length"])
    317 if "geometry" in data:
    318     # if geometry attribute exists, add all its coords to list
--> 319     xs, ys = data["geometry"].xy
    320     x.extend(xs)
    321     y.extend(ys)

File ~/.conda/envs/ox2/lib/python3.12/site-packages/shapely/geometry/base.py:229, in BaseGeometry.xy(self)
    226 @property
    227 def xy(self):
    228     """Separate arrays of X and Y coordinate values"""
--> 229     raise NotImplementedError

NotImplementedError: 
../../_images/298d2742490bb817ba75bcff2c5c024519d77d68db6a0bf5452c01b181ee1150.png
# ox.plot_graph_route(
#     G,
#     route,
#     route_linewidth=6,
#     node_size=0.5,
#     bgcolor="k",
#     route_color="r",
#     save=False,
#     # filepath="./route.png",
#     dpi=300,
# )
fig, ax = ox.plot_graph(
    G,
    figsize=(12, 14),
    node_size=0.5,
    edge_linewidth=0.1,
    save=True,
    filepath="./sea_routes.png",
    dpi=300,
)
df_filt.to_parquet(
    join(expanduser("~"), "tmp", "ais", "portcalls_v5.parquet"), index=False
)
# df_filt.to_csv(join(expanduser("~"), "tmp", "ais", "portcalls_v5.csv"), index=False)
%%time
df_agg = df_filt.groupby(group_cols)[data_cols].sum()
CPU times: user 1.38 s, sys: 350 ms, total: 1.72 s
Wall time: 1.56 s
len(df_agg)
665086
df_agg = df_agg.reset_index()
df_agg.to_csv(join(expanduser("~"), "tmp", "ais", "port_calls_agg_v3.csv"), index=False)