Module 9 Project: Production Maneuver Detection Pipeline
What you're building
A complete production pipeline that fetches real TLE history from Space-Track, engineers time-normalized features, generates synthetic training data, trains the LSTM from Lesson 1, and evaluates it against a hardcoded set of documented ISS reboost dates — the only portion of the pipeline that requires real labeled data.
This is the capstone for the curriculum. Every concept from Modules 1–8 appears somewhere in this pipeline: Gaussian uncertainty in orbital mechanics (Module 1), LSTM training with backpropagation (Module 2), sequential decision structure (Module 3), evaluation under class imbalance (Module 1), commercial product framing (all of it).
What this exercises
- Feature engineering: time-normalized orbital element rates, J2 drift removal, F10.7 normalization
- Synthetic data generation: maneuver injection into clean debris histories
- LSTM training: from Lesson 1, with weighted cross-entropy and learning rate scheduling
- Operational evaluation: detection latency, false alarm rate, miss rate by maneuver size
- Live simulation: streaming new TLEs through the trained model and emitting alerts
ISS reboost test set
The following ISS reboost events are documented in public NASA mission status reports and are used as the labeled positive test set. You do not need Space-Track credentials to run the evaluation — if you have a local copy of ISS TLE history, you can evaluate against these dates directly.
# Documented ISS reboost events for test set evaluation
# Source: NASA ISS On-Orbit Status Reports (public)
# Date format: YYYY-MM-DD
# Delta-V is approximate in m/s from mission reports where available
ISS_REBOOST_TEST_EVENTS = [
{'date': '2020-03-30', 'delta_v_ms': 1.7, 'notes': 'reboost to maintain orbit decay'},
{'date': '2020-09-17', 'delta_v_ms': 1.5, 'notes': 'debris avoidance reboost'},
{'date': '2021-02-11', 'delta_v_ms': 2.1, 'notes': 'altitude maintenance reboost'},
{'date': '2021-05-26', 'delta_v_ms': 2.8, 'notes': 'reboost for visiting vehicle geometry'},
{'date': '2021-11-15', 'delta_v_ms': 3.1, 'notes': 'Cosmos-1408 debris avoidance'},
{'date': '2022-06-16', 'delta_v_ms': 1.9, 'notes': 'altitude maintenance reboost'},
{'date': '2022-11-02', 'delta_v_ms': 2.2, 'notes': 'reboost targeting 408 km mean altitude'},
{'date': '2023-04-10', 'delta_v_ms': 1.8, 'notes': 'scheduled altitude maintenance'},
]
# ISS NORAD ID: 25544
ISS_NORAD_ID = 25544
Note on delta-V values: ISS reboosts are typically in the 1–5 m/s range. These are at the lower end of what TLE-based detection can reliably catch. Use the miss-rate-by-maneuver-size metric to characterize detection sensitivity at this magnitude; do not be alarmed if detection rate on these specific events is lower than on synthetic large maneuvers.
Setup
# requirements.txt equivalent
# pip install requests torch numpy python-dateutil
import os
import json
import time
import requests
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from datetime import datetime, timedelta, date
from collections import defaultdict
from typing import Optional
Step 1: Fetch TLE history from Space-Track
Space-Track provides TLE history via its GP History endpoint. You need a free account at space-track.org. The API uses cookie-based authentication.
SPACETRACK_BASE = "https://www.space-track.org"
SPACETRACK_LOGIN = "/ajaxauth/login"
SPACETRACK_GP_HISTORY = "/basicspacedata/query/class/gp_history"
class SpaceTrackClient:
"""
Minimal Space-Track API client for GP (TLE) history queries.
Handles authentication and rate limiting.
Space-Track terms of service limit automated queries; observe the 20 req/min limit.
"""
def __init__(self, username: str, password: str):
self.session = requests.Session()
resp = self.session.post(
SPACETRACK_BASE + SPACETRACK_LOGIN,
data={'identity': username, 'password': password},
timeout=30,
)
resp.raise_for_status()
if 'Failed' in resp.text:
raise ValueError("Space-Track login failed. Check credentials.")
print("Space-Track login successful.")
def fetch_gp_history(
self,
norad_id: int,
start_date: date,
end_date: date,
) -> list[dict]:
"""
Fetch GP history for a single NORAD ID over a date range.
Returns list of TLE records as dicts.
"""
start_str = start_date.strftime('%Y-%m-%d')
end_str = end_date.strftime('%Y-%m-%d')
url = (
f"{SPACETRACK_BASE}{SPACETRACK_GP_HISTORY}"
f"/NORAD_CAT_ID/{norad_id}"
f"/EPOCH/{start_str}--{end_str}"
f"/orderby/EPOCH asc"
f"/format/json"
)
resp = self.session.get(url, timeout=60)
resp.raise_for_status()
records = resp.json()
# Respect rate limit: sleep 3 seconds between requests
time.sleep(3)
return records
def fetch_catalog_subset(
self,
norad_ids: list[int],
start_date: date,
end_date: date,
) -> dict[int, list[dict]]:
"""
Fetch 90-day TLE history for a list of NORAD IDs.
Returns dict mapping NORAD ID -> list of raw TLE records.
"""
result = {}
for i, nid in enumerate(norad_ids):
print(f" Fetching {nid} ({i+1}/{len(norad_ids)})...")
records = self.fetch_gp_history(nid, start_date, end_date)
result[nid] = records
return result
def parse_gp_record(raw: dict) -> Optional[dict]:
"""
Parse a Space-Track GP history record into a standardized TLE dict.
Returns None if the record is malformed.
Expected GP fields used:
EPOCH, MEAN_MOTION, ECCENTRICITY, INCLINATION,
RA_OF_ASC_NODE, BSTAR, OBJECT_TYPE, NORAD_CAT_ID
"""
try:
epoch = datetime.strptime(raw['EPOCH'], '%Y-%m-%dT%H:%M:%S.%f')
except ValueError:
try:
epoch = datetime.strptime(raw['EPOCH'], '%Y-%m-%dT%H:%M:%S')
except ValueError:
return None
try:
n = float(raw['MEAN_MOTION']) # rev/day
e = float(raw['ECCENTRICITY'])
i_deg = float(raw['INCLINATION'])
raan_deg = float(raw['RA_OF_ASC_NODE'])
bstar = float(raw['BSTAR'])
obj_type = raw.get('OBJECT_TYPE', 'UNKNOWN').upper()
norad_id = int(raw['NORAD_CAT_ID'])
except (KeyError, ValueError, TypeError):
return None
# Convert mean motion from rev/day to rev/min for internal consistency
n_rev_per_min = n / (24 * 60)
# Assign object class index: 0=rocket body, 1=debris, 2=active/payload
if 'ROCKET BODY' in obj_type or obj_type == 'R/B':
obj_class = 0
elif 'DEBRIS' in obj_type:
obj_class = 1
else:
obj_class = 2
return {
'epoch': epoch,
'n': n_rev_per_min, # rev/min
'e': e,
'i_deg': i_deg,
'raan_deg': raan_deg,
'bstar': bstar,
'obj_class': obj_class,
'norad_id': norad_id,
}
Step 2: Cleaning and preprocessing
After fetching raw records, clean and grid them to daily resolution.
def filter_and_sort(raw_records: list[dict], reject_rocket_bodies: bool = True) -> list[dict]:
"""
Parse raw GP records, remove malformed entries and rocket bodies,
sort by epoch, and deduplicate.
"""
parsed = [parse_gp_record(r) for r in raw_records]
parsed = [r for r in parsed if r is not None]
if reject_rocket_bodies:
parsed = [r for r in parsed if r['obj_class'] != 0]
# Sort by epoch
parsed.sort(key=lambda r: r['epoch'])
# Deduplicate: keep one TLE per 30-minute window
deduped = []
last_epoch = None
for r in parsed:
if last_epoch is None or (r['epoch'] - last_epoch).total_seconds() > 1800:
deduped.append(r)
last_epoch = r['epoch']
return deduped
def grid_to_daily(
records: list[dict],
start_date: date,
window_days: int = 30,
gap_tolerance_hours: float = 18.0,
) -> tuple[list[Optional[dict]], list[float]]:
"""
Align TLE records to a daily grid starting at start_date.
For each grid day, find the closest TLE within gap_tolerance_hours.
Returns:
gridded: list of length window_days, each entry is a TLE dict or None
gap_hours: list of length window_days, gap to nearest TLE or inf
"""
gridded = []
gap_hours = []
for day_idx in range(window_days):
grid_time = datetime.combine(start_date, datetime.min.time()) + \
timedelta(days=day_idx, hours=12)
if not records:
gridded.append(None)
gap_hours.append(float('inf'))
continue
closest = min(records, key=lambda r: abs((r['epoch'] - grid_time).total_seconds()))
gap_h = abs((closest['epoch'] - grid_time).total_seconds()) / 3600.0
if gap_h <= gap_tolerance_hours:
gridded.append(closest)
gap_hours.append(gap_h)
else:
gridded.append(None)
gap_hours.append(float('inf'))
return gridded, gap_hours
Step 3: Feature engineering with F10.7
def fetch_f107_noaa(start_date: date, end_date: date) -> dict[date, float]:
"""
Fetch daily F10.7 solar flux index from NOAA.
Returns dict mapping date -> F10.7 value.
NOAA provides this as a free public dataset.
URL format for the daily observed F10.7:
https://www.ngdc.noaa.gov/stp/space-weather/solar-data/solar-features/
solar-radio/noontime-flux/penticton/penticton_observed/tables/
table_drao_flux-observed-daily_drao_*.txt
This function uses a simplified NOAA JSON endpoint for recent data.
For historical data, parse the fixed-width text files from the URL above.
"""
# Simplified: return a constant for the offline case
# In production, fetch from:
# https://services.swpc.noaa.gov/json/solar-geophysical-activity.json
# or parse the NOAA Penticton archive tables
url = "https://services.swpc.noaa.gov/json/solar-geophysical-activity.json"
try:
resp = requests.get(url, timeout=10)
data = resp.json()
# Structure varies; parse the most recent value
# For a robust implementation, use the historical text file archive
current_f107 = float(data[0].get('solar_flux', 150.0))
except Exception:
current_f107 = 150.0 # typical solar-cycle-averaged value
# Return the same value for all dates (production code should use daily lookup)
result = {}
d = start_date
while d <= end_date:
result[d] = current_f107
d += timedelta(days=1)
return result
# Reuse the feature engineering functions from Lesson 1
# (build_feature_vector, compute_j2_raan_rate, mean_motion_to_sma)
# They are reproduced below for standalone project use.
J2 = 1.08263e-3
RE = 6378.137
def mean_motion_to_sma(n_rev_per_min: float) -> float:
GM = 398600.4418
n_rad_per_sec = n_rev_per_min * 2 * np.pi / 60.0
return (GM / n_rad_per_sec**2) ** (1.0 / 3.0)
def compute_j2_raan_rate(n_rev_per_min: float, a_km: float,
e: float, i_deg: float) -> float:
n_rad_per_sec = n_rev_per_min * 2 * np.pi / 60.0
i_rad = np.radians(i_deg)
raan_dot = (
-1.5 * n_rad_per_sec * J2 * (RE / a_km)**2
/ (1 - e**2)**2
* np.cos(i_rad)
)
return np.degrees(raan_dot) * 86400
def build_feature_vector(rec_prev: dict, rec_curr: dict,
f107: float, obj_class: int) -> np.ndarray:
dt_days = (rec_curr['epoch'] - rec_prev['epoch']).total_seconds() / 86400.0
if dt_days < 1e-6:
return None
dn_dt = (rec_curr['n'] - rec_prev['n']) / dt_days
de_dt = (rec_curr['e'] - rec_prev['e']) / dt_days
di_dt = (rec_curr['i_deg'] - rec_prev['i_deg']) / dt_days
a_km = mean_motion_to_sma(rec_prev['n'])
j2_rate = compute_j2_raan_rate(rec_prev['n'], a_km, rec_prev['e'], rec_prev['i_deg'])
raan_predicted = rec_prev['raan_deg'] + j2_rate * dt_days
raan_residual = rec_curr['raan_deg'] - raan_predicted
raan_residual = (raan_residual + 180) % 360 - 180
draan_dt = raan_residual / dt_days
dt_hours = dt_days * 24.0
return np.array([
dn_dt, de_dt, di_dt, draan_dt,
rec_curr['bstar'], f107, dt_hours, float(obj_class),
], dtype=np.float32)
```rust
// Pure stdlib — no external crates needed.
use std::f64::consts::PI;
const J2: f64 = 1.08263e-3;
const RE: f64 = 6378.137; // km
const GM: f64 = 398600.4418; // km³/s²
fn mean_motion_to_sma(n_rev_per_min: f64) -> f64 {
let n_rad_per_sec = n_rev_per_min * 2.0 * PI / 60.0;
(GM / n_rad_per_sec.powi(2)).powf(1.0 / 3.0)
}
fn compute_j2_raan_rate(n_rev_per_min: f64, a_km: f64, e: f64, i_deg: f64) -> f64 {
let n_rad_per_sec = n_rev_per_min * 2.0 * PI / 60.0;
let i_rad = i_deg.to_radians();
let raan_dot = -1.5 * n_rad_per_sec * J2 * (RE / a_km).powi(2)
/ (1.0 - e * e).powi(2)
* i_rad.cos();
raan_dot.to_degrees() * 86400.0 // rad/s → deg/day
}
/// Returns [dn_dt, de_dt, di_dt, draan_dt, bstar, f107, dt_hours, obj_class]
/// or None if dt_days is too small.
fn build_feature_vector(
prev_n: f64, prev_e: f64, prev_i: f64, prev_raan: f64, prev_bstar: f64,
curr_n: f64, curr_e: f64, curr_i: f64, curr_raan: f64,
dt_days: f64, f107: f64, obj_class: f64,
) -> Option<[f64; 8]> {
if dt_days < 1e-6 { return None; }
let dn_dt = (curr_n - prev_n) / dt_days;
let de_dt = (curr_e - prev_e) / dt_days;
let di_dt = (curr_i - prev_i) / dt_days;
let a_km = mean_motion_to_sma(prev_n);
let j2_rate = compute_j2_raan_rate(prev_n, a_km, prev_e, prev_i);
// Subtract predicted J2 drift so only non-J2 RAAN changes register
let mut raan_residual = curr_raan - (prev_raan + j2_rate * dt_days);
raan_residual = ((raan_residual + 180.0) % 360.0) - 180.0; // wrap to [-180, 180)
let draan_dt = raan_residual / dt_days;
Some([dn_dt, de_dt, di_dt, draan_dt, prev_bstar, f107, dt_days * 24.0, obj_class])
}
fn main() {
// ISS-like object: ~15.5 rev/day at 51.6° inclination
let n_rev_per_min = 15.5 / 1440.0;
let a_km = mean_motion_to_sma(n_rev_per_min);
let j2_rate = compute_j2_raan_rate(n_rev_per_min, a_km, 0.0006, 51.6);
println!("Semi-major axis: {:.2} km (expected ~6780 km)", a_km);
println!("J2 RAAN drift: {:.4} deg/day (expected ~-7 deg/day)", j2_rate);
// Simulate a quiet day: curr_raan follows J2 prediction exactly → draan_dt ≈ 0
let quiet = build_feature_vector(
n_rev_per_min, 0.0006, 51.64, 22.45, 1.3e-4,
n_rev_per_min + 1e-8, 0.0006, 51.64, 22.45 + j2_rate * 1.0,
1.0, 150.0, 2.0,
);
// Simulate a maneuver day: mean motion jumps by 0.1%
let maneuver = build_feature_vector(
n_rev_per_min, 0.0006, 51.64, 22.45, 1.3e-4,
n_rev_per_min * 1.001, 0.0006, 51.64, 22.45 + j2_rate * 1.0,
1.0, 150.0, 2.0,
);
if let (Some(q), Some(m)) = (quiet, maneuver) {
println!("\n{:<12} {:>12} {:>12}", "Feature", "Quiet day", "Maneuver day");
let labels = ["dn_dt", "de_dt", "di_dt", "draan_dt", "bstar", "f107", "dt_h", "obj_class"];
for (label, (qv, mv)) in labels.iter().zip(q.iter().zip(m.iter())) {
println!("{:<12} {:>12.4e} {:>12.4e}", label, qv, mv);
}
}
}
raan_residual = ((raan_residual + 180.0) % 360.0) - 180.0 wraps the angle to ([-180°, 180°)) — the same modular arithmetic as the Python version, since Rust's % on f64 is the IEEE remainder (same sign as dividend).
def gridded_history_to_window( gridded: list[Optional[dict]], f107_map: dict[date, float], obj_class: int, ) -> np.ndarray: """ Convert a daily-gridded TLE history to a (30, 9) feature+mask array. """ window_days = len(gridded) features = np.zeros((window_days, 8), dtype=np.float32) mask = np.zeros((window_days, 1), dtype=np.float32)
for day in range(1, window_days):
rec_prev = gridded[day - 1]
rec_curr = gridded[day]
if rec_prev is None or rec_curr is None:
continue
f107 = f107_map.get(rec_curr['epoch'].date(), 150.0)
fvec = build_feature_vector(rec_prev, rec_curr, f107, obj_class)
if fvec is not None:
features[day] = fvec
mask[day] = 1.0
return np.concatenate([features, mask], axis=1) # (30, 9)
## Step 4: Synthetic training data generation
```python
import copy
def inject_maneuver_into_gridded(
gridded: list[Optional[dict]],
inject_day: int,
delta_frac: float,
) -> list[Optional[dict]]:
"""
Inject a synthetic maneuver into a daily-gridded TLE history.
From inject_day onward, shift mean motion by n * delta_frac.
None entries (missing observations) are left as None.
"""
modified = copy.deepcopy(gridded)
# Find the mean motion at the injection point
ref_record = modified[inject_day]
if ref_record is None:
# Find the nearest non-None record to get a reference mean motion
for offset in range(1, 5):
if inject_day - offset >= 0 and modified[inject_day - offset] is not None:
ref_record = modified[inject_day - offset]
break
if ref_record is None:
return gridded # cannot inject, return unchanged
delta_n = ref_record['n'] * delta_frac
for idx in range(inject_day, len(modified)):
if modified[idx] is not None:
modified[idx]['n'] += delta_n
return modified
def build_training_dataset(
catalog_histories: dict[int, list[dict]],
f107_map: dict[date, float],
start_date: date,
n_positive: int = 4000,
n_negative: int = 4000,
window_days: int = 30,
seed: int = 42,
) -> tuple[np.ndarray, np.ndarray]:
"""
Build balanced training dataset from a dict of TLE histories.
catalog_histories: NORAD_ID -> sorted list of parsed TLE dicts
Only use debris objects (obj_class == 1) for background histories.
"""
rng = np.random.default_rng(seed)
# Select only debris backgrounds for training
debris_histories = [
records for records in catalog_histories.values()
if records and records[0]['obj_class'] == 1 and len(records) >= window_days + 5
]
print(f"Found {len(debris_histories)} debris objects with sufficient history")
windows_list = []
labels_list = []
end_date = start_date + timedelta(days=90)
def get_random_window(history):
"""Extract a random 30-day gridded window from a TLE history."""
window_start = start_date + timedelta(
days=int(rng.integers(0, 60))
)
window_records = [
r for r in history
if window_start <= r['epoch'].date() <= (window_start + timedelta(days=window_days))
]
gridded, _ = grid_to_daily(window_records, window_start, window_days)
# Must have at least 20 observed days out of 30
n_obs = sum(1 for g in gridded if g is not None)
if n_obs < 20:
return None
return gridded
# Negative examples: clean debris windows
neg_attempts = 0
while len(labels_list) < n_negative and neg_attempts < n_negative * 5:
neg_attempts += 1
hist = debris_histories[rng.integers(len(debris_histories))]
gridded = get_random_window(hist)
if gridded is None:
continue
obj_class = hist[0]['obj_class']
window = gridded_history_to_window(gridded, f107_map, obj_class)
windows_list.append(window)
labels_list.append(0)
# Positive examples: maneuver injected
pos_attempts = 0
while len(labels_list) < n_negative + n_positive and pos_attempts < n_positive * 5:
pos_attempts += 1
hist = debris_histories[rng.integers(len(debris_histories))]
gridded = get_random_window(hist)
if gridded is None:
continue
inject_day = int(rng.integers(10, 20))
delta_frac = float(rng.uniform(0.0001, 0.002))
gridded = inject_maneuver_into_gridded(gridded, inject_day, delta_frac)
obj_class = hist[0]['obj_class']
window = gridded_history_to_window(gridded, f107_map, obj_class)
windows_list.append(window)
labels_list.append(1)
windows = np.stack(windows_list)
labels = np.array(labels_list, dtype=np.int64)
perm = rng.permutation(len(labels))
print(f"Dataset: {(labels==0).sum()} negatives, {(labels==1).sum()} positives")
return windows[perm], labels[perm]
Step 5: Model training
# Reproduce model and training loop from Lesson 1 for standalone use.
# (ManeuverLSTM, train_one_epoch, evaluate, train_maneuver_detector
# are defined identically to the Lesson 1 code.)
class ManeuverLSTM(nn.Module):
def __init__(self, input_size=9, hidden_size=64, num_layers=1, dropout=0.2):
super().__init__()
self.lstm = nn.LSTM(
input_size=input_size, hidden_size=hidden_size,
num_layers=num_layers, batch_first=True,
dropout=dropout if num_layers > 1 else 0.0,
)
self.dropout = nn.Dropout(dropout)
self.classifier = nn.Linear(hidden_size, 2)
def forward(self, x):
_, (h_n, _) = self.lstm(x)
return self.classifier(self.dropout(h_n[-1]))
class TLEWindowDataset(Dataset):
def __init__(self, windows, labels):
self.windows = torch.tensor(windows, dtype=torch.float32)
self.labels = torch.tensor(labels, dtype=torch.long)
def __len__(self): return len(self.labels)
def __getitem__(self, idx):
return self.windows[idx], self.labels[idx]
def run_training(
windows: np.ndarray,
labels: np.ndarray,
val_fraction: float = 0.15,
n_epochs: int = 30,
batch_size: int = 64,
) -> ManeuverLSTM:
split = int(len(labels) * (1 - val_fraction))
train_w, val_w = windows[:split], windows[split:]
train_l, val_l = labels[:split], labels[split:]
train_ds = TLEWindowDataset(train_w, train_l)
val_ds = TLEWindowDataset(val_w, val_l)
train_loader = DataLoader(train_ds, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_ds, batch_size=batch_size, shuffle=False)
device = torch.device('cpu')
model = ManeuverLSTM().to(device)
weight = torch.tensor([1.0, 100.0], device=device)
criterion = nn.CrossEntropyLoss(weight=weight)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
optimizer, mode='max', patience=3, factor=0.5
)
best_f1 = 0.0
best_state = None
for epoch in range(n_epochs):
model.train()
for windows_b, labels_b in train_loader:
optimizer.zero_grad()
loss = criterion(model(windows_b.to(device)), labels_b.to(device))
loss.backward()
nn.utils.clip_grad_norm_(model.parameters(), 1.0)
optimizer.step()
# Validation
model.eval()
preds_all, labels_all = [], []
with torch.no_grad():
for w_b, l_b in val_loader:
p = model(w_b.to(device)).argmax(1).cpu().numpy()
preds_all.extend(p)
labels_all.extend(l_b.numpy())
pa = np.array(preds_all)
la = np.array(labels_all)
tp = int(((pa==1)&(la==1)).sum())
fp = int(((pa==1)&(la==0)).sum())
fn = int(((pa==0)&(la==1)).sum())
prec = tp / (tp + fp + 1e-8)
rec = tp / (tp + fn + 1e-8)
f1 = 2 * prec * rec / (prec + rec + 1e-8)
scheduler.step(f1)
if f1 > best_f1:
best_f1 = f1
best_state = {k: v.clone() for k, v in model.state_dict().items()}
if (epoch + 1) % 5 == 0:
print(f" Epoch {epoch+1:>2}: val_f1={f1:.3f} prec={prec:.3f} rec={rec:.3f}")
if best_state:
model.load_state_dict(best_state)
print(f"Training done. Best val F1={best_f1:.3f}")
return model
Step 6: Evaluate on ISS reboost test set
This step evaluates detection latency: how many days after the documented reboost date does the model first flag a window?
def evaluate_on_iss_test_set(
model: ManeuverLSTM,
iss_records: list[dict], # sorted TLE history for ISS (NORAD 25544)
f107_map: dict,
test_events: list[dict], # ISS_REBOOST_TEST_EVENTS from above
window_days: int = 30,
prob_threshold: float = 0.5,
) -> dict:
"""
For each documented reboost event, search windows ending within
[event_date, event_date + 14 days] and report the earliest detection.
Returns a summary dict with per-event latency and aggregate statistics.
"""
model.eval()
results = []
for event in test_events:
event_date = datetime.strptime(event['date'], '%Y-%m-%d').date()
delta_v_ms = event.get('delta_v_ms', 0.0)
detected = False
latency_days = None
# Search windows ending up to 14 days after the event
for days_after in range(0, 15):
window_end = event_date + timedelta(days=days_after)
window_start = window_end - timedelta(days=window_days)
# Extract records for this window
window_recs = [
r for r in iss_records
if window_start <= r['epoch'].date() <= window_end
]
if len(window_recs) < 10:
continue
gridded, _ = grid_to_daily(window_recs, window_start, window_days)
n_obs = sum(1 for g in gridded if g is not None)
if n_obs < 15:
continue
# obj_class=2 for ISS (active satellite)
window_arr = gridded_history_to_window(gridded, f107_map, obj_class=2)
window_t = torch.tensor(window_arr, dtype=torch.float32).unsqueeze(0)
with torch.no_grad():
logits = model(window_t)
prob_maneuver = F.softmax(logits, dim=1)[0, 1].item()
if prob_maneuver >= prob_threshold:
detected = True
latency_days = days_after
break
results.append({
'date': event['date'],
'delta_v_ms': delta_v_ms,
'detected': detected,
'latency_days': latency_days,
'notes': event.get('notes', ''),
})
status = f"DETECTED (latency={latency_days}d)" if detected else "MISSED"
print(f" {event['date']} Δv≈{delta_v_ms:.1f}m/s: {status}")
detected_events = [r for r in results if r['detected']]
detection_rate = len(detected_events) / len(results)
avg_latency = (
np.mean([r['latency_days'] for r in detected_events])
if detected_events else float('nan')
)
print(f"\nISS test set: {len(detected_events)}/{len(results)} detected "
f"({detection_rate:.1%}), avg latency={avg_latency:.1f} days")
return {
'events': results,
'detection_rate': detection_rate,
'avg_latency': avg_latency,
}
def evaluate_false_alarm_rate(
model: ManeuverLSTM,
quiet_histories: dict[int, list[dict]],
f107_map: dict,
start_date: date,
monitoring_days: int = 90,
prob_threshold: float = 0.5,
) -> float:
"""
Compute false alarm rate per object per month on confirmed non-maneuvering objects.
Uses all objects in quiet_histories (debris only).
Returns false alerts per object per month.
"""
model.eval()
total_objects = 0
total_false_alerts = 0
object_months = 0.0
for norad_id, records in quiet_histories.items():
obj_class = records[0]['obj_class'] if records else 1
if obj_class != 1: # debris only
continue
total_objects += 1
object_months += monitoring_days / 30.0
object_alerts = 0
# Slide window by 1 day over the monitoring period
for start_offset in range(monitoring_days - 30):
window_start = start_date + timedelta(days=start_offset)
window_end = window_start + timedelta(days=30)
window_recs = [
r for r in records
if window_start <= r['epoch'].date() <= window_end
]
if len(window_recs) < 10:
continue
gridded, _ = grid_to_daily(window_recs, window_start, 30)
n_obs = sum(1 for g in gridded if g is not None)
if n_obs < 15:
continue
window_arr = gridded_history_to_window(gridded, f107_map, obj_class)
window_t = torch.tensor(window_arr, dtype=torch.float32).unsqueeze(0)
with torch.no_grad():
logits = model(window_t)
prob_m = F.softmax(logits, dim=1)[0, 1].item()
if prob_m >= prob_threshold:
object_alerts += 1
# Count non-overlapping alerts only (alert-free cooldown of 5 days)
total_false_alerts += object_alerts
rate = total_false_alerts / (object_months + 1e-8)
print(f"False alarm rate: {rate:.2f} per object per month "
f"({total_false_alerts} alerts, {object_months:.1f} object-months, "
f"{total_objects} objects)")
return rate
Step 7: Live simulation
Simulate the production deployment pattern: new TLEs arrive each day, the pipeline processes them, and alerts are emitted when a maneuver is detected.
def run_live_simulation(
model: ManeuverLSTM,
live_records: list[dict], # sorted TLE history, newest last
f107_map: dict,
prob_threshold: float = 0.5,
window_days: int = 30,
alert_cooldown_days: int = 5,
) -> list[dict]:
"""
Simulate streaming TLE ingestion.
Process each new TLE epoch as if it just arrived.
Maintain a rolling 30-day window and emit an alert when P(maneuver) > threshold.
Suppress repeated alerts within alert_cooldown_days of a previous alert.
Returns list of alert dicts.
"""
model.eval()
alerts = []
last_alert_date = None
if not live_records:
return alerts
start_date = live_records[0]['epoch'].date()
end_date = live_records[-1]['epoch'].date()
current_date = start_date + timedelta(days=window_days)
while current_date <= end_date:
window_start = current_date - timedelta(days=window_days)
window_recs = [
r for r in live_records
if window_start <= r['epoch'].date() <= current_date
]
# Need minimum coverage
if len(window_recs) < 10:
current_date += timedelta(days=1)
continue
gridded, _ = grid_to_daily(window_recs, window_start, window_days)
n_obs = sum(1 for g in gridded if g is not None)
if n_obs < 15:
current_date += timedelta(days=1)
continue
obj_class = live_records[0]['obj_class']
window_arr = gridded_history_to_window(gridded, f107_map, obj_class)
window_t = torch.tensor(window_arr, dtype=torch.float32).unsqueeze(0)
with torch.no_grad():
logits = model(window_t)
prob_m = F.softmax(logits, dim=1)[0, 1].item()
in_cooldown = (
last_alert_date is not None and
(current_date - last_alert_date).days < alert_cooldown_days
)
if prob_m >= prob_threshold and not in_cooldown:
alert = {
'alert_date': current_date.isoformat(),
'norad_id': live_records[0]['norad_id'],
'prob_maneuver': round(prob_m, 4),
'window_start': window_start.isoformat(),
'window_end': current_date.isoformat(),
}
alerts.append(alert)
last_alert_date = current_date
print(f" ALERT [{current_date}] NORAD {live_records[0]['norad_id']}: "
f"P(maneuver)={prob_m:.3f}")
current_date += timedelta(days=1)
print(f"\nLive simulation: {len(alerts)} alerts over "
f"{(end_date - start_date).days} days")
return alerts
Putting it all together
def main():
"""
Complete pipeline from data fetch to live simulation.
Set SPACETRACK_USER and SPACETRACK_PASS as environment variables,
or replace with your credentials directly (do not commit credentials).
"""
import os
# -----------------------------------------------------------------------
# Configuration
# -----------------------------------------------------------------------
SPACETRACK_USER = os.environ.get('SPACETRACK_USER', 'your_username_here')
SPACETRACK_PASS = os.environ.get('SPACETRACK_PASS', 'your_password_here')
END_DATE = date.today()
START_DATE = END_DATE - timedelta(days=90)
# Curated catalog subset: ISS + a handful of debris objects
# ISS NORAD: 25544
# A selection of well-tracked LEO debris objects with dense TLE history:
CATALOG_IDS = [
25544, # ISS (active, test set only — do not use for training)
# Debris objects: update these with current catalog entries from Space-Track
# Filter by: OBJECT_TYPE = DEBRIS, INCLINATION 51-52 deg (similar to ISS),
# MEAN_MOTION 15.4-15.6 rev/day, dense TLE history (> 60 records in 90 days)
# Example placeholder IDs (replace with real debris NORAD IDs):
20580, # example debris placeholder
22285, # example debris placeholder
27386, # example debris placeholder
29664, # example debris placeholder
32063, # example debris placeholder
35491, # example debris placeholder
37820, # example debris placeholder
40086, # example debris placeholder
]
# -----------------------------------------------------------------------
# Step 1: Fetch TLE history
# -----------------------------------------------------------------------
print("Step 1: Fetching TLE history from Space-Track...")
client = SpaceTrackClient(SPACETRACK_USER, SPACETRACK_PASS)
raw_catalog = client.fetch_catalog_subset(CATALOG_IDS, START_DATE, END_DATE)
# Parse and clean
catalog_histories = {}
for nid, raw_records in raw_catalog.items():
parsed = filter_and_sort(raw_records, reject_rocket_bodies=(nid != 25544))
if len(parsed) >= 20:
catalog_histories[nid] = parsed
print(f" Usable histories: {len(catalog_histories)} objects")
# -----------------------------------------------------------------------
# Step 2-3: Feature engineering with F10.7
# -----------------------------------------------------------------------
print("\nStep 2-3: Fetching F10.7 and preparing features...")
f107_map = fetch_f107_noaa(START_DATE, END_DATE)
# -----------------------------------------------------------------------
# Step 4: Synthetic training data
# -----------------------------------------------------------------------
print("\nStep 4: Generating synthetic training data...")
# Exclude ISS from background (it is the test object)
debris_catalog = {k: v for k, v in catalog_histories.items() if k != ISS_NORAD_ID}
windows, labels = build_training_dataset(
catalog_histories = debris_catalog,
f107_map = f107_map,
start_date = START_DATE,
n_positive = 4000,
n_negative = 4000,
)
print(f" Training set: {windows.shape}, labels: {labels.shape}")
# -----------------------------------------------------------------------
# Step 5: Train
# -----------------------------------------------------------------------
print("\nStep 5: Training LSTM maneuver detector...")
model = run_training(windows, labels, n_epochs=30)
# Save checkpoint
torch.save(model.state_dict(), 'maneuver_detector.pt')
print(" Model saved to maneuver_detector.pt")
# -----------------------------------------------------------------------
# Step 6: Evaluate on ISS reboost test set
# -----------------------------------------------------------------------
print("\nStep 6: Evaluating on ISS reboost test set...")
iss_records = catalog_histories.get(ISS_NORAD_ID, [])
if iss_records:
test_results = evaluate_on_iss_test_set(
model, iss_records, f107_map, ISS_REBOOST_TEST_EVENTS
)
else:
print(" ISS records not available. Check Space-Track fetch.")
# Evaluate false alarm rate on debris objects
quiet_debris = {k: v for k, v in catalog_histories.items()
if k != ISS_NORAD_ID and v and v[0]['obj_class'] == 1}
if quiet_debris:
print("\n Evaluating false alarm rate on debris objects...")
far = evaluate_false_alarm_rate(model, quiet_debris, f107_map, START_DATE)
# -----------------------------------------------------------------------
# Step 7: Live simulation on ISS
# -----------------------------------------------------------------------
print("\nStep 7: Running live simulation on ISS TLE history...")
if iss_records:
alerts = run_live_simulation(model, iss_records, f107_map)
print(f"\nLive simulation produced {len(alerts)} maneuver alerts.")
for a in alerts:
print(f" {a['alert_date']} P={a['prob_maneuver']:.3f} "
f"window [{a['window_start']} to {a['window_end']}]")
print("\nPipeline complete.")
if __name__ == '__main__':
main()
Reflection questions
After running the full pipeline, answer these in a comment block at the top of your script:
-
What was your model's detection rate on the ISS test events? If it was low, is that expected given the delta-V magnitudes of those events relative to your synthetic training distribution?
-
What was your false alarm rate per object per month on debris objects? Is it below 2.0? If not, what would you try first to reduce it — adjusting the probability threshold, increasing the positive class weight, or engineering better features?
-
Detection latency: for the events your model did detect, how many days after the documented reboost date was the detection? Does this meet the less-than-3-day target?
-
What would change in the pipeline if you wanted to monitor GEO objects instead of LEO? (Hint: J2 drift rates are different, atmospheric drag is negligible, and solar radiation pressure effects are larger. What features would you remove or add?)
-
How would you extend this pipeline to output not just "maneuver detected" but a rough estimate of the maneuver size (Δv) and type (in-plane vs. out-of-plane)? What architecture change would be required?
What's next
With a working maneuver detector, the natural next extensions are:
- Intent inference: given a detected maneuver trajectory, classify the intent — station-keeping, rendezvous approach, avoidance, plane change — using the game-theoretic models from Module 5. The LSTM output (P(maneuver)) becomes an observation in a POMDP over adversary intent.
- Multi-object alerting service: wrap the pipeline in a lightweight API that monitors a user-configured watch list and delivers alerts via webhook or email.
- Fleet-level anomaly scoring: instead of per-object binary classification, score the entire catalog for anomalous behavior relative to historical baselines, and surface the top-K most unusual objects each day.
The maneuver detector built in this project is the sensor front-end for all of these downstream products. The game-theoretic reasoning from Modules 5–8 lives above it in the stack.