feat: add alerting engine and heartbeat sender (v2.0)

This commit is contained in:
fullsizemalt 2026-01-01 23:33:29 -08:00
parent a024a124f3
commit 722b777927
5 changed files with 427 additions and 25 deletions

View file

@ -1,13 +1,44 @@
{ {
"$schema": "https://veridian.runfoo.run/schemas/edge-config.json", "$schema": "https://veridian.runfoo.run/schemas/edge-config.json",
"version": "1.0", "version": "2.0",
"facilityId": "YOUR_FACILITY_UUID", "facilityId": "YOUR_FACILITY_UUID",
"backendUrl": "https://api.veridian.runfoo.run", "edgeId": "rpi-01",
"backendApiKey": "YOUR_API_KEY", "server": {
"url": "https://api.veridian.runfoo.run",
"apiKey": "YOUR_API_KEY",
"heartbeatIntervalSec": 60,
"syncIntervalSec": 300
},
"sensorpush": { "sensorpush": {
"email": "sensors@facility.com", "email": "sensors@facility.com",
"password": "YOUR_PASSWORD" "password": "YOUR_PASSWORD"
}, },
"alerts": {
"enabled": true,
"cooldownMinutes": 15,
"thresholds": [
{
"sensor": "*",
"metric": "temperature",
"min": 60,
"max": 85
},
{
"sensor": "*",
"metric": "humidity",
"min": 40,
"max": 70
}
]
},
"storage": {
"retentionDays": 30,
"maxRows": 100000
},
"dashboard": {
"enabled": true,
"port": 8080
},
"sensorMappings": [ "sensorMappings": [
{ {
"sensorId": "123456.7890", "sensorId": "123456.7890",
@ -16,6 +47,5 @@
} }
], ],
"pollingIntervalSec": 60, "pollingIntervalSec": 60,
"bufferMaxRows": 10000,
"logLevel": "info" "logLevel": "info"
} }

142
src/alerts.ts Normal file
View file

@ -0,0 +1,142 @@
/**
* Alert Engine - Threshold monitoring with cooldowns
*
* Monitors sensor readings against configurable thresholds.
* Fires alerts locally and sends to server for notification fan-out.
*/
export interface ThresholdConfig {
sensor: string; // Sensor ID or "*" for all
metric: 'temperature' | 'humidity' | 'vpd' | 'dewpoint';
min?: number;
max?: number;
}
export interface AlertConfig {
enabled: boolean;
cooldownMinutes: number;
thresholds: ThresholdConfig[];
}
export interface Alert {
id: string;
sensorId: string;
sensorName: string;
metric: string;
value: number;
threshold: number;
type: 'HIGH' | 'LOW';
timestamp: Date;
}
type AlertCallback = (alert: Alert) => void;
export class AlertEngine {
private config: AlertConfig;
private cooldowns: Map<string, Date> = new Map();
private onAlert: AlertCallback;
constructor(config: AlertConfig, onAlert: AlertCallback) {
this.config = config;
this.onAlert = onAlert;
}
/**
* Check a reading against thresholds
*/
checkReading(reading: {
sensorId: string;
sensorName: string;
temperature: number;
humidity: number;
vpd?: number;
dewpoint?: number;
}): Alert | null {
if (!this.config.enabled) return null;
for (const threshold of this.config.thresholds) {
// Check if threshold applies to this sensor
if (threshold.sensor !== '*' && threshold.sensor !== reading.sensorId) {
continue;
}
const value = reading[threshold.metric];
if (value === undefined) continue;
// Check high threshold
if (threshold.max !== undefined && value > threshold.max) {
return this.createAlert(reading, threshold.metric, value, threshold.max, 'HIGH');
}
// Check low threshold
if (threshold.min !== undefined && value < threshold.min) {
return this.createAlert(reading, threshold.metric, value, threshold.min, 'LOW');
}
}
return null;
}
/**
* Create and emit an alert if not in cooldown
*/
private createAlert(
reading: { sensorId: string; sensorName: string },
metric: string,
value: number,
threshold: number,
type: 'HIGH' | 'LOW'
): Alert | null {
const cooldownKey = `${reading.sensorId}-${metric}-${type}`;
const lastAlert = this.cooldowns.get(cooldownKey);
// Check cooldown
if (lastAlert) {
const cooldownMs = this.config.cooldownMinutes * 60 * 1000;
if (Date.now() - lastAlert.getTime() < cooldownMs) {
return null; // Still in cooldown
}
}
// Create alert
const alert: Alert = {
id: `${Date.now()}-${Math.random().toString(36).slice(2, 8)}`,
sensorId: reading.sensorId,
sensorName: reading.sensorName,
metric,
value,
threshold,
type,
timestamp: new Date(),
};
// Set cooldown
this.cooldowns.set(cooldownKey, new Date());
// Emit alert
this.onAlert(alert);
return alert;
}
/**
* Clear a specific cooldown (for testing or manual reset)
*/
clearCooldown(sensorId: string, metric: string, type: 'HIGH' | 'LOW'): void {
this.cooldowns.delete(`${sensorId}-${metric}-${type}`);
}
/**
* Clear all cooldowns
*/
clearAllCooldowns(): void {
this.cooldowns.clear();
}
/**
* Update thresholds at runtime
*/
updateThresholds(thresholds: ThresholdConfig[]): void {
this.config.thresholds = thresholds;
}
}

View file

@ -1,9 +1,10 @@
/** /**
* Configuration loader for Veridian Edge Agent * Configuration loader for Veridian Edge Agent v2.0
*/ */
import { existsSync, readFileSync } from 'fs'; import { existsSync, readFileSync } from 'fs';
import { join } from 'path'; import { join } from 'path';
import type { AlertConfig, ThresholdConfig } from './alerts';
export interface SensorMapping { export interface SensorMapping {
sensorId: string; sensorId: string;
@ -14,16 +15,35 @@ export interface SensorMapping {
export interface EdgeConfig { export interface EdgeConfig {
version: string; version: string;
facilityId: string; facilityId: string;
backendUrl: string; edgeId: string;
backendApiKey: string;
server: {
url: string;
apiKey: string;
heartbeatIntervalSec: number;
syncIntervalSec: number;
};
sensorpush: { sensorpush: {
email: string; email: string;
password: string; password: string;
gateway?: string; gateway?: string;
}; };
alerts: AlertConfig;
storage: {
retentionDays: number;
maxRows: number;
};
dashboard: {
enabled: boolean;
port: number;
};
sensorMappings: SensorMapping[]; sensorMappings: SensorMapping[];
pollingIntervalSec: number; pollingIntervalSec: number;
bufferMaxRows: number;
logLevel: 'debug' | 'info' | 'warn' | 'error'; logLevel: 'debug' | 'info' | 'warn' | 'error';
} }
@ -56,19 +76,52 @@ export function loadConfig(): EdgeConfig {
function parseConfig(path: string): EdgeConfig { function parseConfig(path: string): EdgeConfig {
console.log(`📁 Loading config from: ${path}`); console.log(`📁 Loading config from: ${path}`);
const content = readFileSync(path, 'utf-8'); const content = readFileSync(path, 'utf-8');
const config = JSON.parse(content) as EdgeConfig; const raw = JSON.parse(content);
// Handle both v1 and v2 config formats
const config: EdgeConfig = {
version: raw.version || '2.0',
facilityId: raw.facilityId,
edgeId: raw.edgeId || 'edge-01',
server: raw.server || {
url: raw.backendUrl || '',
apiKey: raw.backendApiKey || '',
heartbeatIntervalSec: 60,
syncIntervalSec: 300,
},
sensorpush: raw.sensorpush,
alerts: raw.alerts || {
enabled: true,
cooldownMinutes: 15,
thresholds: [
{ sensor: '*', metric: 'temperature', min: 60, max: 85 },
{ sensor: '*', metric: 'humidity', min: 40, max: 70 },
],
},
storage: raw.storage || {
retentionDays: 30,
maxRows: raw.bufferMaxRows || 100000,
},
dashboard: raw.dashboard || {
enabled: false,
port: 8080,
},
sensorMappings: raw.sensorMappings || [],
pollingIntervalSec: raw.pollingIntervalSec || 60,
logLevel: raw.logLevel || 'info',
};
// Validate required fields // Validate required fields
if (!config.facilityId) throw new Error('Config missing: facilityId'); if (!config.facilityId) throw new Error('Config missing: facilityId');
if (!config.backendUrl) throw new Error('Config missing: backendUrl'); if (!config.server.url) throw new Error('Config missing: server.url');
if (!config.sensorpush?.email) throw new Error('Config missing: sensorpush.email'); if (!config.sensorpush?.email) throw new Error('Config missing: sensorpush.email');
if (!config.sensorpush?.password) throw new Error('Config missing: sensorpush.password'); if (!config.sensorpush?.password) throw new Error('Config missing: sensorpush.password');
// Defaults
config.pollingIntervalSec = config.pollingIntervalSec || 60;
config.bufferMaxRows = config.bufferMaxRows || 10000;
config.logLevel = config.logLevel || 'info';
config.sensorMappings = config.sensorMappings || [];
return config; return config;
} }

96
src/heartbeat.ts Normal file
View file

@ -0,0 +1,96 @@
/**
* Heartbeat sender - Keep-alive signal to server
*
* Sends periodic heartbeats so the server knows the edge device is alive.
* Server can detect missed heartbeats and alert operators.
*/
export interface HeartbeatData {
facilityId: string;
edgeId: string;
status: 'ok' | 'degraded' | 'error';
sensorCount: number;
bufferSize: number;
lastReading?: string; // ISO timestamp
uptime: number; // seconds
}
export class HeartbeatSender {
private backendUrl: string;
private apiKey: string;
private intervalId: ReturnType<typeof setInterval> | null = null;
private startTime: Date;
private getData: () => Omit<HeartbeatData, 'uptime'>;
constructor(
backendUrl: string,
apiKey: string,
getData: () => Omit<HeartbeatData, 'uptime'>
) {
this.backendUrl = backendUrl.replace(/\/$/, '');
this.apiKey = apiKey;
this.getData = getData;
this.startTime = new Date();
}
/**
* Start sending heartbeats at the specified interval
*/
start(intervalSec: number = 60): void {
if (this.intervalId) {
this.stop();
}
// Send initial heartbeat
this.send();
// Schedule periodic heartbeats
this.intervalId = setInterval(() => {
this.send();
}, intervalSec * 1000);
console.log(`💓 Heartbeat started (every ${intervalSec}s)`);
}
/**
* Stop sending heartbeats
*/
stop(): void {
if (this.intervalId) {
clearInterval(this.intervalId);
this.intervalId = null;
}
}
/**
* Send a single heartbeat
*/
async send(): Promise<boolean> {
try {
const data = this.getData();
const heartbeat: HeartbeatData = {
...data,
uptime: Math.floor((Date.now() - this.startTime.getTime()) / 1000),
};
const res = await fetch(`${this.backendUrl}/environment/heartbeat`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.apiKey}`,
},
body: JSON.stringify(heartbeat),
});
if (!res.ok) {
console.warn(`⚠️ Heartbeat failed: ${res.status}`);
return false;
}
return true;
} catch (error) {
console.warn('⚠️ Heartbeat error:', error);
return false;
}
}
}

View file

@ -1,8 +1,11 @@
/** /**
* Veridian Edge Agent - SensorPush Integration * Veridian Edge Agent v2.0 - SensorPush Integration
* *
* Main entry point for the edge device. * Main entry point for the edge device.
* Polls SensorPush Cloud API and syncs readings to Veridian backend. * - Polls SensorPush Cloud API
* - Monitors thresholds and fires alerts
* - Syncs readings to Veridian backend
* - Sends heartbeats for monitoring
*/ */
import { SensorPushClient } from './sensorpush'; import { SensorPushClient } from './sensorpush';
@ -10,27 +13,51 @@ import { VeridianClient } from './veridian';
import { BufferManager } from './buffer'; import { BufferManager } from './buffer';
import { startHealthServer } from './health'; import { startHealthServer } from './health';
import { loadConfig, type EdgeConfig } from './config'; import { loadConfig, type EdgeConfig } from './config';
import { AlertEngine, type Alert } from './alerts';
import { HeartbeatSender } from './heartbeat';
// Global state // Global state
let config: EdgeConfig; let config: EdgeConfig;
let sensorPush: SensorPushClient; let sensorPush: SensorPushClient;
let veridian: VeridianClient; let veridian: VeridianClient;
let buffer: BufferManager; let buffer: BufferManager;
let alertEngine: AlertEngine;
let heartbeat: HeartbeatSender;
let lastSync: Date | null = null; let lastSync: Date | null = null;
let lastReading: Date | null = null;
let isRunning = false; let isRunning = false;
let sensorCount = 0;
async function main() { async function main() {
console.log('🌱 Veridian Edge Agent starting...'); console.log('🌱 Veridian Edge Agent v2.0 starting...');
// Load configuration // Load configuration
config = loadConfig(); config = loadConfig();
console.log(`📍 Facility: ${config.facilityId}`); console.log(`📍 Facility: ${config.facilityId}`);
console.log(`🔗 Backend: ${config.backendUrl}`); console.log(`🔗 Backend: ${config.server.url}`);
console.log(`🔔 Alerts: ${config.alerts.enabled ? 'enabled' : 'disabled'}`);
// Initialize clients // Initialize clients
sensorPush = new SensorPushClient(config.sensorpush); sensorPush = new SensorPushClient(config.sensorpush);
veridian = new VeridianClient(config.backendUrl, config.backendApiKey); veridian = new VeridianClient(config.server.url, config.server.apiKey);
buffer = new BufferManager(config.bufferMaxRows); buffer = new BufferManager(config.storage.maxRows);
// Initialize alert engine
alertEngine = new AlertEngine(config.alerts, handleAlert);
// Initialize heartbeat sender
heartbeat = new HeartbeatSender(
config.server.url,
config.server.apiKey,
() => ({
facilityId: config.facilityId,
edgeId: config.edgeId,
status: 'ok',
sensorCount,
bufferSize: buffer.count(),
lastReading: lastReading?.toISOString(),
})
);
// Authenticate with SensorPush // Authenticate with SensorPush
await sensorPush.authenticate(); await sensorPush.authenticate();
@ -44,6 +71,9 @@ async function main() {
})); }));
console.log('🏥 Health server running on :3030'); console.log('🏥 Health server running on :3030');
// Start heartbeat sender
heartbeat.start(config.server.heartbeatIntervalSec);
// Start polling loop // Start polling loop
isRunning = true; isRunning = true;
pollLoop(); pollLoop();
@ -73,7 +103,25 @@ async function pollAndSync() {
if (readings.length === 0) return; if (readings.length === 0) return;
// 2. Map sensor IDs to room IDs sensorCount = new Set(readings.map(r => r.sensorId)).size;
lastReading = new Date();
// 2. Check each reading against thresholds
for (const reading of readings) {
const mapping = config.sensorMappings.find(m => m.sensorId === reading.sensorId);
const sensorName = mapping?.name || reading.sensorId;
alertEngine.checkReading({
sensorId: reading.sensorId,
sensorName,
temperature: reading.temperature,
humidity: reading.humidity,
vpd: reading.vpd,
dewpoint: reading.dewpoint,
});
}
// 3. Map sensor IDs to room IDs
const mappedReadings = readings const mappedReadings = readings
.map(r => { .map(r => {
const mapping = config.sensorMappings.find(m => m.sensorId === r.sensorId); const mapping = config.sensorMappings.find(m => m.sensorId === r.sensorId);
@ -89,7 +137,7 @@ async function pollAndSync() {
}) })
.filter((r): r is NonNullable<typeof r> => r !== null); .filter((r): r is NonNullable<typeof r> => r !== null);
// 3. Try to sync to backend // 4. Try to sync to backend
try { try {
// First flush any buffered readings // First flush any buffered readings
const buffered = buffer.getAll(); const buffered = buffer.getAll();
@ -97,7 +145,7 @@ async function pollAndSync() {
await veridian.postReadings([...buffered, ...mappedReadings]); await veridian.postReadings([...buffered, ...mappedReadings]);
buffer.clear(); buffer.clear();
console.log(`📤 Synced ${buffered.length + mappedReadings.length} readings (including buffered)`); console.log(`📤 Synced ${buffered.length + mappedReadings.length} readings (including buffered)`);
} else { } else if (mappedReadings.length > 0) {
await veridian.postReadings(mappedReadings); await veridian.postReadings(mappedReadings);
console.log(`📤 Synced ${mappedReadings.length} readings`); console.log(`📤 Synced ${mappedReadings.length} readings`);
} }
@ -109,9 +157,42 @@ async function pollAndSync() {
} }
} }
/**
* Handle an alert from the alert engine
*/
async function handleAlert(alert: Alert) {
const icon = alert.type === 'HIGH' ? '🔥' : '❄️';
console.log(`${icon} ALERT: ${alert.sensorName} ${alert.metric} ${alert.type} (${alert.value} vs threshold ${alert.threshold})`);
// Send alert to server for notification fan-out
try {
await fetch(`${config.server.url}/environment/alert`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${config.server.apiKey}`,
},
body: JSON.stringify({
facilityId: config.facilityId,
edgeId: config.edgeId,
alertType: `${alert.metric.toUpperCase()}_${alert.type}`,
sensorId: alert.sensorId,
sensorName: alert.sensorName,
currentValue: alert.value,
threshold: alert.threshold,
timestamp: alert.timestamp.toISOString(),
}),
});
} catch (error) {
console.warn('⚠️ Failed to send alert to server:', error);
// Alert still logged locally
}
}
function shutdown() { function shutdown() {
console.log('\n🛑 Shutting down...'); console.log('\n🛑 Shutting down...');
isRunning = false; isRunning = false;
heartbeat.stop();
buffer.close(); buffer.close();
process.exit(0); process.exit(0);
} }