# Chapter 9: Real-Time & Streaming *"Static data is yesterday's news. Real-time is where the magic happens."* --- ## The Live Data Challenge Sarah's Weather Buddy was sophisticated - cached, typed, and professionally instrumented. But users wanted more. "Can we get live weather updates?" asked the CEO during a demo. "I want to see the temperature change in real-time during a storm." "And weather alerts," added the PM. "The moment a tornado warning is issued, users need to know." Sarah looked at her request-response architecture. How could she make data flow continuously? "Time to enter the streaming dimension," Marcus said. "Let me show you Server-Sent Events, WebSockets, and the art of real-time data." ## Server-Sent Events: One-Way Streaming Magic SSE is like a news ticker - the server continuously sends updates: ```typescript // Traditional polling - inefficient and delayed setInterval(async () => { const weather = await tf.get('/api/weather/current') updateDisplay(weather) }, 5000) // 5 second delay, constant requests // Server-Sent Events - real-time and efficient const events = tf.stream('/api/weather/live') events.on('temperature', (data) => { console.log(`Temperature updated: ${data.value}°C`) updateTemperature(data.value) }) events.on('alert', (data) => { console.log(`Weather alert: ${data.message}`) showAlert(data) }) events.on('error', (error) => { console.error('Stream error:', error) events.reconnect() }) ``` ## TypedFetch Streaming API TypedFetch makes streaming as easy as regular requests: ```typescript // Basic SSE stream const stream = tf.stream('/api/events') // Typed SSE stream interface WeatherEvent { type: 'temperature' | 'humidity' | 'pressure' | 'alert' value: number unit: string timestamp: number } const stream = tf.stream('/api/weather/live', { // Reconnect automatically reconnect: true, reconnectDelay: 1000, maxReconnectDelay: 30000, // Handle connection lifecycle onOpen: () => console.log('Stream connected'), onClose: () => console.log('Stream closed'), onError: (error) => console.error('Stream error:', error) }) // Listen to specific event types stream.on('temperature', (event: WeatherEvent) => { if (event.value > 30) { showHeatWarning() } }) // Listen to all events stream.on('*', (event: WeatherEvent) => { logEvent(event) }) // Close when done stream.close() ``` ## WebSocket Integration: Two-Way Communication For bidirectional real-time data, TypedFetch supports WebSockets: ```typescript // Create WebSocket connection const ws = tf.websocket('wss://api.example.com/live', { protocols: ['v2.weather.json'], // Automatic reconnection reconnect: { enabled: true, delay: 1000, maxDelay: 30000, maxAttempts: 10 }, // Heartbeat to keep connection alive heartbeat: { interval: 30000, message: { type: 'ping' }, timeout: 5000 } }) // Send typed messages interface WeatherSubscription { action: 'subscribe' | 'unsubscribe' cities: string[] metrics: Array<'temperature' | 'humidity' | 'pressure'> interval?: number } ws.send({ action: 'subscribe', cities: ['London', 'Tokyo', 'New York'], metrics: ['temperature', 'humidity'], interval: 1000 }) // Receive typed messages interface WeatherUpdate { city: string metrics: { temperature?: number humidity?: number pressure?: number } timestamp: number } ws.on('weather:update', (data) => { updateCityWeather(data.city, data.metrics) }) // Handle connection states ws.on('open', () => { console.log('WebSocket connected') syncSubscriptions() }) ws.on('close', (event) => { console.log(`WebSocket closed: ${event.code} - ${event.reason}`) }) ws.on('error', (error) => { console.error('WebSocket error:', error) }) // Graceful shutdown window.addEventListener('beforeunload', () => { ws.close(1000, 'Page unloading') }) ``` ## Streaming JSON: Handle Large Datasets For large JSON responses, stream and parse incrementally: ```typescript // Traditional - loads entire response into memory const { data } = await tf.get('/api/large-dataset') // 100MB = OOM! // Streaming JSON - process as it arrives const stream = tf.streamJSON('/api/logs/stream') let processedCount = 0 stream.on('data', (entry: LogEntry) => { processLog(entry) processedCount++ if (processedCount % 1000 === 0) { updateProgress(processedCount) } }) stream.on('end', () => { console.log(`Processed ${processedCount} log entries`) }) // Advanced: Stream with backpressure const processor = tf.streamJSON('/api/data/firehose', { highWaterMark: 100, // Buffer up to 100 items // Pause stream when overwhelmed transform: async (item) => { await expensiveProcessing(item) return item } }) processor.pipe(writableStream) ``` ## Weather Buddy 9.0: Live and Dangerous Let's add real-time features to Weather Buddy: ```typescript // weather-buddy-9.ts import { tf } from 'typedfetch' // Weather event types interface TemperatureUpdate { city: string temperature: number feelsLike: number trend: 'rising' | 'falling' | 'stable' rate: number // degrees per hour } interface WeatherAlert { id: string severity: 'advisory' | 'watch' | 'warning' | 'emergency' type: string headline: string description: string areas: string[] effective: Date expires: Date } interface PrecipitationStart { city: string type: 'rain' | 'snow' | 'sleet' | 'hail' intensity: 'light' | 'moderate' | 'heavy' expectedDuration: number // minutes accumulation?: number // mm or cm } // Real-time weather service class LiveWeatherService { private streams = new Map() private ws?: WebSocket private subscribers = new Map>() // Connect to live weather updates async connectCity(city: string) { if (this.streams.has(city)) return const stream = tf.stream(`/api/weather/live/${city}`, { reconnect: true, reconnectDelay: 2000, onOpen: () => { console.log(`Connected to ${city} weather stream`) this.emit('connected', { city }) }, onError: (error) => { console.error(`${city} stream error:`, error) this.emit('error', { city, error }) } }) // Temperature updates every second during rapid changes stream.on('temperature', (data) => { this.emit('temperature', data) // Detect rapid changes if (Math.abs(data.rate) > 5) { this.emit('rapid-change', { city: data.city, message: `Temperature ${data.trend} rapidly: ${data.rate}°/hour` }) } }) // Weather alerts stream.on('alert', (alert) => { this.emit('alert', alert) // Critical alerts need immediate attention if (alert.severity === 'emergency') { this.showEmergencyAlert(alert) } }) // Precipitation notifications stream.on('precipitation', (data) => { this.emit('precipitation', data) this.showNotification({ title: `${data.type} starting in ${city}`, body: `${data.intensity} ${data.type} expected for ${data.expectedDuration} minutes`, icon: this.getWeatherIcon(data.type) }) }) this.streams.set(city, stream) } // WebSocket for two-way communication async connectWebSocket() { this.ws = tf.websocket('wss://weather.example.com/v2/live', { reconnect: { enabled: true, delay: 1000, maxAttempts: 5 }, heartbeat: { interval: 30000, message: { type: 'ping' } } }) // Request custom alerts this.ws.on('open', () => { this.ws!.send({ type: 'configure', alerts: { temperature: { threshold: 35, direction: 'above' }, wind: { threshold: 50, unit: 'km/h' }, precipitation: { threshold: 10, unit: 'mm/h' } } }) }) // Handle custom alerts this.ws.on('custom-alert', (data) => { this.emit('custom-alert', data) }) } // Event system on(event: string, handler: Function) { if (!this.subscribers.has(event)) { this.subscribers.set(event, new Set()) } this.subscribers.get(event)!.add(handler) } off(event: string, handler: Function) { this.subscribers.get(event)?.delete(handler) } private emit(event: string, data: any) { this.subscribers.get(event)?.forEach(handler => { try { handler(data) } catch (error) { console.error(`Error in ${event} handler:`, error) } }) } private showEmergencyAlert(alert: WeatherAlert) { // Full screen alert for emergencies const alertEl = document.createElement('div') alertEl.className = 'emergency-alert' alertEl.innerHTML = `

⚠️ ${alert.headline}

${alert.description}

` document.body.appendChild(alertEl) // Also use browser notifications if ('Notification' in window && Notification.permission === 'granted') { new Notification('Emergency Weather Alert', { body: alert.headline, icon: '/emergency-icon.png', requireInteraction: true, vibrate: [200, 100, 200] }) } } private showNotification(options: NotificationOptions) { if ('Notification' in window && Notification.permission === 'granted') { new Notification(options.title, options) } } private getWeatherIcon(type: string): string { const icons = { rain: '🌧️', snow: '❄️', sleet: '🌨️', hail: '🌨️', thunderstorm: '⛈️' } return icons[type] || '🌦️' } } // Real-time UI components class LiveWeatherCard { private element: HTMLElement private data: Map = new Map() private animationFrame?: number constructor(private city: string, private container: HTMLElement) { this.element = this.createElement() this.container.appendChild(this.element) } private createElement(): HTMLElement { const card = document.createElement('div') card.className = 'weather-card live' card.innerHTML = `

${this.city}

LIVE
-- °C
💧 --%
💨 -- km/h
🔵 -- hPa
` return card } updateTemperature(data: TemperatureUpdate) { const tempEl = this.element.querySelector('.temperature .value')! const trendEl = this.element.querySelector('.temperature .trend')! // Smooth animation this.animateValue(tempEl, data.temperature) // Trend indicator const trendSymbols = { rising: '↗️', falling: '↘️', stable: '→' } trendEl.textContent = trendSymbols[data.trend] // Update chart this.updateChart('temperature', data.temperature) // Color based on temperature const color = this.getTemperatureColor(data.temperature) this.element.style.borderColor = color } showAlert(alert: WeatherAlert) { const alertsEl = this.element.querySelector('.alerts')! const alertEl = document.createElement('div') alertEl.className = `alert ${alert.severity}` alertEl.innerHTML = ` ${alert.headline} Expires: ${new Date(alert.expires).toLocaleTimeString()} ` alertsEl.appendChild(alertEl) // Auto-remove when expired const now = new Date().getTime() const expires = new Date(alert.expires).getTime() setTimeout(() => alertEl.remove(), expires - now) } private animateValue(element: Element, target: number) { const current = parseFloat(element.textContent || '0') const difference = target - current const duration = 1000 const steps = 60 const increment = difference / steps let step = 0 const animate = () => { step++ const value = current + (increment * step) element.textContent = value.toFixed(1) if (step < steps) { this.animationFrame = requestAnimationFrame(animate) } } if (this.animationFrame) { cancelAnimationFrame(this.animationFrame) } animate() } private updateChart(metric: string, value: number) { const canvas = this.element.querySelector('.live-chart') as HTMLCanvasElement const ctx = canvas.getContext('2d')! // Store data points if (!this.data.has(metric)) { this.data.set(metric, []) } const points = this.data.get(metric) points.push({ time: Date.now(), value }) // Keep last 60 points (1 minute at 1/second) if (points.length > 60) { points.shift() } // Draw chart ctx.clearRect(0, 0, canvas.width, canvas.height) ctx.strokeStyle = '#007AFF' ctx.lineWidth = 2 ctx.beginPath() points.forEach((point, index) => { const x = (index / 60) * canvas.width const y = canvas.height - ((point.value - 10) / 30 * canvas.height) if (index === 0) { ctx.moveTo(x, y) } else { ctx.lineTo(x, y) } }) ctx.stroke() } private getTemperatureColor(temp: number): string { if (temp < 0) return '#0066CC' if (temp < 10) return '#0099FF' if (temp < 20) return '#00CC99' if (temp < 30) return '#FFCC00' return '#FF6600' } } // Live weather dashboard class LiveWeatherDashboard { private service = new LiveWeatherService() private cards = new Map() private audioContext?: AudioContext async initialize() { // Request notification permission if ('Notification' in window && Notification.permission === 'default') { await Notification.requestPermission() } // Setup audio for alerts this.audioContext = new AudioContext() // Connect WebSocket for two-way communication await this.service.connectWebSocket() // Listen for events this.service.on('temperature', (data) => { this.cards.get(data.city)?.updateTemperature(data) }) this.service.on('alert', (alert) => { alert.areas.forEach(city => { this.cards.get(city)?.showAlert(alert) }) if (alert.severity === 'warning' || alert.severity === 'emergency') { this.playAlertSound(alert.severity) } }) } async addCity(city: string) { const container = document.getElementById('live-weather')! const card = new LiveWeatherCard(city, container) this.cards.set(city, card) await this.service.connectCity(city) } private playAlertSound(severity: string) { if (!this.audioContext) return const oscillator = this.audioContext.createOscillator() const gainNode = this.audioContext.createGain() oscillator.connect(gainNode) gainNode.connect(this.audioContext.destination) // Different sounds for different severities if (severity === 'emergency') { // Urgent siren oscillator.type = 'sawtooth' oscillator.frequency.setValueAtTime(440, this.audioContext.currentTime) oscillator.frequency.exponentialRampToValueAtTime(880, this.audioContext.currentTime + 0.5) gainNode.gain.setValueAtTime(0.3, this.audioContext.currentTime) } else { // Warning beep oscillator.type = 'sine' oscillator.frequency.value = 660 gainNode.gain.setValueAtTime(0.2, this.audioContext.currentTime) } oscillator.start() oscillator.stop(this.audioContext.currentTime + 0.5) } } // Initialize the live dashboard const dashboard = new LiveWeatherDashboard() dashboard.initialize() // Add cities ['London', 'Tokyo', 'Miami'].forEach(city => { dashboard.addCity(city) }) ``` ## Advanced Streaming Patterns ### 1. Multiplexed Streams Handle multiple data streams over a single connection: ```typescript class MultiplexedStream { private connection: WebSocket private channels = new Map>() constructor(url: string) { this.connection = tf.websocket(url) this.connection.on('message', (event) => { const { channel, data } = JSON.parse(event.data) this.emit(channel, data) }) } subscribe(channel: string, handler: Function) { if (!this.channels.has(channel)) { this.channels.set(channel, new Set()) // Tell server we want this channel this.connection.send({ type: 'subscribe', channel }) } this.channels.get(channel)!.add(handler) } unsubscribe(channel: string, handler: Function) { const handlers = this.channels.get(channel) if (handlers) { handlers.delete(handler) if (handlers.size === 0) { this.channels.delete(channel) // Tell server we're done with this channel this.connection.send({ type: 'unsubscribe', channel }) } } } private emit(channel: string, data: any) { this.channels.get(channel)?.forEach(handler => { handler(data) }) } } // Usage const stream = new MultiplexedStream('wss://api.example.com/multiplex') stream.subscribe('weather:london', (data) => { updateLondonWeather(data) }) stream.subscribe('weather:tokyo', (data) => { updateTokyoWeather(data) }) stream.subscribe('alerts:global', (alert) => { showGlobalAlert(alert) }) ``` ### 2. Stream Synchronization Keep multiple streams in sync: ```typescript class SynchronizedStreams { private streams = new Map() private buffer = new Map() private syncWindow = 1000 // 1 second sync window add(name: string, stream: EventSource) { this.streams.set(name, stream) this.buffer.set(name, []) stream.on('data', (data) => { this.buffer.get(name)!.push({ data, timestamp: Date.now() }) this.checkSync() }) } private checkSync() { const now = Date.now() const buffers = Array.from(this.buffer.values()) // Find synchronized data points const synced = [] for (const buffer of buffers) { const point = buffer.find(p => Math.abs(p.timestamp - now) < this.syncWindow ) if (point) { synced.push(point.data) } else { return // Not all streams have data yet } } // All streams have synchronized data this.emit('sync', synced) // Clear old data this.buffer.forEach(buffer => { const cutoff = now - this.syncWindow buffer = buffer.filter(p => p.timestamp > cutoff) }) } on(event: string, handler: Function) { // Event handling implementation } } ``` ### 3. Stream Transformation Process streaming data on the fly: ```typescript class StreamTransformer { constructor( private source: ReadableStream, private transform: (value: T) => R | Promise ) {} async *[Symbol.asyncIterator]() { const reader = this.source.getReader() try { while (true) { const { done, value } = await reader.read() if (done) break yield await this.transform(value) } } finally { reader.releaseLock() } } pipe(writable: WritableStream) { const writer = writable.getWriter() (async () => { for await (const value of this) { await writer.write(value) } await writer.close() })() } } // Usage: Aggregate streaming data const aggregator = new StreamTransformer( weatherStream, (data) => ({ ...data, timestamp: Date.now(), movingAverage: calculateMovingAverage(data.temperature) }) ) for await (const aggregated of aggregator) { updateDisplay(aggregated) } ``` ### 4. Reliable Streaming Handle disconnections gracefully: ```typescript class ReliableStream { private eventSource?: EventSource private lastEventId?: string private reconnectAttempts = 0 private queue: any[] = [] constructor( private url: string, private options: { maxReconnectAttempts?: number reconnectDelay?: number queueOfflineEvents?: boolean } = {} ) { this.connect() } private connect() { const headers: any = {} // Resume from last event if (this.lastEventId) { headers['Last-Event-ID'] = this.lastEventId } this.eventSource = new EventSource(this.url, { headers }) this.eventSource.onopen = () => { console.log('Stream connected') this.reconnectAttempts = 0 // Flush queued events if (this.queue.length > 0) { this.queue.forEach(event => this.emit('data', event)) this.queue = [] } } this.eventSource.onmessage = (event) => { this.lastEventId = event.lastEventId const data = JSON.parse(event.data) if (navigator.onLine) { this.emit('data', data) } else if (this.options.queueOfflineEvents) { this.queue.push(data) } } this.eventSource.onerror = () => { this.eventSource!.close() if (this.reconnectAttempts < (this.options.maxReconnectAttempts || 10)) { this.reconnectAttempts++ const delay = this.options.reconnectDelay || 1000 const backoff = Math.min(delay * Math.pow(2, this.reconnectAttempts), 30000) console.log(`Reconnecting in ${backoff}ms...`) setTimeout(() => this.connect(), backoff) } else { this.emit('error', new Error('Max reconnection attempts reached')) } } } private emit(event: string, data: any) { // Event emitter implementation } } ``` ## Best Practices for Real-Time 🎯 ### 1. Choose the Right Protocol ```typescript // SSE for one-way server → client if (needsServerPush && !needsBidirectional) { useSSE() } // WebSocket for bidirectional if (needsBidirectional || lowLatency) { useWebSocket() } // Long polling for compatibility if (needsFallback) { useLongPolling() } ``` ### 2. Handle Connection Lifecycle ```typescript class StreamManager { private streams = new Set() constructor() { // Clean up on page unload window.addEventListener('beforeunload', () => { this.closeAll() }) // Handle network changes window.addEventListener('online', () => { this.reconnectAll() }) window.addEventListener('offline', () => { this.pauseAll() }) // Handle page visibility document.addEventListener('visibilitychange', () => { if (document.hidden) { this.throttleAll() } else { this.resumeAll() } }) } } ``` ### 3. Implement Backpressure ```typescript class BackpressureStream { private buffer: any[] = [] private processing = false async handleData(data: any) { this.buffer.push(data) if (!this.processing) { this.processing = true await this.processBuffer() this.processing = false } } private async processBuffer() { while (this.buffer.length > 0) { const batch = this.buffer.splice(0, 10) // Process in batches await Promise.all( batch.map(item => this.processItem(item)) ) // Yield to UI await new Promise(resolve => setTimeout(resolve, 0)) } } } ``` ### 4. Monitor Stream Health ```typescript class StreamHealthMonitor { private metrics = { messagesReceived: 0, bytesReceived: 0, errors: 0, reconnections: 0, latency: [] } trackMessage(message: any) { this.metrics.messagesReceived++ this.metrics.bytesReceived += JSON.stringify(message).length if (message.timestamp) { const latency = Date.now() - message.timestamp this.metrics.latency.push(latency) // Keep last 100 latency measurements if (this.metrics.latency.length > 100) { this.metrics.latency.shift() } } } getHealth() { const avgLatency = this.metrics.latency.reduce((a, b) => a + b, 0) / this.metrics.latency.length return { ...this.metrics, averageLatency: avgLatency, health: this.calculateHealthScore() } } private calculateHealthScore(): 'good' | 'degraded' | 'poor' { const errorRate = this.metrics.errors / this.metrics.messagesReceived if (errorRate > 0.1) return 'poor' if (errorRate > 0.01) return 'degraded' return 'good' } } ``` ## Practice Time! 🏋️ ### Exercise 1: Build a Chat System Create a real-time chat with TypedFetch: ```typescript class RealtimeChat { // Your code here: // - WebSocket connection // - Message types // - User presence // - Message history // - Reconnection handling } ``` ### Exercise 2: Live Data Dashboard Build a dashboard with multiple streams: ```typescript class LiveDashboard { // Your code here: // - Multiple SSE streams // - Data synchronization // - Chart updates // - Alert system } ``` ### Exercise 3: Stream Aggregator Create a stream processing pipeline: ```typescript class StreamAggregator { // Your code here: // - Combine multiple streams // - Window functions // - Reduce operations // - Output stream } ``` ## Key Takeaways 🎯 1. **SSE for server-to-client streaming** - Simple, automatic reconnection 2. **WebSockets for bidirectional communication** - Real-time, low latency 3. **Stream JSON for large datasets** - Process without loading all in memory 4. **TypedFetch handles reconnection automatically** - Built-in reliability 5. **Handle connection lifecycle properly** - Online/offline, visibility 6. **Implement backpressure for fast streams** - Don't overwhelm the client 7. **Monitor stream health** - Track latency, errors, reconnections 8. **Choose the right protocol** - SSE vs WebSocket vs polling ## Common Pitfalls 🚨 1. **Not handling reconnection** - Networks are unreliable 2. **Memory leaks from unclosed streams** - Always clean up 3. **Overwhelming the UI thread** - Process in batches 4. **Not handling offline states** - Queue or pause appropriately 5. **Missing error boundaries** - Streams can fail anytime 6. **Ignoring backpressure** - Fast producers, slow consumers ## What's Next? You've mastered real-time streaming! But how do you make it all performant? In Chapter 10, we'll dive deep into performance optimization: - Request deduplication strategies - Connection pooling - Optimal caching configurations - Bundle size optimization - Memory management - Performance monitoring Ready to make TypedFetch blazing fast? See you in Chapter 10! ⚡ --- ## Chapter Summary - Server-Sent Events provide one-way streaming from server to client - WebSockets enable bidirectional real-time communication - TypedFetch handles automatic reconnection and error recovery - Stream JSON allows processing large datasets without memory issues - Proper lifecycle management prevents memory leaks and connection issues - Backpressure handling prevents overwhelming slow consumers - Weather Buddy 9.0 shows live temperature updates and emergency alerts - Choose SSE for simplicity, WebSocket for interactivity **Next Chapter Preview**: Performance Optimization - Make TypedFetch blazing fast with deduplication, connection pooling, and advanced caching strategies.