"""Generate a synthetic mobile-location ping dataset for the DDP Day workshop.

Workshop: "Mobile location data for transport" — DDP Day, Washington D.C.,
5 June 2026. This module produces ``data/sample_pings_dc.parquet``, a small,
deterministic, MIT/MPL-safe stand-in for an Irys or Quadrant feed that
attendees can load in seconds on a free Codespace.

Simulation model (see ``data/README.md`` for the full spec):

* Bounding box ~ 38.79–38.99 N, -77.12 to -76.91 W (Washington, D.C.).
* ~10,000 simulated devices over 7 days (Mon 1 Jun 2026 → Sun 7 Jun 2026 UTC).
* Each device draws a home cell from D.C. census block groups, weighted by
  2020 decennial population (a land-area proxy; see ``_load_tiger_block_groups``).
  With 70% probability the device also draws a WORK cell — but work is drawn
  from a short list of D.C. EMPLOYMENT CENTRES (downtown / K St, Federal
  Triangle, Capitol Hill, Foggy Bottom, L'Enfant, Navy Yard, ...) weighted by
  relative job density, NOT from the residential distribution. This mirrors how
  real commutes concentrate on a handful of job hubs, which is what lets an OD
  matrix aggregate above a k-anonymity threshold instead of being all
  singletons. Devices without a work cell still emit pings (errands, social).
* Heavy-tailed panel activity (the real-world imbalance the workshop teaches):
  each device is "transient" (20%), "occasional" (28%), "regular" (37%), or
  "power" (15%). The class sets how many of the 7 days the device is present and
  a per-device WEEKLY PING BUDGET drawn from a log-normal. After a device's full
  bursty week is simulated, it is thinned (whole bursts dropped at random) down
  to that budget. This reproduces the two pathologies a real SDK panel shows and
  this sample previously lacked: a sparse tail (~8% of devices emit a single
  ping, ~30% emit <=5) and heavy skew (the top 10% of devices hold ~half of all
  pings, Gini ~0.66). The class label is NOT written to the file — analysts must
  discover the imbalance, exactly as with a real feed.
* Weekdays (Mon–Fri):
    - Home → work AM peak (departure ~07:30 local, σ ~45 min).
    - Work → home PM peak (departure ~17:30 local, σ ~60 min).
    - ~20% of work-having devices take a short midday hop near work.
* Weekends (Sat–Sun):
    - 1–3 random trips per day to a "leisure" POI sampled from a small list
      of D.C. landmarks (National Mall, Georgetown, U Street, Anacostia
      waterfront, etc.). Lower overall ping volume than weekdays.
* Trip path: linear interpolation between origin and destination, with small
  Gaussian jitter (~10 m std) along the path.
* Bursty emission (the event-triggered SDK signature): pings are NOT evenly
  spaced. They cluster into bursts — an app wakes, fires a handful of pings a
  few seconds apart, then goes quiet for many minutes. Burst onsets are Poisson;
  each burst holds ``1 + Poisson(λ)`` pings. The result is the real-data shape:
  the majority of consecutive gaps are sub-minute (within a burst) with a long
  tail of multi-minute silences (between bursts).
* Horizontal error: Gaussian noise on the reported lat/lon. σ = 8 m at "open
  sky" (movement segments, leisure POIs) and σ = 25 m inside a rough
  "urban canyon" box covering downtown D.C. (38.89–38.91 N, -77.04 to
  -77.02 W).
* Indoor gaps: when the device is stationary at home or work for >2 h, ~50%
  of pings are dropped (the SDK goes quiet when the phone is idle).
* Source split: roughly 50/50 between ``"irys"`` and ``"quadrant"``. The
  ``device_id`` is a 16-hex-char prefix of SHA-256, prefixed with ``ir_`` or
  ``qd_``. This stable scheme mirrors how MAIDs are pseudonymised in real
  feeds — see Subject Brief §1–2 for the real Irys / Quadrant schemas.

Determinism: the entire pipeline draws from a single ``numpy.random.default_rng``
seeded with ``SEED`` (default 20260605). No ``random``, no ``time.time()``,
no clock-dependent state. The DC boundary file is cached to
``_planning/dc_boundaries.geojson`` (gitignored) so reruns are offline.

Usage::

    python scripts/generate_sample.py
    python scripts/generate_sample.py --output data/sample_pings_dc.parquet \
        --n-devices 2000 --seed 20260605

Constraints:
    * No paid APIs, no GPU, no Docker.
    * Stdlib + (pandas, numpy, pyarrow, shapely, geopandas), no new deps.
    * Must complete in < 60 s on a free Codespace (2 cores, 8 GB RAM).
    * Output file target: < 100 MB Parquet (zstd).
"""

from __future__ import annotations

import argparse
import hashlib
import logging
import sys
import time
import warnings
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Iterable
from urllib.error import URLError

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# -----------------------------------------------------------------------------
# Constants / defaults
# -----------------------------------------------------------------------------

SEED = 20260605
N_DEVICES_DEFAULT = 10_000

REPO_ROOT = Path(__file__).resolve().parent.parent
DEFAULT_OUTPUT = REPO_ROOT / "data" / "sample_pings_dc.parquet"
BOUNDARY_CACHE = REPO_ROOT / "_planning" / "dc_boundaries.geojson"

# DC census block group TIGER/Line 2020 (state FIPS 11 = District of Columbia).
# ~540 KB ZIP; no API key required.
TIGER_BG_URL = "https://www2.census.gov/geo/tiger/TIGER2020/BG/tl_2020_11_bg.zip"

# Bounding box for D.C. (spec).
DC_BBOX = (38.79, 38.99, -77.12, -76.91)  # (lat_min, lat_max, lon_min, lon_max)

# Rough "urban canyon" box around downtown D.C. — higher horizontal error here.
URBAN_CANYON = (38.89, 38.91, -77.04, -77.02)  # (lat_min, lat_max, lon_min, lon_max)

# Eastern Time is UTC-4 on every day in our window (1–7 Jun 2026 is fully DST).
LOCAL_UTC_OFFSET_HOURS = -4

# Movement / timing parameters.
# Bursty emission: pings cluster into bursts (an app wakes, fires a few pings
# seconds apart, then sleeps). Burst onsets follow a Poisson process with the
# mean inter-burst gap below; each burst holds 1 + Poisson(BURST_SIZE_LAMBDA)
# pings spaced ~WITHIN_BURST_GAP_MIN apart. This is what makes most consecutive
# gaps sub-minute with a long tail of multi-minute silences, the way a real SDK
# feed looks — instead of an evenly spaced GPS-logger cadence.
INTER_BURST_MOVING_MIN = 3.0         # mean minutes between burst onsets, moving
INTER_BURST_STATIONARY_MIN = 6.0     # mean minutes between burst onsets, idle
WITHIN_BURST_GAP_MIN = 0.25          # mean minutes between pings in a burst (~15 s)
BURST_SIZE_LAMBDA = 2.2              # burst size = 1 + Poisson(λ); mean ≈ 3.2 pings
BURST_GAP_MIN = 2.0                  # gaps larger than this delimit bursts when thinning
ACCURACY_OPEN_SIGMA_M = 8.0
ACCURACY_URBAN_SIGMA_M = 25.0
JITTER_STD_M = 10.0
AM_DEPART_LOCAL_HOUR = 7.5     # 07:30 local
AM_DEPART_STD_HOURS = 0.75     # 45 min
PM_DEPART_LOCAL_HOUR = 17.5    # 17:30 local
PM_DEPART_STD_HOURS = 1.0      # 60 min
COMMUTE_SPEED_KMH = 22.0       # typical urban driving / mixed-mode speed
WEEKEND_TRIP_SPEED_KMH = 18.0
LEISURE_DURATION_HOURS = (1.0, 3.5)  # uniform range
WORK_PROBABILITY = 0.70
MIDDAY_HOP_PROBABILITY = 0.20

# Heavy-tailed device-activity model. Each device is assigned one class; the
# class sets how many of the 7 days it is present and a WEEKLY PING BUDGET drawn
# from a log-normal (median, sigma). The device's full bursty week is simulated
# and then thinned — whole bursts dropped at random — down to the budget. This
# is what produces the real panel shape: a sparse tail (transient/occasional
# devices reduced to 1..a-few clustered sightings) under a heavy head (power
# users that keep hundreds of pings). Calibrated against a real Quadrant daily
# feed (Bangkok, 2024-01-01): ~8% single-ping devices, ~30% <=5 pings, top 10%
# of devices holding ~half the pings, Gini ~0.66. The class label is NOT written
# to the output — analysts must discover the imbalance.
ACTIVITY_CLASSES = ("transient", "occasional", "regular", "power")
ACTIVITY_PROBS = (0.20, 0.28, 0.37, 0.15)    # sum to 1.0
ACTIVITY_BUDGET = {                           # weekly ping budget ~ lognormal(median, sigma)
    "transient": (2.0, 0.70),                 # 1..~5 pings — the sparse tail
    "occasional": (7.0, 0.75),                # ~2..30 pings
    "regular": (170.0, 0.55),                 # ~60..450 — the backbone, home/work inferable
    "power": (680.0, 0.60),                   # ~250..2700 — power users dominate volume
}
ACTIVITY_ACTIVE_DAYS = {                      # days present out of 7 (low, high)
    "transient": (1, 1),
    "occasional": (1, 3),
    "regular": (5, 7),
    "power": (7, 7),
}

# Hardcoded D.C. leisure POIs (lat, lon, name). These are well-known landmarks
# / districts that real foot-traffic studies often anchor on.
LEISURE_POIS: list[tuple[float, float, str]] = [
    (38.8893, -77.0502, "Lincoln Memorial / National Mall west"),
    (38.8895, -77.0353, "Washington Monument / Mall central"),
    (38.8977, -77.0365, "White House Ellipse"),
    (38.9072, -77.0369, "U Street corridor"),
    (38.9051, -77.0619, "Georgetown waterfront"),
    (38.9221, -77.0469, "Adams Morgan"),
    (38.8761, -76.9947, "Anacostia waterfront"),
    (38.8898, -77.0091, "Capitol Hill / Eastern Market"),
    (38.9203, -77.0210, "H Street NE"),
    (38.8483, -77.0451, "Nationals Park / Navy Yard"),
]

# D.C. employment centres (lat, lon, name, relative job weight). WORK cells are
# drawn from THIS short list — not the residential population surface — so that
# commute destinations concentrate on a handful of job hubs the way real
# commutes do. That concentration is what lets the OD matrix aggregate above a
# k-anonymity threshold instead of dissolving into singletons. Weights are
# rough relative employment magnitudes (downtown / federal core largest).
EMPLOYMENT_CENTERS: list[tuple[float, float, str, float]] = [
    (38.9012, -77.0390, "Downtown / Golden Triangle (K St)", 100.0),
    (38.8940, -77.0307, "Federal Triangle", 70.0),
    (38.8899, -77.0091, "Capitol Hill / Capitol complex", 55.0),
    (38.9000, -77.0466, "Foggy Bottom / GWU / State / IMF-WB", 50.0),
    (38.8841, -77.0214, "L'Enfant Plaza / SW Federal", 48.0),
    (38.8977, -77.0227, "Penn Quarter / Judiciary Square", 42.0),
    (38.8993, -77.0086, "Union Station / NoMa", 45.0),
    (38.8760, -77.0020, "Navy Yard / Capitol Riverfront", 40.0),
    (38.9096, -77.0434, "Dupont Circle", 35.0),
    (38.9051, -77.0619, "Georgetown", 28.0),
    (38.9293, -77.0320, "Columbia Heights", 22.0),
    (38.9596, -77.0858, "Friendship Heights", 25.0),
    (38.9436, -77.0776, "Van Ness / UDC", 16.0),
    (38.8570, -76.9960, "St. Elizabeths / DHS (Anacostia)", 18.0),
]
EMPLOYMENT_SPREAD_M = 120.0  # intra-centre Gaussian σ for work-point jitter (m).
                             # Kept tight (~1 city block) so each centre snaps to
                             # mostly ONE ~1 km OD cell — that concentration is
                             # what lets commute flows clear a k-anonymity floor.

# Fallback DC ward populations (2020 Decennial). Used only when the TIGER
# download is unavailable. Coordinates are ward centroids (approximate). Used
# to weight home/work draws when the precise block-group geometry is missing.
# Source: DC Open Data + 2020 Census P1 redistricting summary, accessed
# 25 May 2026.
DC_WARD_FALLBACK: list[tuple[str, float, float, int]] = [
    # (ward_id, centroid_lat, centroid_lon, 2020 population)
    ("Ward 1", 38.9269, -77.0356, 88_840),
    ("Ward 2", 38.9050, -77.0500, 81_590),
    ("Ward 3", 38.9418, -77.0772, 86_330),
    ("Ward 4", 38.9620, -77.0353, 86_580),
    ("Ward 5", 38.9320, -76.9870, 92_650),
    ("Ward 6", 38.8830, -76.9990, 109_960),
    ("Ward 7", 38.8930, -76.9410, 78_240),
    ("Ward 8", 38.8460, -76.9990, 86_380),
]

EARTH_RADIUS_M = 6_371_000.0
LAT_DEG_M = 111_320.0  # metres per degree latitude (good enough at DC scale)


logger = logging.getLogger("generate_sample")


# -----------------------------------------------------------------------------
# Geography: load DC block groups, build a population-weighted point sampler
# -----------------------------------------------------------------------------

@dataclass
class PointSampler:
    """Population-weighted sampler of (lat, lon) home / work / leisure cells.

    Each "cell" is one row with a representative (lat, lon) and a population
    weight. We sample a cell with probability proportional to population and
    then jitter inside it (Gaussian, σ scaled to the cell's approximate size).
    """

    lats: np.ndarray            # centroid latitudes
    lons: np.ndarray            # centroid longitudes
    weights: np.ndarray         # population (or proxy)
    spread_deg: np.ndarray      # per-cell σ for intra-cell jitter, degrees
    source: str                 # "tiger" or "fallback" — recorded in README

    def __post_init__(self) -> None:
        w = np.asarray(self.weights, dtype=np.float64)
        w = np.where(w > 0, w, 0.0)
        if w.sum() <= 0:
            raise ValueError("Population weights sum to zero.")
        self._probs = w / w.sum()

    def sample(self, n: int, rng: np.random.Generator) -> tuple[np.ndarray, np.ndarray]:
        idx = rng.choice(len(self.lats), size=n, p=self._probs, replace=True)
        lat0 = self.lats[idx]
        lon0 = self.lons[idx]
        spread = self.spread_deg[idx]
        # Gaussian jitter inside the cell. Longitude spread is roughly the same
        # in degrees at DC latitude (cos(38.9°) ≈ 0.78); we keep it simple.
        lat = lat0 + rng.normal(0.0, spread)
        lon = lon0 + rng.normal(0.0, spread / np.cos(np.deg2rad(lat0)))
        # Clip into the DC bounding box so jitter cannot leak outside.
        lat = np.clip(lat, DC_BBOX[0], DC_BBOX[1])
        lon = np.clip(lon, DC_BBOX[2], DC_BBOX[3])
        return lat, lon


def _load_tiger_block_groups() -> PointSampler | None:
    """Try to load DC block groups from TIGER/Line, fall back to None on failure.

    Block-group population is loaded from the 2020 Decennial Census P1 table
    (total population) attached as an attribute on the TIGER shapefile via
    ``ALAND`` / ``AWATER`` — but the shapefile itself does not include the
    population count. To avoid a second download (and an API key for ACS),
    we approximate block-group population by **land area**: bigger BGs are
    almost always less dense in DC's footprint. The single-source TIGER
    download is enough to demo a realistic spatial distribution at workshop
    scale. The README documents this approximation.

    Returns ``None`` if the file cannot be downloaded or parsed; the caller
    will then use the ward-level fallback.
    """
    import geopandas as gpd  # local import keeps top-level import fast

    BOUNDARY_CACHE.parent.mkdir(parents=True, exist_ok=True)

    try:
        if BOUNDARY_CACHE.exists():
            logger.info("Using cached DC boundary file: %s", BOUNDARY_CACHE)
            bgs = gpd.read_file(BOUNDARY_CACHE)
        else:
            logger.info("Downloading DC block groups from TIGER: %s", TIGER_BG_URL)
            with warnings.catch_warnings():
                warnings.simplefilter("ignore")
                bgs = gpd.read_file(TIGER_BG_URL)
            # Reproject to WGS-84 for safety and write a small GeoJSON cache.
            bgs = bgs.to_crs(epsg=4326)
            bgs.to_file(BOUNDARY_CACHE, driver="GeoJSON")
            logger.info("Cached DC boundary file: %s (%d block groups)",
                        BOUNDARY_CACHE, len(bgs))
    except (URLError, OSError, Exception) as exc:  # broad: any IO failure
        logger.warning(
            "Could not load TIGER block groups (%s). Falling back to ward "
            "centroids. Workshop demo is unaffected.", exc
        )
        return None

    bgs = bgs.to_crs(epsg=4326)
    # Reps point inside polygon (better than centroid for concave shapes).
    reps = bgs.geometry.representative_point()
    lats = reps.y.to_numpy()
    lons = reps.x.to_numpy()

    # Approximate population weight ∝ inverse of land area (denser BG = more
    # people for a given footprint). ALAND is in square metres. We avoid
    # divide-by-zero and clip so a single tiny BG cannot dominate.
    land_m2 = bgs["ALAND"].to_numpy(dtype=np.float64)
    land_m2 = np.where(land_m2 > 1.0, land_m2, 1.0)
    # Heuristic: weight = sqrt(typical-BG-area / area). This pushes more draws
    # into small (dense) BGs without making the largest BGs disappear.
    typical = np.median(land_m2)
    weights = np.sqrt(typical / land_m2)

    # Per-cell jitter σ (degrees): scale from the polygon's bounding-box width
    # so urban BGs (small) jitter less than rural ones (big). Cap at ~250 m so
    # we never throw a "home" wildly across a neighbourhood.
    bounds = bgs.geometry.bounds  # minx, miny, maxx, maxy
    widths_deg = (bounds["maxx"] - bounds["minx"]).to_numpy()
    heights_deg = (bounds["maxy"] - bounds["miny"]).to_numpy()
    spread_deg = np.minimum(np.maximum(widths_deg, heights_deg) / 4.0,
                            250.0 / LAT_DEG_M)
    spread_deg = np.maximum(spread_deg, 30.0 / LAT_DEG_M)  # at least ~30 m

    return PointSampler(lats=lats, lons=lons, weights=weights,
                        spread_deg=spread_deg, source="tiger")


def _ward_fallback_sampler() -> PointSampler:
    """Build a sampler from the hardcoded DC ward populations.

    The ward is much coarser than a block group (~80–110k people each), so we
    use a larger intra-cell jitter (~3 km std) and accept that the spatial
    pattern looks blockier. This path only triggers when TIGER is unreachable.
    """
    lats = np.array([row[1] for row in DC_WARD_FALLBACK])
    lons = np.array([row[2] for row in DC_WARD_FALLBACK])
    pops = np.array([row[3] for row in DC_WARD_FALLBACK], dtype=np.float64)
    spread_deg = np.full(len(lats), 3_000.0 / LAT_DEG_M)
    return PointSampler(lats=lats, lons=lons, weights=pops,
                        spread_deg=spread_deg, source="fallback")


def build_point_sampler() -> PointSampler:
    """Return a population-weighted sampler, trying TIGER first then fallback."""
    sampler = _load_tiger_block_groups()
    if sampler is not None:
        return sampler
    return _ward_fallback_sampler()


def build_employment_sampler() -> PointSampler:
    """Return a job-weighted sampler over the D.C. employment centres.

    Unlike the residential sampler, this draws from the short fixed list in
    ``EMPLOYMENT_CENTERS`` so that commute *destinations* concentrate on a few
    job hubs — the property that lets an OD matrix aggregate above a
    k-anonymity threshold instead of being all singletons.
    """
    lats = np.array([c[0] for c in EMPLOYMENT_CENTERS], dtype=np.float64)
    lons = np.array([c[1] for c in EMPLOYMENT_CENTERS], dtype=np.float64)
    weights = np.array([c[3] for c in EMPLOYMENT_CENTERS], dtype=np.float64)
    spread_deg = np.full(len(lats), EMPLOYMENT_SPREAD_M / LAT_DEG_M)
    return PointSampler(lats=lats, lons=lons, weights=weights,
                        spread_deg=spread_deg, source="employment_centers")


# Gravity parameters for home -> work assignment.
GRAVITY_ALPHA = 2.0     # distance-decay exponent; higher = stronger "work near home"
GRAVITY_D0_KM = 1.0     # softening so the nearest centre doesn't get all the mass


def sample_work_gravity(
    home_lat: np.ndarray, home_lon: np.ndarray, rng: np.random.Generator
) -> tuple[np.ndarray, np.ndarray]:
    """Assign a work location to each device by a gravity (distance-decay) model.

    Each device's workplace is drawn from ``EMPLOYMENT_CENTERS`` with probability
    proportional to ``job_weight / (distance_from_home + D0)**ALPHA``. Real
    commuting has strong distance decay (people disproportionately work near
    where they live), so this both adds realism and *correlates* home and work:
    nearby homes funnel into the same nearby job hub, which concentrates
    (origin, destination) pairs and lets more OD flows survive a k-anonymity
    threshold than independent home/work draws would.

    Returns jittered work (lat, lon) arrays, clipped to the DC bounding box.
    """
    clat = np.array([c[0] for c in EMPLOYMENT_CENTERS], dtype=np.float64)
    clon = np.array([c[1] for c in EMPLOYMENT_CENTERS], dtype=np.float64)
    cw = np.array([c[3] for c in EMPLOYMENT_CENTERS], dtype=np.float64)

    # Equirectangular distance (km) from every home to every centre: (n, n_c).
    dlat_m = (home_lat[:, None] - clat[None, :]) * LAT_DEG_M
    dlon_m = (home_lon[:, None] - clon[None, :]) * LAT_DEG_M * np.cos(np.deg2rad(home_lat[:, None]))
    dist_km = np.sqrt(dlat_m ** 2 + dlon_m ** 2) / 1_000.0

    w = cw[None, :] / np.power(dist_km + GRAVITY_D0_KM, GRAVITY_ALPHA)
    w /= w.sum(axis=1, keepdims=True)
    cdf = np.cumsum(w, axis=1)
    u = rng.random(len(home_lat))
    idx = (u[:, None] > cdf).sum(axis=1).clip(0, len(EMPLOYMENT_CENTERS) - 1)

    spread = EMPLOYMENT_SPREAD_M / LAT_DEG_M
    base_lat = clat[idx]
    base_lon = clon[idx]
    wlat = base_lat + rng.normal(0.0, spread, size=len(idx))
    wlon = base_lon + rng.normal(0.0, spread / np.cos(np.deg2rad(base_lat)), size=len(idx))
    wlat = np.clip(wlat, DC_BBOX[0], DC_BBOX[1])
    wlon = np.clip(wlon, DC_BBOX[2], DC_BBOX[3])
    return wlat, wlon


# -----------------------------------------------------------------------------
# Device + ping generation
# -----------------------------------------------------------------------------

def _make_device_ids(n: int, source_assignment: np.ndarray) -> np.ndarray:
    """Build stable, hash-based device IDs.

    Mirrors the pseudonymisation patterns used by real vendors:
    Irys-style IDs get ``ir_`` prefix, Quadrant-style get ``qd_``. The hash
    input is a deterministic string keyed on the device index, so reruns of
    this script with the same seed and ``--n-devices`` produce identical IDs.
    """
    ids = np.empty(n, dtype=object)
    for i in range(n):
        h = hashlib.sha256(f"workshop-device-{i}".encode()).hexdigest()[:16]
        prefix = "ir_" if source_assignment[i] == "irys" else "qd_"
        ids[i] = f"{prefix}{h}"
    return ids


def _is_urban_canyon(lat: np.ndarray, lon: np.ndarray) -> np.ndarray:
    """Boolean mask: is each (lat, lon) inside the rough downtown box."""
    lat_min, lat_max, lon_min, lon_max = URBAN_CANYON
    return (lat >= lat_min) & (lat <= lat_max) & (lon >= lon_min) & (lon <= lon_max)


def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in km. Scalar inputs only."""
    rlat1, rlat2 = np.deg2rad(lat1), np.deg2rad(lat2)
    dlat = rlat2 - rlat1
    dlon = np.deg2rad(lon2 - lon1)
    a = np.sin(dlat / 2.0) ** 2 + np.cos(rlat1) * np.cos(rlat2) * np.sin(dlon / 2.0) ** 2
    return 2.0 * EARTH_RADIUS_M * np.arcsin(np.sqrt(a)) / 1_000.0


def _bursty_times(
    duration_min: float, inter_burst_min: float, rng: np.random.Generator
) -> np.ndarray:
    """Event times (minutes from segment start) for a *bursty* point process.

    Real SDK pings arrive in clusters: an app wakes, fires a handful of pings a
    few seconds apart, then goes quiet for many minutes. We model burst onsets as
    a Poisson process (mean gap ``inter_burst_min``); each burst holds
    ``1 + Poisson(BURST_SIZE_LAMBDA)`` pings spaced ~``WITHIN_BURST_GAP_MIN``
    apart (Exponential). The signature this produces — most consecutive gaps
    sub-minute (within a burst), with a long tail of multi-minute silences
    (between bursts) — is what an evenly spaced Poisson cadence cannot. Fully
    vectorised (no per-burst Python loop) so it stays cheap at 12k devices.

    Returns a sorted 1-D array of times in ``[0, duration_min)``.
    """
    if duration_min <= 0:
        return np.empty(0, dtype=np.float64)
    # Oversample burst onsets, then keep those landing inside the segment.
    n_est = int(duration_min / max(inter_burst_min, 1e-6) * 2) + 5
    onsets = np.cumsum(rng.exponential(inter_burst_min, size=n_est))
    onsets = onsets[onsets < duration_min]
    if onsets.size == 0:
        onsets = np.array([rng.uniform(0.0, duration_min)])
    sizes = 1 + rng.poisson(BURST_SIZE_LAMBDA, size=onsets.size)
    total = int(sizes.sum())
    first_idx = np.cumsum(sizes) - sizes          # global index of each burst's first ping
    within = rng.exponential(WITHIN_BURST_GAP_MIN, size=total)
    within[first_idx] = 0.0                        # first ping sits exactly at the onset
    cum = np.cumsum(within)
    base = np.concatenate(([0.0], cum[:-1]))       # exclusive cumsum
    offsets = cum - np.repeat(base[first_idx], sizes)
    times = np.repeat(onsets, sizes) + offsets
    times = times[(times >= 0.0) & (times < duration_min)]
    if times.size == 0:
        times = np.array([min(duration_min * 0.5, duration_min)])
    return np.sort(times)


def _generate_trip_pings(
    rng: np.random.Generator,
    origin_lat: float, origin_lon: float,
    dest_lat: float, dest_lon: float,
    depart_ts_ms: int,
    speed_kmh: float,
    density: float = 1.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Generate ping (lat, lon, timestamp_ms) along a straight-line trip.

    Linear interpolation in lat/lon is fine at DC scale (~10 km extent ⇒
    sub-metre distortion vs. great-circle). Ping times follow the bursty process
    in :func:`_bursty_times` (clusters of sub-minute pings separated by minute+
    silences), so the trace looks event-triggered rather than evenly sampled.
    ``density`` scales the burst rate (reserved; defaults to 1.0). Position is
    jittered with ~10 m Gaussian along the path.

    Returns three arrays: lat, lon, timestamp_ms (all length variable).
    """
    distance_km = _haversine_km(origin_lat, origin_lon, dest_lat, dest_lon)
    if distance_km < 0.05:  # < 50 m: no trip, single ping at origin
        return (np.array([origin_lat]), np.array([origin_lon]),
                np.array([depart_ts_ms], dtype=np.int64))

    duration_min = (distance_km / speed_kmh) * 60.0

    # Bursty ping times along the trip, then anchor the departure and arrival so
    # the trip stays bracketed in the trace (the caller reads the arrival time).
    inter_burst = INTER_BURST_MOVING_MIN / max(density, 1e-6)
    times = _bursty_times(duration_min, inter_burst, rng)
    times = np.unique(np.concatenate([[0.0], times, [duration_min]]))
    fractions = times / duration_min

    # Linear interpolation in lat/lon, then jitter perpendicular to motion is
    # approximated by independent Gaussian draws in each axis (sufficient at
    # workshop fidelity).
    lats = origin_lat + (dest_lat - origin_lat) * fractions
    lons = origin_lon + (dest_lon - origin_lon) * fractions
    jitter_lat = rng.normal(0.0, JITTER_STD_M / LAT_DEG_M, size=lats.shape)
    jitter_lon = rng.normal(0.0, JITTER_STD_M / (LAT_DEG_M * np.cos(np.deg2rad(lats))))
    lats = lats + jitter_lat
    lons = lons + jitter_lon

    timestamps_ms = (depart_ts_ms + times * 60_000.0).astype(np.int64)
    return lats, lons, timestamps_ms


def _generate_stationary_pings(
    rng: np.random.Generator,
    lat: float, lon: float,
    start_ts_ms: int, end_ts_ms: int,
    drop_indoor_fraction: float = 0.5,
    density: float = 1.0,
) -> tuple[np.ndarray, np.ndarray, np.ndarray]:
    """Generate sparser pings while the device sits at a fixed location.

    Ping times follow the same bursty process as trips (:func:`_bursty_times`)
    but with a longer mean gap between bursts, since an idle phone emits less
    often. If the dwell exceeds 2 h, ~50% of pings are dropped uniformly to mimic
    the SDK going quiet when the phone is idle indoors. ``density`` scales the
    burst rate (reserved; defaults to 1.0).
    """
    duration_min = max(0.0, (end_ts_ms - start_ts_ms) / 60_000.0)
    if duration_min < 1.0:
        return (np.array([], dtype=np.float64),
                np.array([], dtype=np.float64),
                np.array([], dtype=np.int64))

    inter_burst = INTER_BURST_STATIONARY_MIN / max(density, 1e-6)
    times = _bursty_times(duration_min, inter_burst, rng)

    # Jitter the reported position slightly even though the device is "still"
    # — that mimics GPS noise indoors.
    lats = lat + rng.normal(0.0, JITTER_STD_M / LAT_DEG_M, size=times.shape)
    lons = lon + rng.normal(0.0, JITTER_STD_M / (LAT_DEG_M * np.cos(np.deg2rad(lat))),
                            size=times.shape)
    timestamps_ms = (start_ts_ms + times * 60_000.0).astype(np.int64)

    # Indoor gaps: drop ~50% of pings on dwells >2 h (uniform mask).
    if duration_min > 120.0 and drop_indoor_fraction > 0:
        keep = rng.random(size=timestamps_ms.shape) > drop_indoor_fraction
        lats, lons, timestamps_ms = lats[keep], lons[keep], timestamps_ms[keep]

    return lats, lons, timestamps_ms


def _local_hour_to_utc_ms(day_utc_midnight_ms: int, local_hour: float) -> int:
    """Convert a local-hour-of-day to a UTC ms timestamp on a given day.

    Eastern Daylight Time = UTC-4 for the entire window (1–7 Jun 2026).
    Local 07:30 ⇒ UTC 11:30.
    """
    utc_hour = local_hour - LOCAL_UTC_OFFSET_HOURS  # subtract a negative
    return int(day_utc_midnight_ms + utc_hour * 3_600_000.0)


def _simulate_device_week(
    rng: np.random.Generator,
    home_lat: float, home_lon: float,
    work_lat: float | None, work_lon: float | None,
    week_start_utc_ms: int,
    pois: np.ndarray,  # shape (n_pois, 2), (lat, lon)
    active_days: set[int],
) -> list[tuple[np.ndarray, np.ndarray, np.ndarray]]:
    """Simulate one device for 7 days. Returns a list of (lat, lon, ts) chunks.

    ``active_days`` is the subset of day indices (0=Mon ... 6=Sun) on which the
    device emits any pings at all. Days outside the set are skipped entirely —
    this is how the activity model produces devices that "appear only a few
    times" (transient/occasional classes) versus full-week regulars.

    This produces the device's *full* bursty week. The caller then thins it down
    to the device's weekly ping budget (see :func:`_thin_to_budget`), which is
    what creates the sparse tail and heavy skew of the panel.
    """
    # Local wrappers bake in rng so the many trip / stationary call sites below
    # stay readable.
    def gen_stat(lat, lon, s, e, drop_indoor_fraction=0.5):
        return _generate_stationary_pings(
            rng, lat, lon, s, e, drop_indoor_fraction=drop_indoor_fraction)

    def gen_trip(o_lat, o_lon, d_lat, d_lon, dep, spd):
        return _generate_trip_pings(rng, o_lat, o_lon, d_lat, d_lon, dep, spd)

    chunks: list[tuple[np.ndarray, np.ndarray, np.ndarray]] = []
    has_work = work_lat is not None and work_lon is not None
    midday_hopper = has_work and (rng.random() < MIDDAY_HOP_PROBABILITY)

    for day in range(7):
        if day not in active_days:
            continue
        day_start_ms = week_start_utc_ms + day * 86_400_000
        is_weekend = day >= 5  # 0=Mon ... 5=Sat, 6=Sun

        if not is_weekend and has_work:
            # ---- Weekday with a workplace ----
            am_depart_local = rng.normal(AM_DEPART_LOCAL_HOUR, AM_DEPART_STD_HOURS)
            pm_depart_local = rng.normal(PM_DEPART_LOCAL_HOUR, PM_DEPART_STD_HOURS)
            am_depart_local = float(np.clip(am_depart_local, 5.5, 10.5))
            pm_depart_local = float(np.clip(pm_depart_local, 14.5, 21.0))

            am_depart_ms = _local_hour_to_utc_ms(day_start_ms, am_depart_local)
            pm_depart_ms = _local_hour_to_utc_ms(day_start_ms, pm_depart_local)

            # Stationary at home before AM departure
            chunks.append(gen_stat(home_lat, home_lon, day_start_ms, am_depart_ms))
            # Commute home → work
            commute_to_work = gen_trip(
                home_lat, home_lon, work_lat, work_lon,  # type: ignore[arg-type]
                am_depart_ms, COMMUTE_SPEED_KMH)
            chunks.append(commute_to_work)
            # Time at work (with optional midday hop)
            work_arrive_ms = int(commute_to_work[2].max())
            if midday_hopper:
                lunch_ms = _local_hour_to_utc_ms(day_start_ms, 12.0 + rng.uniform(-0.5, 1.0))
                if lunch_ms > work_arrive_ms + 30 * 60_000 and lunch_ms < pm_depart_ms - 60 * 60_000:
                    # Random nearby point (~600 m offset)
                    offset_m = rng.normal(0.0, 600.0)
                    bearing = rng.uniform(0.0, 2 * np.pi)
                    dlat = (offset_m * np.cos(bearing)) / LAT_DEG_M
                    dlon = (offset_m * np.sin(bearing)) / (LAT_DEG_M * np.cos(np.deg2rad(work_lat)))  # type: ignore[arg-type]
                    lunch_lat = float(work_lat) + dlat  # type: ignore[arg-type]
                    lunch_lon = float(work_lon) + dlon  # type: ignore[arg-type]
                    # work → lunch → back
                    chunks.append(gen_stat(
                        float(work_lat), float(work_lon),  # type: ignore[arg-type]
                        work_arrive_ms, lunch_ms))
                    out = gen_trip(float(work_lat), float(work_lon),  # type: ignore[arg-type]
                                   lunch_lat, lunch_lon, lunch_ms,
                                   WEEKEND_TRIP_SPEED_KMH)
                    chunks.append(out)
                    lunch_arrive_ms = int(out[2].max())
                    lunch_end_ms = lunch_arrive_ms + int(rng.uniform(30, 60) * 60_000)
                    chunks.append(gen_stat(
                        lunch_lat, lunch_lon, lunch_arrive_ms, lunch_end_ms,
                        drop_indoor_fraction=0.0))
                    back = gen_trip(lunch_lat, lunch_lon,
                                    float(work_lat), float(work_lon),  # type: ignore[arg-type]
                                    lunch_end_ms, WEEKEND_TRIP_SPEED_KMH)
                    chunks.append(back)
                    back_arrive_ms = int(back[2].max())
                    chunks.append(gen_stat(
                        float(work_lat), float(work_lon),  # type: ignore[arg-type]
                        back_arrive_ms, pm_depart_ms))
                else:
                    chunks.append(gen_stat(
                        float(work_lat), float(work_lon),  # type: ignore[arg-type]
                        work_arrive_ms, pm_depart_ms))
            else:
                chunks.append(gen_stat(
                    float(work_lat), float(work_lon),  # type: ignore[arg-type]
                    work_arrive_ms, pm_depart_ms))

            # Commute work → home
            commute_home = gen_trip(
                float(work_lat), float(work_lon),  # type: ignore[arg-type]
                home_lat, home_lon, pm_depart_ms, COMMUTE_SPEED_KMH)
            chunks.append(commute_home)
            # Evening at home
            home_arrive_ms = int(commute_home[2].max())
            chunks.append(gen_stat(
                home_lat, home_lon, home_arrive_ms, day_start_ms + 86_400_000))

        elif not is_weekend and not has_work:
            # ---- Weekday, no workplace: errands close to home ----
            # 0–2 short excursions to a leisure POI or just a nearby random point
            n_excursions = rng.integers(0, 3)
            cursor_ms = day_start_ms
            cursor_lat, cursor_lon = home_lat, home_lon
            for _ in range(int(n_excursions)):
                # Errand happens between 09:00 and 19:00 local
                t_local = rng.uniform(9.0, 19.0)
                depart_ms = max(_local_hour_to_utc_ms(day_start_ms, t_local),
                                cursor_ms + 30 * 60_000)
                # Half the errands go to a leisure POI, half to a random nearby point
                if rng.random() < 0.5 and len(pois) > 0:
                    dest_lat, dest_lon = pois[rng.integers(0, len(pois))]
                else:
                    offset_m = rng.normal(0.0, 1200.0)
                    bearing = rng.uniform(0.0, 2 * np.pi)
                    dest_lat = home_lat + (offset_m * np.cos(bearing)) / LAT_DEG_M
                    dest_lon = home_lon + (offset_m * np.sin(bearing)) / (
                        LAT_DEG_M * np.cos(np.deg2rad(home_lat)))
                # Home → dest → home
                chunks.append(gen_stat(cursor_lat, cursor_lon, cursor_ms, depart_ms))
                out = gen_trip(cursor_lat, cursor_lon,
                               float(dest_lat), float(dest_lon),
                               depart_ms, WEEKEND_TRIP_SPEED_KMH)
                chunks.append(out)
                arrive_ms = int(out[2].max())
                stay_min = rng.uniform(30, 120)
                stay_end_ms = arrive_ms + int(stay_min * 60_000)
                chunks.append(gen_stat(
                    float(dest_lat), float(dest_lon),
                    arrive_ms, stay_end_ms, drop_indoor_fraction=0.0))
                back = gen_trip(float(dest_lat), float(dest_lon),
                                home_lat, home_lon, stay_end_ms,
                                WEEKEND_TRIP_SPEED_KMH)
                chunks.append(back)
                cursor_ms = int(back[2].max())
                cursor_lat, cursor_lon = home_lat, home_lon
            # Fill the rest of the day at home
            chunks.append(gen_stat(
                home_lat, home_lon, cursor_ms, day_start_ms + 86_400_000))

        else:
            # ---- Weekend (Sat or Sun): 1–3 leisure trips, lower volume ----
            n_trips = int(rng.integers(1, 4))
            cursor_ms = day_start_ms
            cursor_lat, cursor_lon = home_lat, home_lon
            for _ in range(n_trips):
                t_local = rng.uniform(10.0, 20.0)
                depart_ms = max(_local_hour_to_utc_ms(day_start_ms, t_local),
                                cursor_ms + 30 * 60_000)
                if depart_ms >= day_start_ms + 86_400_000 - 60 * 60_000:
                    break
                dest_lat, dest_lon = pois[rng.integers(0, len(pois))]
                chunks.append(gen_stat(
                    cursor_lat, cursor_lon, cursor_ms, depart_ms,
                    drop_indoor_fraction=0.6))  # extra indoor quiet on weekends
                out = gen_trip(cursor_lat, cursor_lon,
                               float(dest_lat), float(dest_lon),
                               depart_ms, WEEKEND_TRIP_SPEED_KMH)
                chunks.append(out)
                arrive_ms = int(out[2].max())
                stay_min = rng.uniform(*LEISURE_DURATION_HOURS) * 60.0
                stay_end_ms = arrive_ms + int(stay_min * 60_000)
                # At a leisure POI: open sky, but we still tag indoors=False so
                # we don't drop pings here.
                chunks.append(gen_stat(
                    float(dest_lat), float(dest_lon),
                    arrive_ms, stay_end_ms, drop_indoor_fraction=0.0))
                back = gen_trip(float(dest_lat), float(dest_lon),
                                home_lat, home_lon, stay_end_ms,
                                WEEKEND_TRIP_SPEED_KMH)
                chunks.append(back)
                cursor_ms = int(back[2].max())
                cursor_lat, cursor_lon = home_lat, home_lon
            # Fill rest of day at home with heavier indoor drop (people stay in)
            chunks.append(gen_stat(
                home_lat, home_lon, cursor_ms, day_start_ms + 86_400_000,
                drop_indoor_fraction=0.7))

    return chunks


# -----------------------------------------------------------------------------
# Main driver
# -----------------------------------------------------------------------------

def _gini(counts: np.ndarray) -> float:
    """Gini coefficient of a 1-D array of non-negative counts (0 = even, 1 = all in one)."""
    x = np.sort(np.asarray(counts, dtype=np.float64))
    n = x.size
    if n == 0 or x.sum() == 0:
        return float("nan")
    cum = np.cumsum(x)
    return float((n + 1 - 2 * (cum.sum() / cum[-1])) / n)


def _thin_to_budget(
    ts_ms_sorted: np.ndarray, budget: int, rng: np.random.Generator
) -> np.ndarray:
    """Indices to KEEP so a device emits ~``budget`` pings, dropping whole bursts.

    A device's full bursty week is thinned by dropping entire bursts at random
    (a burst = a run of pings with <= ``BURST_GAP_MIN`` between them). Dropping
    *whole bursts* — rather than individual pings — is what keeps a heavily
    thinned device looking bursty: a transient device ends up with one or two
    tight clusters of sightings, not an evenly spaced handful. The few pings of
    the last-added burst are trimmed to land on the budget exactly.

    ``ts_ms_sorted`` must be ascending. Returns ascending indices into it.
    """
    n = ts_ms_sorted.size
    if budget >= n:
        return np.arange(n)
    if budget <= 0:
        return np.empty(0, dtype=np.int64)

    gap_min = np.diff(ts_ms_sorted) / 60_000.0
    burst_id = np.empty(n, dtype=np.int64)
    burst_id[0] = 0
    np.cumsum(gap_min > BURST_GAP_MIN, out=burst_id[1:])
    n_bursts = int(burst_id[-1]) + 1
    sizes = np.bincount(burst_id, minlength=n_bursts)

    order = rng.permutation(n_bursts)
    csum = np.cumsum(sizes[order])
    take = int(np.searchsorted(csum, budget, side="left")) + 1
    take = min(take, n_bursts)
    kept = order[:take]
    idx = np.flatnonzero(np.isin(burst_id, kept))

    if idx.size > budget:  # trim the last-added burst, keeping its earliest pings
        last_positions = np.flatnonzero(burst_id == int(order[take - 1]))
        drop = last_positions[-(idx.size - budget):]
        idx = np.setdiff1d(idx, drop, assume_unique=True)
    return idx


def generate(n_devices: int, seed: int, output: Path) -> dict:
    """Build the synthetic dataset and write it to ``output`` as Parquet."""
    t0 = time.perf_counter()
    rng = np.random.default_rng(seed)

    home_sampler = build_point_sampler()
    logger.info("Home sampler: %s (n_cells=%d); work: gravity over %d employment centres",
                home_sampler.source, len(home_sampler.lats), len(EMPLOYMENT_CENTERS))

    # Home cells from the residential surface for every device. Work cells for
    # ~70% are assigned by a gravity model over the EMPLOYMENT CENTRES, so
    # destinations both concentrate AND correlate with home (distance decay).
    home_lat, home_lon = home_sampler.sample(n_devices, rng)
    work_mask = rng.random(n_devices) < WORK_PROBABILITY
    work_lat_all, work_lon_all = sample_work_gravity(home_lat, home_lon, rng)
    work_lat = np.where(work_mask, work_lat_all, np.nan)
    work_lon = np.where(work_mask, work_lon_all, np.nan)

    # Assign sources (~50/50) and stable device IDs.
    sources = np.where(rng.random(n_devices) < 0.5, "irys", "quadrant")
    device_ids = _make_device_ids(n_devices, sources)

    # Heavy-tailed activity: assign each device a class, then a per-device weekly
    # ping BUDGET (log-normal) and an active-day count. Drawn in bulk (fixed
    # class order) for determinism. The budget is enforced by thinning the
    # device's simulated week (drop whole bursts) inside the per-device loop.
    u = rng.random(n_devices)
    cum = np.cumsum(ACTIVITY_PROBS)
    class_idx = np.searchsorted(cum, u, side="right").clip(0, len(ACTIVITY_CLASSES) - 1)
    activity_class = np.array(ACTIVITY_CLASSES)[class_idx]
    device_budget = np.empty(n_devices, dtype=np.int64)
    n_active = np.empty(n_devices, dtype=np.int64)
    for cls in ACTIVITY_CLASSES:  # fixed order -> deterministic rng consumption
        m = activity_class == cls
        cnt = int(m.sum())
        if cnt == 0:
            continue
        median, sigma = ACTIVITY_BUDGET[cls]
        draws = rng.lognormal(mean=np.log(median), sigma=sigma, size=cnt)
        device_budget[m] = np.maximum(1, np.rint(draws)).astype(np.int64)
        dlo, dhi = ACTIVITY_ACTIVE_DAYS[cls]
        n_active[m] = rng.integers(dlo, dhi + 1, size=cnt)

    pois = np.array([(p[0], p[1]) for p in LEISURE_POIS])

    # Week start = Monday 1 June 2026 00:00:00 UTC
    week_start_utc = datetime(2026, 6, 1, 0, 0, 0, tzinfo=timezone.utc)
    week_start_ms = int(week_start_utc.timestamp() * 1_000)

    # Per-device simulation. Pre-allocating per device avoids one giant list.
    all_dev_idx: list[np.ndarray] = []
    all_lat: list[np.ndarray] = []
    all_lon: list[np.ndarray] = []
    all_ts: list[np.ndarray] = []

    for i in range(n_devices):
        wlat = float(work_lat[i]) if work_mask[i] else None
        wlon = float(work_lon[i]) if work_mask[i] else None
        # Which of the 7 days is this device present? (one rng.choice/device)
        k = int(n_active[i])
        if k >= 7:
            active_days = set(range(7))
        else:
            active_days = {int(d) for d in rng.choice(7, size=k, replace=False)}
        chunks = _simulate_device_week(
            rng,
            float(home_lat[i]), float(home_lon[i]),
            wlat, wlon,
            week_start_ms,
            pois,
            active_days,
        )
        # Concatenate this device's full bursty week, sort by time, then thin it
        # down to its weekly budget by dropping whole bursts. Thinning here (not
        # during simulation) is what produces the sparse tail and heavy skew
        # while preserving the within-burst sub-minute cadence.
        kept_chunks = [c for c in chunks if c[0].size]
        if not kept_chunks:
            continue
        dev_lat = np.concatenate([c[0] for c in kept_chunks]).astype(np.float64)
        dev_lon = np.concatenate([c[1] for c in kept_chunks]).astype(np.float64)
        dev_ts = np.concatenate([c[2] for c in kept_chunks]).astype(np.int64)
        order = np.argsort(dev_ts, kind="mergesort")
        dev_lat, dev_lon, dev_ts = dev_lat[order], dev_lon[order], dev_ts[order]
        keep = _thin_to_budget(dev_ts, int(device_budget[i]), rng)
        if keep.size == 0:
            continue
        all_lat.append(dev_lat[keep])
        all_lon.append(dev_lon[keep])
        all_ts.append(dev_ts[keep])
        all_dev_idx.append(np.full(keep.size, i, dtype=np.int32))

    lat_arr = np.concatenate(all_lat) if all_lat else np.zeros(0)
    lon_arr = np.concatenate(all_lon) if all_lon else np.zeros(0)
    ts_arr = np.concatenate(all_ts) if all_ts else np.zeros(0, dtype=np.int64)
    dev_arr = np.concatenate(all_dev_idx) if all_dev_idx else np.zeros(0, dtype=np.int32)

    # Clip to the 7-day window (Mon 2026-06-01 00:00 → Sun 2026-06-07 23:59:59.999 UTC).
    # Returning-home pings from late weekend trips can otherwise spill into Monday
    # of the following week.
    week_end_ms = week_start_ms + 7 * 86_400_000 - 1
    in_window = (ts_arr >= week_start_ms) & (ts_arr <= week_end_ms)
    lat_arr = lat_arr[in_window]
    lon_arr = lon_arr[in_window]
    ts_arr = ts_arr[in_window]
    dev_arr = dev_arr[in_window]

    # Clip lat/lon back into the bounding box (jitter can leak slightly).
    lat_arr = np.clip(lat_arr, DC_BBOX[0], DC_BBOX[1])
    lon_arr = np.clip(lon_arr, DC_BBOX[2], DC_BBOX[3])

    # Apply horizontal-accuracy noise. σ depends on urban-canyon vs. open sky.
    in_urban = _is_urban_canyon(lat_arr, lon_arr)
    sigma_m = np.where(in_urban, ACCURACY_URBAN_SIGMA_M, ACCURACY_OPEN_SIGMA_M)
    # Reported accuracy is the *expected* σ, with mild log-normal noise so it
    # doesn't all look identical (real feeds vary).
    accuracy = sigma_m * rng.lognormal(mean=0.0, sigma=0.2, size=sigma_m.shape)

    # Add the horizontal error to the *reported* position.
    lat_noise_deg = rng.normal(0.0, sigma_m / LAT_DEG_M)
    lon_noise_deg = rng.normal(0.0, sigma_m / (LAT_DEG_M * np.cos(np.deg2rad(lat_arr))))
    lat_arr = np.clip(lat_arr + lat_noise_deg, DC_BBOX[0], DC_BBOX[1])
    lon_arr = np.clip(lon_arr + lon_noise_deg, DC_BBOX[2], DC_BBOX[3])

    # Build the DataFrame.
    device_id_col = device_ids[dev_arr]
    source_col = sources[dev_arr]

    # Round timestamps to whole seconds for a cleaner ISO column. We keep the
    # raw ms in ``timestamp_utc`` for analyses that need full precision.
    iso = pd.to_datetime(ts_arr, unit="ms", utc=True).strftime("%Y-%m-%dT%H:%M:%SZ")

    df = pd.DataFrame({
        "device_id": device_id_col.astype(str),
        "timestamp_utc": ts_arr.astype(np.int64),
        "timestamp_iso": iso,
        "lat": lat_arr.astype(np.float64),
        "lon": lon_arr.astype(np.float64),
        "horizontal_accuracy_m": accuracy.astype(np.float32),
        "source": pd.Categorical(source_col, categories=["irys", "quadrant"]),
    })

    # Sort by (device_id, timestamp_utc) for monotonic per-device order +
    # better Parquet compression.
    df.sort_values(["device_id", "timestamp_utc"], kind="mergesort", inplace=True)
    df.reset_index(drop=True, inplace=True)

    # Write Parquet via pyarrow with dictionary encoding on the string columns
    # for further size savings.
    output.parent.mkdir(parents=True, exist_ok=True)
    table = pa.Table.from_pandas(df, preserve_index=False)
    pq.write_table(
        table,
        output,
        compression="zstd",  # ~25-35% smaller than snappy on this data; pyarrow
                             # bundles the zstd codec so it reads with no extra deps
        use_dictionary=True,
        write_statistics=True,
    )

    elapsed = time.perf_counter() - t0
    file_size_mb = output.stat().st_size / (1024 * 1024)

    # Pings-per-device distribution + the panel-imbalance and bursty-cadence
    # metrics the workshop teaches (and that this sample is calibrated to show).
    counts = df.groupby("device_id", observed=True).size().to_numpy()
    n_dev = counts.size
    total_pings = int(counts.sum())
    sorted_desc = np.sort(counts)[::-1]

    def _top_share(frac: float) -> float:
        k = max(1, int(np.ceil(frac * n_dev)))
        return float(sorted_desc[:k].sum() / total_pings) if total_pings else 0.0

    # Inter-ping gaps (the bursty cadence): minutes between consecutive pings
    # within a device. df is already sorted by (device_id, timestamp_utc).
    gaps_min = (
        df.groupby("device_id", observed=True)["timestamp_utc"].diff() / 60_000.0
    ).to_numpy()
    gaps_min = gaps_min[~np.isnan(gaps_min)]
    has_gaps = gaps_min.size > 0

    summary = {
        "rows": len(df),
        "devices_simulated": int(n_devices),
        "devices_present": int(n_dev),
        "time_min_iso": df["timestamp_iso"].min(),
        "time_max_iso": df["timestamp_iso"].max(),
        "lat_min": float(df["lat"].min()),
        "lat_max": float(df["lat"].max()),
        "lon_min": float(df["lon"].min()),
        "lon_max": float(df["lon"].max()),
        "source_split": df["source"].value_counts().to_dict(),
        "pings_per_device_p50": int(np.percentile(counts, 50)),
        "pings_per_device_p90": int(np.percentile(counts, 90)),
        "pings_per_device_p99": int(np.percentile(counts, 99)),
        "pings_per_device_max": int(counts.max()),
        "pct_devices_1_ping": round(float((counts == 1).mean()), 3),
        "pct_devices_le5_pings": round(float((counts <= 5).mean()), 3),
        "gini_pings_per_device": round(_gini(counts), 3),
        "top1pct_share_of_pings": round(_top_share(0.01), 3),
        "top10pct_share_of_pings": round(_top_share(0.10), 3),
        "median_gap_min": round(float(np.median(gaps_min)), 2) if has_gaps else float("nan"),
        "pct_gaps_sub_minute": round(float((gaps_min < 1.0).mean()), 3) if has_gaps else float("nan"),
        "file_size_mb": round(file_size_mb, 2),
        "elapsed_s": round(elapsed, 2),
        "sampler_source": home_sampler.source,
    }
    return summary


def _print_summary(summary: dict, output: Path) -> None:
    print("\n=== Synthetic ping dataset generated ===")
    print(f"  Output             : {output}")
    print(f"  Rows               : {summary['rows']:,}")
    print(f"  Devices (present)  : {summary['devices_present']:,} of {summary['devices_simulated']:,} simulated")
    print(f"  Time range (UTC)   : {summary['time_min_iso']} → {summary['time_max_iso']}")
    print(f"  Lat range          : {summary['lat_min']:.4f} → {summary['lat_max']:.4f}")
    print(f"  Lon range          : {summary['lon_min']:.4f} → {summary['lon_max']:.4f}")
    print(f"  Source split       : {dict(summary['source_split'])}")
    print(f"  Pings/device       : p50={summary['pings_per_device_p50']:,} "
          f"p90={summary['pings_per_device_p90']:,} "
          f"p99={summary['pings_per_device_p99']:,} "
          f"max={summary['pings_per_device_max']:,}")
    print(f"  Sparse tail        : {summary['pct_devices_1_ping'] * 100:.1f}% single-ping, "
          f"{summary['pct_devices_le5_pings'] * 100:.1f}% ≤5 pings")
    print(f"  Skew               : Gini={summary['gini_pings_per_device']:.3f}; "
          f"top 1%/10% of devices = {summary['top1pct_share_of_pings'] * 100:.1f}%/"
          f"{summary['top10pct_share_of_pings'] * 100:.1f}% of pings")
    print(f"  Cadence (bursty)   : median gap {summary['median_gap_min']:.2f} min; "
          f"{summary['pct_gaps_sub_minute'] * 100:.1f}% of gaps < 1 min")
    print(f"  File size          : {summary['file_size_mb']:.2f} MB")
    print(f"  Wall time          : {summary['elapsed_s']:.2f} s")
    print(f"  Sampler source     : {summary['sampler_source']}")
    if summary["file_size_mb"] >= 100:
        print("  WARNING: file size exceeds 100 MB target.", file=sys.stderr)


def parse_args(argv: Iterable[str] | None = None) -> argparse.Namespace:
    p = argparse.ArgumentParser(description=__doc__.split("\n\n", 1)[0])
    p.add_argument("--output", type=Path, default=DEFAULT_OUTPUT,
                   help=f"Output Parquet path (default: {DEFAULT_OUTPUT.relative_to(REPO_ROOT)})")
    p.add_argument("--n-devices", type=int, default=N_DEVICES_DEFAULT,
                   help=f"Number of synthetic devices (default: {N_DEVICES_DEFAULT})")
    p.add_argument("--seed", type=int, default=SEED,
                   help=f"Random seed (default: {SEED})")
    p.add_argument("--quiet", action="store_true", help="Suppress info logging")
    return p.parse_args(argv)


def main(argv: Iterable[str] | None = None) -> int:
    args = parse_args(argv)
    logging.basicConfig(
        level=logging.WARNING if args.quiet else logging.INFO,
        format="%(asctime)s %(levelname)s %(name)s: %(message)s",
    )
    summary = generate(n_devices=args.n_devices, seed=args.seed, output=args.output)
    _print_summary(summary, args.output)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())
