Data Extraction

Data Extraction#

The following notebook retrieves AIS data from the UN Global Platform.

Setup#

import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping

import json

import pyspark.sql.functions as F
from ais import functions as af
from tqdm import tqdm
from IPython.core.interactiveshell import (
    InteractiveShell,
)  # allow multiple outputs in one jupyter cell

InteractiveShell.ast_node_interactivity = "all"
spark

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.5.0
Master
k8s://https://10.100.0.1:443
AppName
nb-d4ee6998-3c82-444f-931c-18193e6eb5b4-0f658

Port Boundaries#

Get port boundaries from IMF.

token = ""
project_id = 358
file_path = "data/AIS_Jasper/port_boundaries.geojson"

string = af.get_file_gitlab(
    token,
    project_id,
    file_path,
    org_path="https://code.officialstatistics.org",
    branch="main",
    csv_df=False,
)

imf = (
    gpd.GeoDataFrame.from_features(json.loads(string))
    .drop_duplicates()
    .rename(columns={"geometry": "port_boundary"})
    .set_geometry("port_boundary")
)
imf.info()
imf.head()
<class 'geopandas.geodataframe.GeoDataFrame'>
Int64Index: 1595 entries, 0 to 1594
Data columns (total 4 columns):
 #   Column         Non-Null Count  Dtype   
---  ------         --------------  -----   
 0   port_boundary  1595 non-null   geometry
 1   Port_name      1595 non-null   object  
 2   Country        1595 non-null   object  
 3   Continent      1595 non-null   object  
dtypes: geometry(1), object(3)
memory usage: 62.3+ KB
port_boundary Port_name Country Continent
0 POLYGON ((-149.93214 61.25829, -149.83906 61.2... Anchorage U.S.A. North-America
1 POLYGON ((-148.68875 60.78098, -148.65289 60.7... Whittier U.S.A. North-America
2 POLYGON ((-146.42760 61.11690, -146.38752 61.1... Swanport U.S.A. North-America
3 POLYGON ((-130.38185 54.34481, -130.24392 54.3... Prince Rupert U.S.A. North-America
4 POLYGON ((-123.13286 49.26700, -123.13218 49.3... Vancouver Canada North-America
imf = imf.drop_duplicates(subset=["Country", "Port_name"], keep="first")

Add Chokepoints#

len(imf)
1593
chokepoints_str = '{"type":"FeatureCollection","features":[{"type":"Feature","properties":{"Port_name":"Bab el-Mandeb Strait","Country":"chokepoint"},"geometry":{"type":"Polygon","coordinates":[[[43.309777,12.493873],[43.331355,12.474386],[43.514229,12.664987],[43.479704,12.669198],[43.479704,12.669198],[43.309777,12.493873]]]}},{"type":"Feature","properties":{"Port_name":"Suez Canal","Country":"chokepoint"},"geometry":{"type":"Polygon","coordinates":[[[32.553653,29.932182],[32.560088,29.926227],[32.583646,29.944185],[32.585828,29.955619],[32.580702,29.956848],[32.570668,29.942484],[32.570668,29.942484],[32.553653,29.932182]]]}},{"type":"Feature","properties":{"Port_name":"Cape of Good Hope","Country":"chokepoint"},"geometry":{"type":"Polygon","coordinates":[[[19.877499,-34.845363],[19.910717,-37.614427],[19.9628,-37.616709],[19.944734,-36.60437],[19.926568,-34.850397],[19.926568,-34.850397],[19.877499,-34.845363]]]}}]}'
chokepoints_gdf = (
    gpd.GeoDataFrame.from_features(json.loads(chokepoints_str))
    .rename(columns={"geometry": "port_boundary"})
    .set_geometry("port_boundary")
)
imf = pd.concat([imf, chokepoints_gdf])
query_polys = [
    (
        f"{imf.Country.iloc[i]}:{imf.Port_name.iloc[i]}",
        mapping(imf.port_boundary.iloc[i]),
    )
    for i in range(imf.shape[0])
]
len(query_polys)

query_polys[0]
1596
('U.S.A.:Anchorage',
 {'type': 'Polygon',
  'coordinates': (((-149.9321447362291, 61.258287735699795),
    (-149.8390591326616, 61.2586723869542),
    (-149.84713680900424, 61.20020539628371),
    (-149.93829915629968, 61.19828214001165),
    (-149.9321447362291, 61.258287735699795)),)})

Use port boundaries to identify hexabins to query in the AIS data. We choose resolution 10 because some polygons are too small to be fitted a lower resolution.

imf_df_hex = af.polygon_to_hex_df(query_polys, hex_resolution=10, overfill=False)
imf_df_hex.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6415967 entries, 0 to 6415966
Data columns (total 3 columns):
 #   Column          Dtype 
---  ------          ----- 
 0   hex_id          int64 
 1   polygon_name    object
 2   hex_resolution  int64 
dtypes: int64(2), object(1)
memory usage: 146.8+ MB
WARNING: there are hex ids contained in more than 1 polygon
imf_df_hex.tail()
hex_id polygon_name hex_resolution
6415962 624811593067102207 chokepoint:Cape of Good Hope 10
6415963 625181089372012543 chokepoint:Cape of Good Hope 10
6415964 625181119436783615 chokepoint:Cape of Good Hope 10
6415965 625180943611559935 chokepoint:Cape of Good Hope 10
6415966 625180973944766463 chokepoint:Cape of Good Hope 10
len(imf_df_hex)
6415946
imf_df_hex.hex_id.duplicated().sum()
21
imf_df_hex = imf_df_hex[~(imf_df_hex.hex_id.duplicated())]
imf_df_hex.hex_id.duplicated().sum()
0

AIS Data#

imf_df_hex.tail(2)
hex_id polygon_name hex_resolution
6415965 625180943611559935 chokepoint:Cape of Good Hope 10
6415966 625180973944766463 chokepoint:Cape of Good Hope 10
start_dates = pd.date_range("2023-01-01", "2024-03-31", freq="MS")
end_dates = pd.date_range("2023-01-01", "2024-03-31", freq="M")

keep_cols = [
    "mmsi",
    "dt_insert_utc",
    "longitude",
    "latitude",
    "imo",
    "vessel_name",
    "vessel_type",
    "vessel_type_cargo",
    "vessel_class",
    "length",
    "width",
    "flag_country",
    "destination",
    "draught",
    "sog",
    "cog",
    "rot",
    "heading",
    "nav_status",
    "dt_pos_utc",
    "dt_static_utc",
    "vessel_type_main",
    "vessel_type_sub",
]
group_by_cols = ["mmsi", "imo", "vessel_name", "route_group"]

ACCESS_KEY = ""
SECRET_KEY = ""
for i in tqdm(range(0, len(start_dates))):
    start_date = start_dates[i]
    end_date = end_dates[i]
    year, month = start_date.year, start_date.month
    month_format = start_date.strftime("%m")
    print(f"Processing {year}, Month: {month_format}")

    # Get all AIS data
    sdf = af.get_ais(
        spark,
        start_date=start_date,
        end_date=end_date,
        polygon_hex_df=imf_df_hex,
        columns=keep_cols,
    )

    # Group AIS Data by unique route within each port
    sdf_route = af.assign_route(
        sdf.na.fill(0, ["imo", "mmsi"]),
        ship_unique_identifier_cols=["mmsi", "imo", "vessel_name"],
        route_order_by_cols=["dt_pos_utc", "dt_static_utc"],
        polygon_col_name="polygon_name",
    )

    # Aggregate origin-destination data by route
    sdf_agg = (
        sdf_route.groupBy(group_by_cols)
        .agg(
            F.first("polygon_name").alias("polygon_name"),
            F.first("length").alias("length"),
            F.first("width").alias("width"),
            F.first("longitude").alias("longitude"),
            F.first("latitude").alias("latitude"),
            F.last("longitude").alias("departure_longitude"),
            F.last("latitude").alias("departure_latitude"),
            F.min_by("vessel_type", "dt_pos_utc").alias("vessel_type"),
            F.min("dt_pos_utc").alias("arrival_dt_pos_utc"),
            F.max("dt_pos_utc").alias("departure_dt_pos_utc"),
            F.min_by("draught", "dt_pos_utc").alias("arrival_draught"),
            F.min_by("destination", "dt_pos_utc").alias("arrival_destination"),
            F.max_by("draught", "dt_pos_utc").alias("departure_draught"),
            F.max_by("destination", "dt_pos_utc").alias("departure_destination"),
            F.mean("sog").alias("mean_sog"),
            F.max("sog").alias("max_sog"),
            F.min("sog").alias("min_sog"),
            F.count("mmsi").alias("count_ais"),
        )
        .withColumn("year", F.year("arrival_dt_pos_utc"))
        .withColumn("month", F.month("arrival_dt_pos_utc"))
        .withColumn("poly_split", F.split("polygon_name", ":"))
        .withColumn("Country", F.col("poly_split")[0])
        .withColumn("Port", F.col("poly_split")[1])
        .drop("polygon_name", "poly_split")
    )

    df = sdf_agg.toPandas()
    df.to_csv(
        f"s3://wbgdecinternal-ntl/Andres_Temp/AIS/red-sea/portcalls_gdf_{year}_{month_format}.csv",
        index=False,
        storage_options={"key": ACCESS_KEY, "secret": SECRET_KEY},
    )
  0%|          | 0/6 [00:00<?, ?it/s]
Processing 2023, Month: 01
 17%|█▋        | 1/6 [13:09<1:05:49, 789.83s/it]
Processing 2023, Month: 02
 33%|███▎      | 2/6 [20:55<39:56, 599.17s/it]  
Processing 2023, Month: 03
 50%|█████     | 3/6 [28:53<27:11, 543.98s/it]
Processing 2023, Month: 04
 67%|██████▋   | 4/6 [36:40<17:06, 513.48s/it]
Processing 2023, Month: 05
 83%|████████▎ | 5/6 [44:41<08:21, 501.74s/it]
Processing 2023, Month: 06
100%|██████████| 6/6 [52:41<00:00, 526.85s/it]
spark.stop()