Data Extraction#
The following notebook retrieves AIS data from the UN Global Platform.
Setup#
# Imports: pandas/geopandas for tabular + spatial work, shapely.mapping to
# serialize geometries to GeoJSON dicts, PySpark functions for the AIS
# aggregation, and the platform's `ais` helper library for data access.
import pandas as pd
import geopandas as gpd
from shapely.geometry import mapping
import json
import pyspark.sql.functions as F
from ais import functions as af
from tqdm import tqdm
from IPython.core.interactiveshell import (
    InteractiveShell,
)  # allow multiple outputs in one jupyter cell

# Render every bare expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
# Display the pre-initialized SparkSession injected by the platform kernel.
spark
SparkSession - in-memory
SparkContext
- Version
v3.5.0
- Master
k8s://https://10.100.0.1:443
- AppName
nb-d4ee6998-3c82-444f-931c-18193e6eb5b4-0f658
Port Boundaries#
Get port boundaries from IMF.
import os

# GitLab access for the IMF port-boundary file. Never hardcode the token:
# read it from the environment (the empty-string fallback reproduces the old
# placeholder behavior when the variable is unset).
token = os.environ.get("GITLAB_TOKEN", "")
project_id = 358
file_path = "data/AIS_Jasper/port_boundaries.geojson"

# Fetch the raw GeoJSON text; csv_df=False returns the file content as a
# string rather than parsing it into a DataFrame.
string = af.get_file_gitlab(
    token,
    project_id,
    file_path,
    org_path="https://code.officialstatistics.org",
    branch="main",
    csv_df=False,
)
# Parse the GeoJSON into a GeoDataFrame, drop fully duplicated rows, and make
# the port polygon the active geometry column (renamed from the default
# "geometry" to the more descriptive "port_boundary").
port_features = json.loads(string)
imf = gpd.GeoDataFrame.from_features(port_features)
imf = imf.drop_duplicates()
imf = imf.rename(columns={"geometry": "port_boundary"}).set_geometry("port_boundary")

imf.info()
imf.head()
<class 'geopandas.geodataframe.GeoDataFrame'>
Int64Index: 1595 entries, 0 to 1594
Data columns (total 4 columns):
# Column Non-Null Count Dtype
--- ------ -------------- -----
0 port_boundary 1595 non-null geometry
1 Port_name 1595 non-null object
2 Country 1595 non-null object
3 Continent 1595 non-null object
dtypes: geometry(1), object(3)
memory usage: 62.3+ KB
port_boundary | Port_name | Country | Continent | |
---|---|---|---|---|
0 | POLYGON ((-149.93214 61.25829, -149.83906 61.2... | Anchorage | U.S.A. | North-America |
1 | POLYGON ((-148.68875 60.78098, -148.65289 60.7... | Whittier | U.S.A. | North-America |
2 | POLYGON ((-146.42760 61.11690, -146.38752 61.1... | Swanport | U.S.A. | North-America |
3 | POLYGON ((-130.38185 54.34481, -130.24392 54.3... | Prince Rupert | U.S.A. | North-America |
4 | POLYGON ((-123.13286 49.26700, -123.13218 49.3... | Vancouver | Canada | North-America |
# Drop repeated (Country, Port_name) pairs, keeping the first occurrence.
port_dupes = imf.duplicated(subset=["Country", "Port_name"], keep="first")
imf = imf[~port_dupes]
Add Chokepoints#
# Row count after de-duplication (1595 -> 1593).
len(imf)
1593
# Hand-drawn polygons for three maritime chokepoints (Bab el-Mandeb Strait,
# Suez Canal, Cape of Good Hope), tagged Country="chokepoint" so they can be
# separated from real ports downstream. NOTE(review): each ring repeats one
# interior vertex consecutively — harmless to shapely, but worth confirming
# the polygons were digitized as intended.
chokepoints_str = '{"type":"FeatureCollection","features":[{"type":"Feature","properties":{"Port_name":"Bab el-Mandeb Strait","Country":"chokepoint"},"geometry":{"type":"Polygon","coordinates":[[[43.309777,12.493873],[43.331355,12.474386],[43.514229,12.664987],[43.479704,12.669198],[43.479704,12.669198],[43.309777,12.493873]]]}},{"type":"Feature","properties":{"Port_name":"Suez Canal","Country":"chokepoint"},"geometry":{"type":"Polygon","coordinates":[[[32.553653,29.932182],[32.560088,29.926227],[32.583646,29.944185],[32.585828,29.955619],[32.580702,29.956848],[32.570668,29.942484],[32.570668,29.942484],[32.553653,29.932182]]]}},{"type":"Feature","properties":{"Port_name":"Cape of Good Hope","Country":"chokepoint"},"geometry":{"type":"Polygon","coordinates":[[[19.877499,-34.845363],[19.910717,-37.614427],[19.9628,-37.616709],[19.944734,-36.60437],[19.926568,-34.850397],[19.926568,-34.850397],[19.877499,-34.845363]]]}}]}'
# Parse the chokepoint GeoJSON exactly like the IMF ports and append it to
# the port table so chokepoints flow through the same hex/query pipeline.
chokepoint_features = json.loads(chokepoints_str)
chokepoints_gdf = gpd.GeoDataFrame.from_features(chokepoint_features)
chokepoints_gdf = chokepoints_gdf.rename(columns={"geometry": "port_boundary"})
chokepoints_gdf = chokepoints_gdf.set_geometry("port_boundary")

imf = pd.concat([imf, chokepoints_gdf])
# Build (label, GeoJSON-dict) pairs for every polygon. The label encodes
# "Country:Port_name" and is split back into two columns after the Spark
# aggregation, so the separator must match the F.split(":") call below.
query_polys = [
    (f"{country}:{port}", mapping(boundary))
    for country, port, boundary in zip(
        imf["Country"], imf["Port_name"], imf["port_boundary"]
    )
]

len(query_polys)
query_polys[0]
1596
('U.S.A.:Anchorage',
{'type': 'Polygon',
'coordinates': (((-149.9321447362291, 61.258287735699795),
(-149.8390591326616, 61.2586723869542),
(-149.84713680900424, 61.20020539628371),
(-149.93829915629968, 61.19828214001165),
(-149.9321447362291, 61.258287735699795)),)})
Use the port boundaries to identify the hexagonal bins to query in the AIS data. We choose resolution 10 because some polygons are too small to be covered at a lower resolution.
# Convert every query polygon into the set of H3 hexagons (resolution 10)
# used to filter the AIS feed. NOTE(review): overfill=False presumably means
# no hexes beyond the polygon boundary are added — confirm in the ais
# helper's documentation.
imf_df_hex = af.polygon_to_hex_df(query_polys, hex_resolution=10, overfill=False)
imf_df_hex.info()
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6415967 entries, 0 to 6415966
Data columns (total 3 columns):
# Column Dtype
--- ------ -----
0 hex_id int64
1 polygon_name object
2 hex_resolution int64
dtypes: int64(2), object(1)
memory usage: 146.8+ MB
WARNING: there are hex ids contained in more than 1 polygon
# Peek at the last rows (the chokepoint hexes, appended most recently).
imf_df_hex.tail()
hex_id | polygon_name | hex_resolution | |
---|---|---|---|
6415962 | 624811593067102207 | chokepoint:Cape of Good Hope | 10 |
6415963 | 625181089372012543 | chokepoint:Cape of Good Hope | 10 |
6415964 | 625181119436783615 | chokepoint:Cape of Good Hope | 10 |
6415965 | 625180943611559935 | chokepoint:Cape of Good Hope | 10 |
6415966 | 625180973944766463 | chokepoint:Cape of Good Hope | 10 |
# Total hex rows across all polygons.
len(imf_df_hex)
6415946
# The warning above said some hexes fall inside more than one polygon; count them.
imf_df_hex.hex_id.duplicated().sum()
21
# Keep each hex_id once (first polygon wins) so overlapping polygons do not
# double-count the same AIS positions.
imf_df_hex = imf_df_hex.drop_duplicates(subset=["hex_id"])
# Sanity check: no duplicate hex ids remain.
imf_df_hex.hex_id.duplicated().sum()
0
AIS Data#
# Confirm the de-duplicated table still ends with the chokepoint hexes.
imf_df_hex.tail(2)
hex_id | polygon_name | hex_resolution | |
---|---|---|---|
6415965 | 625180943611559935 | chokepoint:Cape of Good Hope | 10 |
6415966 | 625180973944766463 | chokepoint:Cape of Good Hope | 10 |
import os

# 15 monthly extraction windows: January 2023 through March 2024.
start_dates = pd.date_range("2023-01-01", "2024-03-31", freq="MS")
# Derive each month-end from its month-start instead of issuing a second
# date_range with the deprecated "M" alias — the two lists can never drift
# out of sync and the code works across pandas versions.
end_dates = start_dates + pd.offsets.MonthEnd(1)

# AIS columns to retain from the raw feed (position, identity, static vessel
# attributes, and the position/static message timestamps used for ordering).
keep_cols = [
    "mmsi",
    "dt_insert_utc",
    "longitude",
    "latitude",
    "imo",
    "vessel_name",
    "vessel_type",
    "vessel_type_cargo",
    "vessel_class",
    "length",
    "width",
    "flag_country",
    "destination",
    "draught",
    "sog",
    "cog",
    "rot",
    "heading",
    "nav_status",
    "dt_pos_utc",
    "dt_static_utc",
    "vessel_type_main",
    "vessel_type_sub",
]

# One output row per (vessel, route); route_group is produced by af.assign_route.
group_by_cols = ["mmsi", "imo", "vessel_name", "route_group"]

# S3 credentials for the output bucket — read from the environment rather
# than hardcoded in the notebook (empty-string fallback matches the old
# placeholder behavior when the variables are unset).
ACCESS_KEY = os.environ.get("AWS_ACCESS_KEY_ID", "")
SECRET_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY", "")
# Main extraction loop: one month per iteration — fetch AIS positions inside
# the hex footprint, split them into per-polygon routes (port/chokepoint
# visits), aggregate to one row per visit, and write a monthly CSV to S3.
for i in tqdm(range(0, len(start_dates))):
    start_date = start_dates[i]
    end_date = end_dates[i]
    year, month = start_date.year, start_date.month
    month_format = start_date.strftime("%m")
    print(f"Processing {year}, Month: {month_format}")
    # Pull one month of AIS data restricted to the pre-computed hex ids.
    sdf = af.get_ais(
        spark,
        start_date=start_date,
        end_date=end_date,
        polygon_hex_df=imf_df_hex,
        columns=keep_cols,
    )
    # Segment each vessel's positions into routes within each polygon,
    # ordered by the position/static timestamps. Null imo/mmsi are
    # zero-filled so they can participate in the grouping keys.
    sdf_route = af.assign_route(
        sdf.na.fill(0, ["imo", "mmsi"]),
        ship_unique_identifier_cols=["mmsi", "imo", "vessel_name"],
        route_order_by_cols=["dt_pos_utc", "dt_static_utc"],
        polygon_col_name="polygon_name",
    )
    # Aggregate to one row per (vessel, route). Arrival values come from the
    # earliest dt_pos_utc in the route, departure values from the latest.
    # FIX: F.first/F.last after groupBy are non-deterministic in Spark (row
    # order is not preserved through the shuffle), so arrival/departure
    # coordinates are pinned to dt_pos_utc via min_by/max_by, matching the
    # draught/destination aggregations. polygon_name is constant within a
    # route, and length/width are static vessel attributes (presumably
    # constant per vessel), so first() remains acceptable for those.
    sdf_agg = (
        sdf_route.groupBy(group_by_cols)
        .agg(
            F.first("polygon_name").alias("polygon_name"),
            F.first("length").alias("length"),
            F.first("width").alias("width"),
            F.min_by("longitude", "dt_pos_utc").alias("longitude"),
            F.min_by("latitude", "dt_pos_utc").alias("latitude"),
            F.max_by("longitude", "dt_pos_utc").alias("departure_longitude"),
            F.max_by("latitude", "dt_pos_utc").alias("departure_latitude"),
            F.min_by("vessel_type", "dt_pos_utc").alias("vessel_type"),
            F.min("dt_pos_utc").alias("arrival_dt_pos_utc"),
            F.max("dt_pos_utc").alias("departure_dt_pos_utc"),
            F.min_by("draught", "dt_pos_utc").alias("arrival_draught"),
            F.min_by("destination", "dt_pos_utc").alias("arrival_destination"),
            F.max_by("draught", "dt_pos_utc").alias("departure_draught"),
            F.max_by("destination", "dt_pos_utc").alias("departure_destination"),
            F.mean("sog").alias("mean_sog"),
            F.max("sog").alias("max_sog"),
            F.min("sog").alias("min_sog"),
            F.count("mmsi").alias("count_ais"),
        )
        .withColumn("year", F.year("arrival_dt_pos_utc"))
        .withColumn("month", F.month("arrival_dt_pos_utc"))
        # Split the "Country:Port" label back into its two components.
        .withColumn("poly_split", F.split("polygon_name", ":"))
        .withColumn("Country", F.col("poly_split")[0])
        .withColumn("Port", F.col("poly_split")[1])
        .drop("polygon_name", "poly_split")
    )
    # Collect the (small) monthly aggregate to the driver and write one CSV
    # per month to S3 using the credentials configured above.
    df = sdf_agg.toPandas()
    df.to_csv(
        f"s3://wbgdecinternal-ntl/Andres_Temp/AIS/red-sea/portcalls_gdf_{year}_{month_format}.csv",
        index=False,
        storage_options={"key": ACCESS_KEY, "secret": SECRET_KEY},
    )
0%| | 0/6 [00:00<?, ?it/s]
Processing 2023, Month: 01
17%|█▋ | 1/6 [13:09<1:05:49, 789.83s/it]
Processing 2023, Month: 02
33%|███▎ | 2/6 [20:55<39:56, 599.17s/it]
Processing 2023, Month: 03
50%|█████ | 3/6 [28:53<27:11, 543.98s/it]
Processing 2023, Month: 04
67%|██████▋ | 4/6 [36:40<17:06, 513.48s/it]
Processing 2023, Month: 05
83%|████████▎ | 5/6 [44:41<08:21, 501.74s/it]
Processing 2023, Month: 06
100%|██████████| 6/6 [52:41<00:00, 526.85s/it]
# Release the Spark executors now that all monthly files have been written.
spark.stop()