Introduction to SkyCorridor¶

SkyCorridor is a deterministic simulation engine for 3D urban air corridor management. It evaluates thousands of rule configurations to find which consistently produce safe, efficient operations — mapping the Pareto frontier between safety and throughput.

This notebook walks through the complete workflow:

  1. Load a built-in example scenario
  2. Visualize the airspace environment in interactive 3D
  3. Run a Monte Carlo experiment (2 rule sets × 10 trials)
  4. Analyze results — DataFrames, safety-vs-throughput plots
  5. Replay a simulated trial in 3D

Audience: Researchers and engineers evaluating rule-based air traffic management strategies.
Time to complete: ~5 minutes (including simulation runtime).

In [1]:
%pip install skycorridor pyvista trame trame-vuetify trame-vtk polars plotly nest_asyncio2 -q
Note: you may need to restart the kernel to use updated packages.
[notice] A new release of pip is available: 24.0 -> 26.0.1
[notice] To update, run: python.exe -m pip install --upgrade pip

1 — Setup¶

Import skycorridor (the Rust simulation engine with a Python interface) and configure PyVista for inline 3D rendering. The trame backend gives interactive rotation/zoom inside the notebook; if unavailable, static PNG screenshots are used instead.

In [23]:
import skycorridor as sc
import numpy as np
import pyvista as pv
import polars as pl
import plotly.express as px

try:
    pv.set_jupyter_backend("trame")
except Exception:
    pv.set_jupyter_backend("static")

print(sc.info())
skycorridor v0.1.0

2 — Load a Built-in Example¶

SkyCorridor ships with pre-configured example experiments. Each ExperimentSpec bundles:

  • Layout — the airspace geometry (bounds, voxel grid, vertiport positions)
  • Scenario — demand pattern (origin-destination rates) and uncertainty (wind, GPS error)
  • Rule set(s) — the control logic being evaluated (separation, conflict detection, scheduling)

We start with the simplest example — a two-vertiport corridor — to inspect the API.

In [24]:
example = sc.examples.simple_corridor()

print(example)
print(f"  Layouts:    {example.num_layouts}")
print(f"  Scenarios:  {example.num_scenarios}")
print(f"  Rule sets:  {example.num_rule_sets}")
print(f"  Trials:     {example.num_trials}")
print(f"  Total runs: {example.total_runs}")
ExperimentSpec(name='Simple Corridor', configs=1, total_runs=10)
  Layouts:    1
  Scenarios:  1
  Rule sets:  1
  Trials:     10
  Total runs: 10

3 — Visualize the Airspace in 3D¶

Before running simulations, it helps to see the operating environment. Below we construct a synthetic 4-vertiport airspace with:

Layer Colour Description
Corridor voxels Blue (transparent) Flyable airspace channels
No-fly zones Red Building / obstacle volumes
Merge points Gold Where corridors intersect — highest conflict risk
Vertiports Green markers Takeoff / landing pads with labels
Routes Coloured splines Pre-computed flight paths
Wind field Arrows (coolwarm) Per-voxel wind vectors

This mirrors the geometry the Rust core generates from procedural or imported city data. Rotate and zoom the 3D view with your mouse.

In [25]:
import math, random

random.seed(42)

NX, NY, NZ = 20, 20, 6
VOXEL_SIZE = 50.0
ORIGIN = [-500.0, -500.0, 0.0]

voxels = {}
for x in range(NX):
    for y in range(NY):
        for z in range(NZ):
            cx, cy = x - NX // 2, y - NY // 2
            in_corridor = abs(cx) <= 2 or abs(cy) <= 2
            in_building = z < 3 and 3 <= abs(cx) <= 5 and 3 <= abs(cy) <= 5

            if in_building:
                voxels[f"{x},{y},{z}"] = {"tags": ["no_fly"]}
            elif in_corridor:
                tags = ["corridor"]
                if abs(cx) <= 2 and abs(cy) <= 2 and z == 3:
                    tags.append("merge")
                angle = math.radians(random.uniform(0, 360))
                speed = random.uniform(0, 5)
                voxels[f"{x},{y},{z}"] = {
                    "tags": tags,
                    "corridor_id": "main",
                    "wind": {
                        "velocity": [
                            speed * math.cos(angle),
                            speed * math.sin(angle),
                            random.uniform(-0.5, 0.5),
                        ],
                        "variability": random.uniform(0, 1),
                        "gust_probability": 0.05,
                        "max_gust": 8.0,
                    },
                }

voxel_config = {
    "layout_id": "demo",
    "grid": {
        "origin": ORIGIN,
        "voxel_size": [VOXEL_SIZE] * 3,
        "grid_size": [NX, NY, NZ],
    },
    "voxels": voxels,
    "features": {"merge_points": [], "conflict_zones": [], "corridors": []},
    "version": 1,
}

vertiports = [
    {"id": "VP_N", "name": "North Hub",  "position": [0.0,  400.0, 0.0]},
    {"id": "VP_S", "name": "South Hub",  "position": [0.0, -400.0, 0.0]},
    {"id": "VP_E", "name": "East Hub",   "position": [400.0, 0.0,  0.0]},
    {"id": "VP_W", "name": "West Hub",   "position": [-400.0, 0.0, 0.0]},
]


def _route(origin, dest, pts):
    wps = [{"position": p, "waypoint_type": "cruise"} for p in pts]
    wps[0]["waypoint_type"] = "takeoff"
    wps[-1]["waypoint_type"] = "landing"
    return {"origin": origin, "destination": dest, "waypoints": wps}


routes = [
    _route("VP_N", "VP_S", [
        [0, 400, 0], [0, 300, 75], [0, 150, 150],
        [0, 0, 175], [0, -150, 150], [0, -300, 75], [0, -400, 0],
    ]),
    _route("VP_E", "VP_W", [
        [400, 0, 0], [300, 0, 75], [150, 0, 150],
        [0, 0, 175], [-150, 0, 150], [-300, 0, 75], [-400, 0, 0],
    ]),
]

env = sc.Environment({"voxels": voxel_config, "vertiports": vertiports, "routes": routes})
print(env)
<Environment [voxels(1158), vertiports(4), routes(2)]>
In [16]:
env.view.show_all()
Widget(value='<iframe src="http://localhost:54189/index.html?ui=P_0x25c4c9dedd0_2&reconnect=auto" class="pyvis…

4 — Run a Monte Carlo Experiment¶

Now we set up a proper experiment comparing two rule strategies:

Rule Set Strategy Key Difference
Static Baseline Fixed separation distances, simple scheduling Conservative — high safety margin, lower throughput
Dynamic Adaptive Density-dependent separation, demand-capacity balancing Responsive — trades some margin for better throughput

We run 10 independent trials per rule set (same layout and scenario), each with a different random seed for stochastic demand generation. With 1 layout × 1 scenario × 2 rule sets × 10 trials, this gives us 20 total simulation runs.

The Rust engine parallelizes across CPU cores, so this typically completes in seconds.

In [17]:
spec = sc.ExperimentSpec(
    "Introduction Demo",
    num_trials=10,
    max_time=1800.0,
    master_seed=42,
)

spec.add_layout("corridor_4vp", "4-Vertiport Corridor")
spec.add_scenario(
    "medium", "Medium Demand",
    ["VP_N", "VP_S", "VP_E", "VP_W"],
    rate_per_pair=5.0,
)

spec.add_rules(sc.RuleSetSpec.static_baseline())
spec.add_rules(sc.RuleSetSpec.dynamic_adaptive())

print(spec.validate())
print(spec)

results = sc.run(spec)
print(results)
valid
ExperimentSpec(name='Introduction Demo', configs=2, total_runs=20)
Results(trials=20)

5 — Explore the Results¶

results.to_dataframe() returns a Polars DataFrame with one row per trial. Columns include:

  • Identity: layout_id, scenario_id, rule_set_id, trial_index
  • Safety: los_events, near_los_events, min_distance, min_horizontal_distance, conflict_count
  • Performance: throughput, mean_delay, p95_delay, completed_vehicles
  • Utilization: mean_corridor_utilization, max_corridor_utilization
In [18]:
df = results.to_dataframe()
print(f"Shape: {df.shape[0]} rows × {df.shape[1]} columns")
df.head(10)
Shape: 20 rows × 29 columns
Out[18]:
shape: (10, 29)
spec_hashlayout_idscenario_idrule_set_idtrial_indexmaster_seedstatussimulation_durationwall_clock_timelos_eventsnear_los_eventsmin_horizontal_distancemin_vertical_distancemin_distanceconflict_countemergency_maneuversgo_aroundstotal_vehiclescompleted_vehiclesthroughputmean_delaymedian_delayp95_delayp99_delaymax_delaymean_queue_timemax_queue_timemean_corridor_utilizationmax_corridor_utilization
strstrstrstri64i64strf64f64i64i64f64f64f64i64i64i64i64i64f64f64f64f64f64f64f64f64f64f64
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"042"Completed"1800.00.003269410210.00.00.03600252346.0255745.21739144.054.061.061.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"142"Completed"1800.00.003026360.00.043.6004591800222142.02334641.043.044.052.052.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"242"Completed"1800.00.005959716260.00.00.04700272652.02890541.73076943.057.066.066.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"342"Completed"1800.00.011865871818.00.055.0363524200252448.02668141.37543.053.060.060.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"442"Completed"1800.00.003813261742.9418210.062.8012742700181734.01889941.043.051.051.051.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"542"Completed"1800.00.0112456670.00.030.01600202040.02223541.4543.044.052.052.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"642"Completed"1800.00.0132621250.00.00.05300343366.03668743.06060643.058.073.073.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"742"Completed"1800.00.00801556160.00.034.02600363570.03891144.043.058.064.064.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"842"Completed"1800.00.016914814290.00.00.05400424182.04558144.12195143.054.062.062.00.00.00.00.0
"21af5369b43bf0582cc779fab000a1…"corridor_4vp""medium""static_baseline"942"Completed"1800.00.00706091621.00.050.0800272754.03001741.40740743.053.053.053.00.00.00.00.0
In [19]:
print(results.summary())
SkyCorridor Results Summary
============================================================
Total trials: 20  |  Passed: 4 (20.0%)
Mean throughput: 53.4 veh/hr (std 14.6)
Worst min separation: 0.0 m

Per-configuration breakdown:
------------------------------------------------------------
  [corridor_4vp | medium | dynamic_adaptive]  pass 4/10  throughput 53.4  LOS 14
  [corridor_4vp | medium | static_baseline]  pass 0/10  throughput 53.4  LOS 90

Throughput vs Safety¶

The fundamental question in air corridor management: can we move more vehicles without compromising safety?

Each point below is one Monte Carlo trial. The x-axis shows throughput (vehicles/hr) and the y-axis shows the minimum separation distance observed during the trial (higher = safer). The two rule sets should cluster in different regions of this space, revealing the tradeoff.

In [20]:
fig = px.scatter(
    df.to_pandas(),
    x="throughput",
    y="min_distance",
    color="rule_set_id",
    title="Safety–Throughput Tradeoff (2 Rule Sets × 10 Trials)",
    labels={
        "throughput": "Throughput (vehicles/hr)",
        "min_distance": "Min Separation Distance (m)",
        "rule_set_id": "Rule Set",
    },
    opacity=0.8,
    width=750,
    height=450,
)
fig.update_layout(template="plotly_dark")
fig.show()

6 — Replay a Trial in 3D¶

To understand why a rule set performs the way it does, we can visualize vehicle trajectories through the airspace. Below, two flight paths (N→S and E→W) are rendered as gradient-coloured tubes — the colour encodes elapsed time, so you can trace each vehicle's progress and see where they converge at the central merge point.

White spheres mark vehicle positions at evenly-spaced time intervals. Notice how the two routes cross at altitude 175 m directly above the grid centre — this is the merge zone where conflict detection and separation rules are most critical.

In [26]:
from skycorridor.view import make_plotter, add_voxels, add_vertiports

pl = make_plotter("Trial Replay")
add_voxels(pl, voxel_config)
add_vertiports(pl, vertiports)

cmaps = ["plasma", "viridis"]
for i, route in enumerate(routes):
    pts = np.array([wp["position"] for wp in route["waypoints"]], dtype=float)
    spline = pv.Spline(pts, n_points=200)
    spline["time_s"] = np.linspace(0, 180, spline.n_points)
    tube = spline.tube(radius=6)
    label = f"{route['origin']} → {route['destination']}"
    pl.add_mesh(
        tube,
        scalars="time_s",
        cmap=cmaps[i],
        scalar_bar_args={"title": f"Time (s) — {label}", "color": "white"},
    )

    for t_idx in np.linspace(0, 199, 6, dtype=int):
        pos = spline.points[t_idx]
        sphere = pv.Sphere(radius=10, center=pos)
        pl.add_mesh(sphere, color="white", opacity=0.3 + 0.7 * (t_idx / 199))

pl.camera_position = [(700, 700, 500), (0, 0, 100), (0, 0, 1)]
pl.show()
Widget(value='<iframe src="http://localhost:54189/index.html?ui=P_0x25caa4c6a90_4&reconnect=auto" class="pyvis…

Interpretation¶

The results above reveal the core dynamic in urban air corridor management:

  • Static Baseline maintains larger separation distances (higher safety margins) but achieves lower throughput. Its fixed rules are conservative — they don't adapt to actual traffic density, so capacity is left on the table when the airspace is lightly loaded.

  • Dynamic Adaptive achieves higher throughput by adjusting separation requirements based on real-time density. This trades some safety margin for efficiency, but the separation distance never drops below dangerous levels.

This is the safety–throughput Pareto tradeoff that SkyCorridor is designed to explore. With only two rule sets and 10 trials each, we already see the clusters diverge. Scaling up to hundreds of rule configurations and thousands of trials reveals the full Pareto frontier.

Next Steps¶

Notebook Topic What You'll Learn
02 Understanding the Rule Space Parameter sweeps, ablation studies, interactive exploration
03 Evolutionary Optimization NSGA-II search for the full Pareto frontier
04 LLM Rule Generation Translate regulatory text into testable rule configs
05 Reinforcement Learning PPO fine-tuning of continuous parameters
06 Full Research Pipeline End-to-end study with publication-ready figures