Chapter 9: Real-Time & Streaming

"Static data is yesterday's news. Real-time is where the magic happens."


The Live Data Challenge

Sarah's Weather Buddy was sophisticated - cached, typed, and professionally instrumented. But users wanted more.

"Can we get live weather updates?" asked the CEO during a demo. "I want to see the temperature change in real-time during a storm."

"And weather alerts," added the PM. "The moment a tornado warning is issued, users need to know."

Sarah looked at her request-response architecture. How could she make data flow continuously?

"Time to enter the streaming dimension," Marcus said. "Let me show you Server-Sent Events, WebSockets, and the art of real-time data."

Server-Sent Events: One-Way Streaming Magic

SSE is like a news ticker - the server continuously sends updates:

// Traditional polling - inefficient and delayed
setInterval(async () => {
  const weather = await tf.get('/api/weather/current')
  updateDisplay(weather)
}, 5000)  // 5 second delay, constant requests

// Server-Sent Events - real-time and efficient
const events = tf.stream('/api/weather/live')

events.on('temperature', (data) => {
  console.log(`Temperature updated: ${data.value}°C`)
  updateTemperature(data.value)
})

events.on('alert', (data) => {
  console.log(`Weather alert: ${data.message}`)
  showAlert(data)
})

events.on('error', (error) => {
  console.error('Stream error:', error)
  events.reconnect()
})
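
On the wire, an SSE response is a plain text stream of `field: value` lines, with a blank line ending each event. The sketch below parses that format to show where names like 'temperature' and 'alert' come from. `parseSSE` and `SSEFrame` are hypothetical names used for illustration, not part of TypedFetch, and the sketch assumes the whole text is available at once rather than arriving in chunks.

```typescript
// A parsed Server-Sent Event (hypothetical shape for this sketch)
interface SSEFrame {
  event: string   // defaults to 'message' when no event field is sent
  data: string    // multiple data lines are joined with '\n'
  id?: string     // last event ID seen so far; it persists across events
}

function parseSSE(text: string): SSEFrame[] {
  const frames: SSEFrame[] = []
  let event = 'message'
  let data: string[] = []
  let id: string | undefined

  for (const line of text.split(/\r\n|\n|\r/)) {
    if (line === '') {
      // Blank line: dispatch the event if it carried any data
      if (data.length > 0) {
        frames.push({ event, data: data.join('\n'), id })
      }
      event = 'message'
      data = []
      continue
    }
    if (line.startsWith(':')) continue  // comment line, often used as a keep-alive

    const colon = line.indexOf(':')
    const field = colon === -1 ? line : line.slice(0, colon)
    let value = colon === -1 ? '' : line.slice(colon + 1)
    if (value.startsWith(' ')) value = value.slice(1)  // strip one leading space

    if (field === 'event') event = value
    else if (field === 'data') data.push(value)
    else if (field === 'id') id = value
  }
  return frames
}
```

A handler registered for 'temperature' would then receive the frames whose `event` field is 'temperature', typically after `JSON.parse` of the `data` string.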

TypedFetch Streaming API

TypedFetch makes streaming as easy as regular requests:

// Basic SSE stream
const basicStream = tf.stream('/api/events')  // named differently so it does not clash with the typed stream below

// Typed SSE stream
interface WeatherEvent {
  type: 'temperature' | 'humidity' | 'pressure' | 'alert'
  value: number
  unit: string
  timestamp: number
}

const stream = tf.stream<WeatherEvent>('/api/weather/live', {
  // Reconnect automatically
  reconnect: true,
  reconnectDelay: 1000,
  maxReconnectDelay: 30000,
  
  // Handle connection lifecycle
  onOpen: () => console.log('Stream connected'),
  onClose: () => console.log('Stream closed'),
  onError: (error) => console.error('Stream error:', error)
})

// Listen to specific event types
stream.on('temperature', (event: WeatherEvent) => {
  if (event.value > 30) {
    showHeatWarning()
  }
})

// Listen to all events
stream.on('*', (event: WeatherEvent) => {
  logEvent(event)
})

// Close when done
stream.close()
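
The `reconnectDelay` and `maxReconnectDelay` options suggest a delay that grows between attempts up to a cap. How TypedFetch actually schedules its retries is not shown here, so treat the following as a sketch of one common scheme, exponential backoff. `backoffDelay` and `withJitter` are hypothetical helpers, and the defaults simply mirror the values used above.

```typescript
// Delay before reconnect attempt `attempt` (0-based): base * 2^attempt, capped at max
function backoffDelay(attempt: number, baseDelay = 1000, maxDelay = 30000): number {
  return Math.min(baseDelay * 2 ** attempt, maxDelay)
}

// Optional jitter spreads reconnects out so many clients do not retry in lockstep.
// The result lies between 50% and 100% of the given delay.
function withJitter(delay: number, random: () => number = Math.random): number {
  return Math.round(delay * (0.5 + random() * 0.5))
}

const schedule = [0, 1, 2, 3, 4, 5, 6].map(n => backoffDelay(n))
// schedule: 1000, 2000, 4000, 8000, 16000, 30000, 30000
```

Capping the delay keeps a long outage from pushing retries minutes apart, while the doubling keeps a struggling server from being hammered.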

WebSocket Integration: Two-Way Communication

For bidirectional real-time data, TypedFetch supports WebSockets:

// Create WebSocket connection
const ws = tf.websocket('wss://api.example.com/live', {
  protocols: ['v2.weather.json'],
  
  // Automatic reconnection
  reconnect: {
    enabled: true,
    delay: 1000,
    maxDelay: 30000,
    maxAttempts: 10
  },
  
  // Heartbeat to keep connection alive
  heartbeat: {
    interval: 30000,
    message: { type: 'ping' },
    timeout: 5000
  }
})

// Send typed messages
interface WeatherSubscription {
  action: 'subscribe' | 'unsubscribe'
  cities: string[]
  metrics: Array<'temperature' | 'humidity' | 'pressure'>
  interval?: number
}

ws.send<WeatherSubscription>({
  action: 'subscribe',
  cities: ['London', 'Tokyo', 'New York'],
  metrics: ['temperature', 'humidity'],
  interval: 1000
})

// Receive typed messages
interface WeatherUpdate {
  city: string
  metrics: {
    temperature?: number
    humidity?: number
    pressure?: number
  }
  timestamp: number
}

ws.on<WeatherUpdate>('weather:update', (data) => {
  updateCityWeather(data.city, data.metrics)
})

// Handle connection states
ws.on('open', () => {
  console.log('WebSocket connected')
  syncSubscriptions()
})

ws.on('close', (event) => {
  console.log(`WebSocket closed: ${event.code} - ${event.reason}`)
})

ws.on('error', (error) => {
  console.error('WebSocket error:', error)
})

// Graceful shutdown
window.addEventListener('beforeunload', () => {
  ws.close(1000, 'Page unloading')
})
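
A type parameter such as `ws.on<WeatherUpdate>` only tells the compiler what shape to expect; it cannot check what the server actually sent. Whether TypedFetch validates incoming messages at runtime is not shown in this chapter, so a hand-written guard is one way to be safe. The names `isWeatherUpdate` and `parseWeatherUpdate` below are hypothetical.

```typescript
interface WeatherUpdate {
  city: string
  metrics: { temperature?: number; humidity?: number; pressure?: number }
  timestamp: number
}

// Runtime check that an unknown value has the WeatherUpdate shape
function isWeatherUpdate(value: unknown): value is WeatherUpdate {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  if (typeof v.city !== 'string' || typeof v.timestamp !== 'number') return false
  if (typeof v.metrics !== 'object' || v.metrics === null) return false
  // Every metric that is present must be a number
  return Object.values(v.metrics as Record<string, unknown>)
    .every(m => m === undefined || typeof m === 'number')
}

// Parse a raw frame, returning null for malformed JSON or the wrong shape
function parseWeatherUpdate(raw: string): WeatherUpdate | null {
  try {
    const parsed: unknown = JSON.parse(raw)
    return isWeatherUpdate(parsed) ? parsed : null
  } catch {
    return null
  }
}
```

Returning `null` instead of throwing lets a message handler skip a bad frame and keep the connection open.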

Streaming JSON: Handle Large Datasets

For large JSON responses, stream and parse incrementally:

// Traditional - loads entire response into memory
const { data } = await tf.get('/api/large-dataset')  // a 100 MB body is held in memory all at once

// Streaming JSON - process as it arrives
const stream = tf.streamJSON<LogEntry>('/api/logs/stream')

let processedCount = 0
stream.on('data', (entry: LogEntry) => {
  processLog(entry)
  processedCount++
  
  if (processedCount % 1000 === 0) {
    updateProgress(processedCount)
  }
})

stream.on('end', () => {
  console.log(`Processed ${processedCount} log entries`)
})

// Advanced: Stream with backpressure
const processor = tf.streamJSON<DataPoint>('/api/data/firehose', {
  highWaterMark: 100,  // Buffer up to 100 items
  
  // Pause stream when overwhelmed
  transform: async (item) => {
    await expensiveProcessing(item)
    return item
  }
})

processor.pipe(writableStream)
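
How `tf.streamJSON` splits the response into items is not shown here. A common approach is newline-delimited JSON, where each line is one complete value, and the sketch below assumes that format. `NdjsonParser` and `readNdjson` are hypothetical names, not TypedFetch APIs. The key detail is that a network chunk can end in the middle of a line, so the unfinished tail must be carried over to the next chunk.

```typescript
// Incremental parser for newline-delimited JSON (one JSON value per line)
class NdjsonParser<T> {
  private remainder = ''

  // Feed a chunk of text; returns every item completed by this chunk
  push(chunk: string): T[] {
    const lines = (this.remainder + chunk).split('\n')
    // The last piece may be an incomplete line, so hold it for the next chunk
    this.remainder = lines.pop() ?? ''
    return lines
      .map(line => line.trim())
      .filter(line => line.length > 0)
      .map(line => JSON.parse(line) as T)
  }

  // Call once the stream ends to parse a final line that has no trailing newline
  flush(): T[] {
    const last = this.remainder.trim()
    this.remainder = ''
    return last.length > 0 ? [JSON.parse(last) as T] : []
  }
}

// Drive the parser from a fetch-style byte stream
async function* readNdjson<T>(body: ReadableStream<Uint8Array>): AsyncGenerator<T> {
  const parser = new NdjsonParser<T>()
  const decoder = new TextDecoder()
  const reader = body.getReader()
  try {
    while (true) {
      const { done, value } = await reader.read()
      if (done) break
      // stream: true keeps multi-byte characters intact across chunk boundaries
      yield* parser.push(decoder.decode(value, { stream: true }))
    }
    yield* parser.push(decoder.decode())
    yield* parser.flush()
  } finally {
    reader.releaseLock()
  }
}
```

Because the generator only reads the next chunk when the consumer asks for another item, a slow `for await` loop naturally slows down reading as well.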

Weather Buddy 9.0: Live and Dangerous

Let's add real-time features to Weather Buddy:

// weather-buddy-9.ts
import { tf } from 'typedfetch'

// Weather event types
interface TemperatureUpdate {
  city: string
  temperature: number
  feelsLike: number
  trend: 'rising' | 'falling' | 'stable'
  rate: number  // degrees per hour
}

interface WeatherAlert {
  id: string
  severity: 'advisory' | 'watch' | 'warning' | 'emergency'
  type: string
  headline: string
  description: string
  areas: string[]
  effective: Date
  expires: Date
}

interface PrecipitationStart {
  city: string
  type: 'rain' | 'snow' | 'sleet' | 'hail'
  intensity: 'light' | 'moderate' | 'heavy'
  expectedDuration: number  // minutes
  accumulation?: number     // mm or cm
}

// Real-time weather service
class LiveWeatherService {
  private streams = new Map<string, EventSource>()
  private ws?: WebSocket
  private subscribers = new Map<string, Set<Function>>()
  
  // Connect to live weather updates
  async connectCity(city: string) {
    if (this.streams.has(city)) return
    
    const stream = tf.stream(`/api/weather/live/${city}`, {
      reconnect: true,
      reconnectDelay: 2000,
      
      onOpen: () => {
        console.log(`Connected to ${city} weather stream`)
        this.emit('connected', { city })
      },
      
      onError: (error) => {
        console.error(`${city} stream error:`, error)
        this.emit('error', { city, error })
      }
    })
    
    // Temperature updates every second during rapid changes
    stream.on<TemperatureUpdate>('temperature', (data) => {
      this.emit('temperature', data)
      
      // Detect rapid changes
      if (Math.abs(data.rate) > 5) {
        this.emit('rapid-change', {
          city: data.city,
          message: `Temperature ${data.trend} rapidly: ${data.rate}°/hour`
        })
      }
    })
    
    // Weather alerts
    stream.on<WeatherAlert>('alert', (alert) => {
      this.emit('alert', alert)
      
      // Critical alerts need immediate attention
      if (alert.severity === 'emergency') {
        this.showEmergencyAlert(alert)
      }
    })
    
    // Precipitation notifications
    stream.on<PrecipitationStart>('precipitation', (data) => {
      this.emit('precipitation', data)
      
      this.showNotification({
        title: `${data.type} starting in ${city}`,
        body: `${data.intensity} ${data.type} expected for ${data.expectedDuration} minutes`,
        icon: this.getWeatherIcon(data.type)
      })
    })
    
    this.streams.set(city, stream)
  }
  
  // WebSocket for two-way communication
  async connectWebSocket() {
    this.ws = tf.websocket('wss://weather.example.com/v2/live', {
      reconnect: {
        enabled: true,
        delay: 1000,
        maxAttempts: 5
      },
      
      heartbeat: {
        interval: 30000,
        message: { type: 'ping' }
      }
    })
    
    // Request custom alerts
    this.ws.on('open', () => {
      this.ws!.send({
        type: 'configure',
        alerts: {
          temperature: { threshold: 35, direction: 'above' },
          wind: { threshold: 50, unit: 'km/h' },
          precipitation: { threshold: 10, unit: 'mm/h' }
        }
      })
    })
    
    // Handle custom alerts
    this.ws.on('custom-alert', (data) => {
      this.emit('custom-alert', data)
    })
  }
  
  // Event system
  on(event: string, handler: Function) {
    if (!this.subscribers.has(event)) {
      this.subscribers.set(event, new Set())
    }
    this.subscribers.get(event)!.add(handler)
  }
  
  off(event: string, handler: Function) {
    this.subscribers.get(event)?.delete(handler)
  }
  
  private emit(event: string, data: any) {
    this.subscribers.get(event)?.forEach(handler => {
      try {
        handler(data)
      } catch (error) {
        console.error(`Error in ${event} handler:`, error)
      }
    })
  }
  
  private showEmergencyAlert(alert: WeatherAlert) {
    // Full screen alert for emergencies
    const alertEl = document.createElement('div')
    alertEl.className = 'emergency-alert'
    alertEl.innerHTML = `
      <div class="alert-content">
        <h1>⚠️ ${alert.headline}</h1>
        <p>${alert.description}</p>
        <div class="alert-actions">
          <button onclick="this.parentElement.parentElement.remove()">Dismiss</button>
          <button onclick="showSafetyInfo('${alert.type}')">Safety Info</button>
        </div>
      </div>
    `
    document.body.appendChild(alertEl)
    
    // Also use browser notifications
    if ('Notification' in window && Notification.permission === 'granted') {
      new Notification('Emergency Weather Alert', {
        body: alert.headline,
        icon: '/emergency-icon.png',
        requireInteraction: true,
        vibrate: [200, 100, 200]
      })
    }
  }
  
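  private showNotification(options: NotificationOptions & { title: string }) {
    if ('Notification' in window && Notification.permission === 'granted') {
      // The title is a separate constructor argument, not part of NotificationOptions
      const { title, ...rest } = options
      new Notification(title, rest)
    }
  }
  
  private getWeatherIcon(type: string): string {
    // Typed as a string-keyed record so unknown types fall through to the default
    const icons: Record<string, string> = {
      rain: '🌧️',
      snow: '❄️',
      sleet: '🌨️',
      hail: '🌨️',
      thunderstorm: '⛈️'
    }
    return icons[type] ?? '🌦️'
  }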
}

// Real-time UI components
class LiveWeatherCard {
  private element: HTMLElement
  private data: Map<string, any> = new Map()
  private animationFrame?: number
  
  constructor(private city: string, private container: HTMLElement) {
    this.element = this.createElement()
    this.container.appendChild(this.element)
  }
  
  private createElement(): HTMLElement {
    const card = document.createElement('div')
    card.className = 'weather-card live'
    card.innerHTML = `
      <h3>${this.city}</h3>
      <div class="live-indicator">
        <span class="pulse"></span>
        LIVE
      </div>
      <div class="temperature">
        <span class="value">--</span>
        <span class="unit">°C</span>
        <span class="trend"></span>
      </div>
      <div class="metrics">
        <div class="humidity">💧 --%</div>
        <div class="wind">💨 -- km/h</div>
        <div class="pressure">🔵 -- hPa</div>
      </div>
      <div class="alerts"></div>
      <canvas class="live-chart" width="300" height="100"></canvas>
    `
    return card
  }
  
  updateTemperature(data: TemperatureUpdate) {
    const tempEl = this.element.querySelector('.temperature .value')!
    const trendEl = this.element.querySelector('.temperature .trend')!
    
    // Smooth animation
    this.animateValue(tempEl, data.temperature)
    
    // Trend indicator
    const trendSymbols = {
      rising: '↗️',
      falling: '↘️',
      stable: '→'
    }
    trendEl.textContent = trendSymbols[data.trend]
    
    // Update chart
    this.updateChart('temperature', data.temperature)
    
    // Color based on temperature
    const color = this.getTemperatureColor(data.temperature)
    this.element.style.borderColor = color
  }
  
  showAlert(alert: WeatherAlert) {
    const alertsEl = this.element.querySelector('.alerts')!
    const alertEl = document.createElement('div')
    alertEl.className = `alert ${alert.severity}`
    alertEl.innerHTML = `
      <strong>${alert.headline}</strong>
      <span class="expires">Expires: ${new Date(alert.expires).toLocaleTimeString()}</span>
    `
    
    alertsEl.appendChild(alertEl)
    
    // Auto-remove when expired
    const now = new Date().getTime()
    const expires = new Date(alert.expires).getTime()
    setTimeout(() => alertEl.remove(), expires - now)
  }
  
  private animateValue(element: Element, target: number) {
    // The placeholder text ('--') parses to NaN, so start from the target in that case
    const parsed = parseFloat(element.textContent || '')
    const current = Number.isNaN(parsed) ? target : parsed
    const difference = target - current
    const steps = 60  // roughly one second at 60 frames per second
    const increment = difference / steps
    
    let step = 0
    const animate = () => {
      step++
      const value = current + (increment * step)
      element.textContent = value.toFixed(1)
      
      if (step < steps) {
        this.animationFrame = requestAnimationFrame(animate)
      }
    }
    
    // Cancel any animation still in flight before starting a new one
    if (this.animationFrame) {
      cancelAnimationFrame(this.animationFrame)
    }
    animate()
  }
  
  private updateChart(metric: string, value: number) {
    const canvas = this.element.querySelector('.live-chart') as HTMLCanvasElement
    const ctx = canvas.getContext('2d')!
    
    // Store data points
    if (!this.data.has(metric)) {
      this.data.set(metric, [])
    }
    
    const points = this.data.get(metric)
    points.push({ time: Date.now(), value })
    
    // Keep last 60 points (1 minute at 1/second)
    if (points.length > 60) {
      points.shift()
    }
    
    // Draw chart
    ctx.clearRect(0, 0, canvas.width, canvas.height)
    ctx.strokeStyle = '#007AFF'
    ctx.lineWidth = 2
    ctx.beginPath()
    
    points.forEach((point, index) => {
      const x = (index / 60) * canvas.width
      const y = canvas.height - ((point.value - 10) / 30 * canvas.height)
      
      if (index === 0) {
        ctx.moveTo(x, y)
      } else {
        ctx.lineTo(x, y)
      }
    })
    
    ctx.stroke()
  }
  
  private getTemperatureColor(temp: number): string {
    if (temp < 0) return '#0066CC'
    if (temp < 10) return '#0099FF'
    if (temp < 20) return '#00CC99'
    if (temp < 30) return '#FFCC00'
    return '#FF6600'
  }
}

// Live weather dashboard
class LiveWeatherDashboard {
  private service = new LiveWeatherService()
  private cards = new Map<string, LiveWeatherCard>()
  private audioContext?: AudioContext
  
  async initialize() {
    // Request notification permission
    if ('Notification' in window && Notification.permission === 'default') {
      await Notification.requestPermission()
    }
    
    // Setup audio for alerts
    this.audioContext = new AudioContext()
    
    // Connect WebSocket for two-way communication
    await this.service.connectWebSocket()
    
    // Listen for events
    this.service.on('temperature', (data) => {
      this.cards.get(data.city)?.updateTemperature(data)
    })
    
    this.service.on('alert', (alert) => {
      alert.areas.forEach(city => {
        this.cards.get(city)?.showAlert(alert)
      })
      
      if (alert.severity === 'warning' || alert.severity === 'emergency') {
        this.playAlertSound(alert.severity)
      }
    })
  }
  
  async addCity(city: string) {
    const container = document.getElementById('live-weather')!
    const card = new LiveWeatherCard(city, container)
    this.cards.set(city, card)
    
    await this.service.connectCity(city)
  }
  
  private playAlertSound(severity: string) {
    if (!this.audioContext) return
    
    const oscillator = this.audioContext.createOscillator()
    const gainNode = this.audioContext.createGain()
    
    oscillator.connect(gainNode)
    gainNode.connect(this.audioContext.destination)
    
    // Different sounds for different severities
    if (severity === 'emergency') {
      // Urgent siren
      oscillator.type = 'sawtooth'
      oscillator.frequency.setValueAtTime(440, this.audioContext.currentTime)
      oscillator.frequency.exponentialRampToValueAtTime(880, this.audioContext.currentTime + 0.5)
      gainNode.gain.setValueAtTime(0.3, this.audioContext.currentTime)
    } else {
      // Warning beep
      oscillator.type = 'sine'
      oscillator.frequency.value = 660
      gainNode.gain.setValueAtTime(0.2, this.audioContext.currentTime)
    }
    
    oscillator.start()
    oscillator.stop(this.audioContext.currentTime + 0.5)
  }
}

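// Initialize the live dashboard (awaited so setup finishes before cities are added)
const dashboard = new LiveWeatherDashboard()
await dashboard.initialize()

// Add cities. Without semicolons, a line starting with '[' would be parsed
// as an index into the previous expression, so the list gets its own name.
const cities = ['London', 'Tokyo', 'Miami']
await Promise.all(cities.map(city => dashboard.addCity(city)))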
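
The cards and alerts above interpolate server-provided strings such as `alert.headline` and `alert.description` into `innerHTML`. If that text can ever contain markup, it should be escaped first (or assigned through `textContent` instead). A minimal sketch follows; `escapeHtml` and `alertMarkup` are hypothetical helper names.

```typescript
// Replace the characters that are significant in HTML text and attribute values
function escapeHtml(text: string): string {
  const replacements: Record<string, string> = {
    '&': '&amp;',
    '<': '&lt;',
    '>': '&gt;',
    '"': '&quot;',
    "'": '&#39;'
  }
  return text.replace(/[&<>"']/g, ch => replacements[ch])
}

// Example: build alert markup from untrusted text
function alertMarkup(headline: string, description: string): string {
  return `<h1>${escapeHtml(headline)}</h1><p>${escapeHtml(description)}</p>`
}
```

Escaping at the point where the template is built keeps the rest of the class unchanged while closing off script injection through alert text.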

Advanced Streaming Patterns

1. Multiplexed Streams

Handle multiple data streams over a single connection:

class MultiplexedStream {
  private connection: WebSocket
  private channels = new Map<string, Set<Function>>()
  
  constructor(url: string) {
    this.connection = tf.websocket(url)
    
    this.connection.on('message', (event) => {
      const { channel, data } = JSON.parse(event.data)
      this.emit(channel, data)
    })
  }
  
  subscribe(channel: string, handler: Function) {
    if (!this.channels.has(channel)) {
      this.channels.set(channel, new Set())
      
      // Tell server we want this channel
      this.connection.send({
        type: 'subscribe',
        channel
      })
    }
    
    this.channels.get(channel)!.add(handler)
  }
  
  unsubscribe(channel: string, handler: Function) {
    const handlers = this.channels.get(channel)
    if (handlers) {
      handlers.delete(handler)
      
      if (handlers.size === 0) {
        this.channels.delete(channel)
        
        // Tell server we're done with this channel
        this.connection.send({
          type: 'unsubscribe',
          channel
        })
      }
    }
  }
  
  private emit(channel: string, data: any) {
    this.channels.get(channel)?.forEach(handler => {
      handler(data)
    })
  }
}

// Usage
const stream = new MultiplexedStream('wss://api.example.com/multiplex')

stream.subscribe('weather:london', (data) => {
  updateLondonWeather(data)
})

stream.subscribe('weather:tokyo', (data) => {
  updateTokyoWeather(data)
})

stream.subscribe('alerts:global', (alert) => {
  showGlobalAlert(alert)
})

2. Stream Synchronization

Keep multiple streams in sync:

class SynchronizedStreams {
  private streams = new Map<string, any>()
  private buffer = new Map<string, any[]>()
  private syncWindow = 1000  // 1 second sync window
  
  add(name: string, stream: EventSource) {
    this.streams.set(name, stream)
    this.buffer.set(name, [])
    
    stream.on('data', (data) => {
      this.buffer.get(name)!.push({
        data,
        timestamp: Date.now()
      })
      
      this.checkSync()
    })
  }
  
  private checkSync() {
    const now = Date.now()
    const buffers = Array.from(this.buffer.values())
    
    // Find synchronized data points
    const synced = []
    
    for (const buffer of buffers) {
      const point = buffer.find(p => 
        Math.abs(p.timestamp - now) < this.syncWindow
      )
      
      if (point) {
        synced.push(point.data)
      } else {
        return  // Not all streams have data yet
      }
    }
    
    // All streams have synchronized data
    this.emit('sync', synced)
    
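    // Clear old data. filter() returns a new array, so store it back in the map;
    // reassigning the callback parameter would leave the stored buffer untouched.
    const cutoff = now - this.syncWindow
    for (const [name, points] of this.buffer) {
      this.buffer.set(name, points.filter(p => p.timestamp > cutoff))
    }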
  }
  
  on(event: string, handler: Function) {
    // Event handling implementation
  }
}

3. Stream Transformation

Process streaming data on the fly:

class StreamTransformer<T, R> {
  constructor(
    private source: ReadableStream<T>,
    private transform: (value: T) => R | Promise<R>
  ) {}
  
  async *[Symbol.asyncIterator]() {
    const reader = this.source.getReader()
    
    try {
      while (true) {
        const { done, value } = await reader.read()
        if (done) break
        
        yield await this.transform(value)
      }
    } finally {
      reader.releaseLock()
    }
  }
  
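  // Declared async rather than wrapping the loop in an immediately invoked
  // function: without semicolons, a line starting with '(' would be parsed as
  // a call on the result of getWriter()
  async pipe(writable: WritableStream<R>): Promise<void> {
    const writer = writable.getWriter()
    
    try {
      for await (const value of this) {
        await writer.write(value)
      }
      await writer.close()
    } finally {
      writer.releaseLock()
    }
  }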
}

// Usage: Aggregate streaming data
const aggregator = new StreamTransformer(
  weatherStream,
  (data) => ({
    ...data,
    timestamp: Date.now(),
    movingAverage: calculateMovingAverage(data.temperature)
  })
)

for await (const aggregated of aggregator) {
  updateDisplay(aggregated)
}
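
The usage above calls `calculateMovingAverage` without defining it. One possible definition is a closure that remembers the most recent readings; `createMovingAverage` and the window size of 3 are assumptions made for this sketch.

```typescript
// Returns a function that tracks the mean of the last `windowSize` values
function createMovingAverage(windowSize: number): (value: number) => number {
  if (windowSize < 1) throw new RangeError('windowSize must be at least 1')
  const recent: number[] = []
  let sum = 0

  return (value: number) => {
    recent.push(value)
    sum += value
    if (recent.length > windowSize) {
      // Drop the oldest reading so only the last `windowSize` values count
      sum -= recent.shift()!
    }
    return sum / recent.length
  }
}

const calculateMovingAverage = createMovingAverage(3)
```

Keeping a running sum avoids re-adding the whole window for every reading, which matters little at size 3 but helps with larger windows on fast streams.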

4. Reliable Streaming

Handle disconnections gracefully:

class ReliableStream {
  private eventSource?: EventSource
  private lastEventId?: string
  private reconnectAttempts = 0
  private queue: any[] = []
  
  constructor(
    private url: string,
    private options: {
      maxReconnectAttempts?: number
      reconnectDelay?: number
      queueOfflineEvents?: boolean
    } = {}
  ) {
    this.connect()
  }
  
  private connect() {
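    // The browser EventSource constructor does not accept custom headers, so a
    // manual reconnect cannot set Last-Event-ID itself. This sketch passes the
    // ID as a query parameter instead, which assumes the server reads a
    // (hypothetical) lastEventId parameter.
    const url = new URL(this.url, window.location.href)
    if (this.lastEventId) {
      url.searchParams.set('lastEventId', this.lastEventId)
    }
    
    this.eventSource = new EventSource(url.toString())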
    
    this.eventSource.onopen = () => {
      console.log('Stream connected')
      this.reconnectAttempts = 0
      
      // Flush queued events
      if (this.queue.length > 0) {
        this.queue.forEach(event => this.emit('data', event))
        this.queue = []
      }
    }
    
    this.eventSource.onmessage = (event) => {
      this.lastEventId = event.lastEventId
      const data = JSON.parse(event.data)
      
      if (navigator.onLine) {
        this.emit('data', data)
      } else if (this.options.queueOfflineEvents) {
        this.queue.push(data)
      }
    }
    
    this.eventSource.onerror = () => {
      this.eventSource!.close()
      
      if (this.reconnectAttempts < (this.options.maxReconnectAttempts || 10)) {
        this.reconnectAttempts++
        const delay = this.options.reconnectDelay || 1000
        const backoff = Math.min(delay * Math.pow(2, this.reconnectAttempts), 30000)
        
        console.log(`Reconnecting in ${backoff}ms...`)
        setTimeout(() => this.connect(), backoff)
      } else {
        this.emit('error', new Error('Max reconnection attempts reached'))
      }
    }
  }
  
  private emit(event: string, data: any) {
    // Event emitter implementation
  }
}
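
The `emit` method above is left as a stub. One minimal way to fill it in is a small typed emitter that the stream class could extend or hold as a field. `Emitter` and its event map are illustrative names, not a TypedFetch API.

```typescript
// Maps event names to payload types, e.g. { data: unknown; error: Error }
type EventMap = Record<string, unknown>

class Emitter<Events extends EventMap> {
  private handlers = new Map<keyof Events, Set<(payload: any) => void>>()

  // Register a handler; the returned function removes it again
  on<K extends keyof Events>(event: K, handler: (payload: Events[K]) => void): () => void {
    let set = this.handlers.get(event)
    if (!set) {
      set = new Set()
      this.handlers.set(event, set)
    }
    set.add(handler)
    const registered = set
    return () => { registered.delete(handler) }
  }

  emit<K extends keyof Events>(event: K, payload: Events[K]): void {
    const set = this.handlers.get(event)
    if (!set) return
    // Copy first so handlers can unsubscribe while the event is being dispatched
    for (const handler of Array.from(set)) {
      try {
        handler(payload)
      } catch (error) {
        // One failing handler should not stop the others
        console.error(`Error in ${String(event)} handler:`, error)
      }
    }
  }
}
```

Returning an unsubscribe function from `on` makes cleanup harder to forget, which helps with the leak risk of long-lived streams.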

Best Practices for Real-Time 🎯

1. Choose the Right Protocol

// SSE for one-way server → client
if (needsServerPush && !needsBidirectional) {
  useSSE()
}

// WebSocket for bidirectional
if (needsBidirectional || lowLatency) {
  useWebSocket()
}

// Long polling for compatibility
if (needsFallback) {
  useLongPolling()
}

2. Handle Connection Lifecycle

class StreamManager {
  private streams = new Set<Stream>()
  
  constructor() {
    // Clean up on page unload
    window.addEventListener('beforeunload', () => {
      this.closeAll()
    })
    
    // Handle network changes
    window.addEventListener('online', () => {
      this.reconnectAll()
    })
    
    window.addEventListener('offline', () => {
      this.pauseAll()
    })
    
    // Handle page visibility
    document.addEventListener('visibilitychange', () => {
      if (document.hidden) {
        this.throttleAll()
      } else {
        this.resumeAll()
      }
    })
  }
}

3. Implement Backpressure

class BackpressureStream {
  private buffer: any[] = []
  private processing = false
  
  async handleData(data: any) {
    this.buffer.push(data)
    
    if (!this.processing) {
      this.processing = true
      await this.processBuffer()
      this.processing = false
    }
  }
  
  private async processBuffer() {
    while (this.buffer.length > 0) {
      const batch = this.buffer.splice(0, 10)  // Process in batches
      
      await Promise.all(
        batch.map(item => this.processItem(item))
      )
      
      // Yield to UI
      await new Promise(resolve => setTimeout(resolve, 0))
    }
  }
}
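
The class above batches work, but its buffer can still grow without limit if data arrives faster than it is processed. One option is to cap the buffer and drop the oldest items, which suits data such as live temperatures where only recent values matter. `BoundedBuffer` is a hypothetical helper for illustration; dropping data is a policy choice and would be wrong for streams where every message counts.

```typescript
class BoundedBuffer<T> {
  private items: T[] = []
  private dropped = 0
  private capacity: number

  constructor(capacity: number) {
    if (capacity < 1) throw new RangeError('capacity must be at least 1')
    this.capacity = capacity
  }

  // Add an item, discarding the oldest one when the buffer is full
  push(item: T): void {
    if (this.items.length >= this.capacity) {
      this.items.shift()
      this.dropped++
    }
    this.items.push(item)
  }

  // Remove and return up to `max` of the oldest remaining items
  takeBatch(max: number): T[] {
    return this.items.splice(0, max)
  }

  get size(): number {
    return this.items.length
  }

  // How many items were discarded; worth reporting to a health monitor
  get droppedCount(): number {
    return this.dropped
  }
}
```

With a buffer like this, `handleData` would call `push` and the processing loop would call `takeBatch(10)`, so memory use stays bounded however fast the producer is.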

4. Monitor Stream Health

class StreamHealthMonitor {
  private metrics = {
    messagesReceived: 0,
    bytesReceived: 0,
    errors: 0,
    reconnections: 0,
    latency: [] as number[]  // typed explicitly; a bare [] is inferred as never[] in strict mode
  }
  
  trackMessage(message: any) {
    this.metrics.messagesReceived++
    this.metrics.bytesReceived += JSON.stringify(message).length
    
    if (message.timestamp) {
      const latency = Date.now() - message.timestamp
      this.metrics.latency.push(latency)
      
      // Keep last 100 latency measurements
      if (this.metrics.latency.length > 100) {
        this.metrics.latency.shift()
      }
    }
  }
  
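  getHealth() {
    const samples = this.metrics.latency
    // Avoid dividing by zero before any latency samples have arrived
    const avgLatency = samples.length > 0
      ? samples.reduce((a, b) => a + b, 0) / samples.length
      : 0
    
    return {
      ...this.metrics,
      averageLatency: avgLatency,
      health: this.calculateHealthScore()
    }
  }
  
  private calculateHealthScore(): 'good' | 'degraded' | 'poor' {
    // With no messages yet, 0 / 0 would be NaN and always report 'good'
    if (this.metrics.messagesReceived === 0) {
      return this.metrics.errors > 0 ? 'poor' : 'good'
    }
    
    const errorRate = this.metrics.errors / this.metrics.messagesReceived
    
    if (errorRate > 0.1) return 'poor'
    if (errorRate > 0.01) return 'degraded'
    return 'good'
  }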
}
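
An average can hide occasional slow messages, so a high percentile is often tracked alongside it. A minimal sketch using the nearest-rank method follows; `percentile` is a hypothetical helper, not part of the monitor above, and returning 0 for an empty sample list is an arbitrary choice made here.

```typescript
// Nearest-rank percentile: the smallest sample with at least p% of samples at or below it
function percentile(samples: number[], p: number): number {
  if (p <= 0 || p > 100) throw new RangeError('p must be in (0, 100]')
  if (samples.length === 0) return 0
  // Sort a copy so the caller's array keeps its arrival order
  const sorted = [...samples].sort((a, b) => a - b)
  const rank = Math.ceil((p / 100) * sorted.length)
  return sorted[rank - 1]
}
```

Applied to the last 100 latency measurements, `percentile(latency, 95)` gives the delay that 95% of recent messages stayed under.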

Practice Time! 🏋️

Exercise 1: Build a Chat System

Create a real-time chat with TypedFetch:

class RealtimeChat {
  // Your code here:
  // - WebSocket connection
  // - Message types
  // - User presence
  // - Message history
  // - Reconnection handling
}

Exercise 2: Live Data Dashboard

Build a dashboard with multiple streams:

class LiveDashboard {
  // Your code here:
  // - Multiple SSE streams
  // - Data synchronization
  // - Chart updates
  // - Alert system
}

Exercise 3: Stream Aggregator

Create a stream processing pipeline:

class StreamAggregator {
  // Your code here:
  // - Combine multiple streams
  // - Window functions
  // - Reduce operations
  // - Output stream
}

Key Takeaways 🎯

  1. SSE for server-to-client streaming - Simple, automatic reconnection
  2. WebSockets for bidirectional communication - Real-time, low latency
  3. Stream JSON for large datasets - Process without loading all in memory
  4. TypedFetch handles reconnection automatically - Built-in reliability
  5. Handle connection lifecycle properly - Online/offline, visibility
  6. Implement backpressure for fast streams - Don't overwhelm the client
  7. Monitor stream health - Track latency, errors, reconnections
  8. Choose the right protocol - SSE vs WebSocket vs polling

Common Pitfalls 🚨

  1. Not handling reconnection - Networks are unreliable
  2. Memory leaks from unclosed streams - Always clean up
  3. Overwhelming the UI thread - Process in batches
  4. Not handling offline states - Queue or pause appropriately
  5. Missing error boundaries - Streams can fail anytime
  6. Ignoring backpressure - Fast producers, slow consumers

What's Next?

You've mastered real-time streaming! But how do you make it all performant? In Chapter 10, we'll dive deep into performance optimization:

  • Request deduplication strategies
  • Connection pooling
  • Optimal caching configurations
  • Bundle size optimization
  • Memory management
  • Performance monitoring

Ready to make TypedFetch blazing fast? See you in Chapter 10!


Chapter Summary

  • Server-Sent Events provide one-way streaming from server to client
  • WebSockets enable bidirectional real-time communication
  • TypedFetch handles automatic reconnection and error recovery
  • Stream JSON allows processing large datasets without memory issues
  • Proper lifecycle management prevents memory leaks and connection issues
  • Backpressure handling prevents overwhelming slow consumers
  • Weather Buddy 9.0 shows live temperature updates and emergency alerts
  • Choose SSE for simplicity, WebSocket for interactivity

Next Chapter Preview: Performance Optimization - Make TypedFetch blazing fast with deduplication, connection pooling, and advanced caching strategies.