All files / web/src/lib/worksheet-parsing sse-parser.ts

99.4% Statements 335/337
78.31% Branches 65/83
100% Functions 3/3
99.4% Lines 335/337

Press n or j to go to the next uncovered block, b, p or k for the previous block.

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 3381x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 20x 20x 20x 20x 20x 20x 20x 1x 1x 19x 19x 19x 19x 19x 19x 20x 37x 37x 1x 1x 1x 36x 36x 36x 18x 18x 18x 18x 18x 37x 37x 37x 44x 44x 21x 21x 21x 23x 23x 23x 23x 23x 23x 23x 1x 1x 1x 22x 22x 22x 23x 23x 23x 23x 1x 1x 22x 22x 22x 44x 18x 20x 19x 19x 20x 1x 1x 1x 1x 1x 1x 1x 1x 21x 21x 21x 21x 21x 3x 3x 3x 3x 21x 21x 21x 2x 2x 21x 21x 21x 3x 3x 3x 3x 3x 3x 21x 21x 21x 3x 3x 3x 3x 3x 21x 21x 21x 1x 1x 1x 1x 1x 1x 1x 21x 21x 21x 1x 1x 1x 1x 1x 1x 1x 1x 21x 21x 21x 1x 1x 1x 
1x 1x 21x 21x 21x 4x 4x 4x 4x 4x 4x 4x 21x 21x 21x 1x 1x 21x 21x 21x 2x 2x 21x 21x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 1x 14x 14x 14x 14x 14x 13x 13x 13x 13x 13x 13x 13x 14x 1557x 1557x 1557x 1557x 4x 4x 4x 1553x 1557x 4x 4x 4x 1549x 1549x 1557x 187x 187x 187x 1362x 1362x 1362x 624x 624x 1557x 31x 15x 15x 31x 1557x 30x 30x 14x 14x 14x 14x 14x 14x 14x 14x 13x 12x 14x 11x 11x 11x 11x 11x 14x     14x 593x 12x 12x 12x 1557x 13x 13x 13x  
/**
 * Shared SSE (Server-Sent Events) Parser
 *
 * Provides a unified utility for parsing SSE streams from worksheet parsing endpoints.
 * Used by both initial parse and selective re-parse operations.
 */
 
import type { WorksheetParsingResult, BoundingBox } from './schemas'
import type { ParsingStats, CompletedProblem } from './state-machine'
 
// ============================================================================
// Types
// ============================================================================
 
/**
 * Hooks that `parseSSEStream` invokes as server-sent events arrive.
 *
 * Every hook is optional; an event whose hook is not supplied is dropped.
 * Missing string/number fields in an event payload are defaulted (to `''`,
 * `0`, or `'Unknown error'`) before the hook is called.
 */
export interface SSECallbacks {
  // ---- Connection & progress ------------------------------------------------

  /**
   * Connection established (`started` or `response.created` event).
   * `responseId` is `''` when the payload carried no id.
   */
  onStarted?: (responseId: string) => void

  /** Progress message from a `progress` event (`''` when absent). */
  onProgress?: (message: string) => void

  // ---- Streaming model output -----------------------------------------------

  /**
   * Reasoning text (the model's thinking process) from a `reasoning` event.
   * `isDelta` is the payload's `isDelta` coerced to a boolean; `summaryIndex`
   * is passed only when the payload's value is a number.
   */
  onReasoning?: (text: string, isDelta: boolean, summaryIndex?: number) => void

  /**
   * Partial output (the JSON being generated) from an `output_delta` event,
   * taken from `text`, else `delta`. `outputIndex` is passed only when numeric.
   */
  onOutputDelta?: (text: string, outputIndex?: number) => void

  // ---- Per-problem events ---------------------------------------------------

  /** (re-parse only) Parsing of one problem began; missing numbers arrive as `0`. */
  onProblemStart?: (
    problemIndex: number,
    problemNumber: number,
    currentIndex: number,
    totalProblems: number
  ) => void

  /** A problem was fully parsed; `result` is forwarded from the payload unvalidated. */
  onProblemComplete?: (
    problemIndex: number,
    problemNumber: number,
    result: unknown,
    currentIndex: number,
    totalProblems: number
  ) => void

  /** (re-parse only) One problem failed; `message` defaults to `'Unknown error'`. */
  onProblemError?: (problemIndex: number, message: string) => void

  // ---- Terminal events ------------------------------------------------------

  /**
   * The whole operation succeeded. `result` is read from the payload's
   * `result`, falling back to `updatedResult`; `status` is `''` when absent.
   */
  onComplete?: (result: WorksheetParsingResult, stats?: ParsingStats, status?: string) => void

  /** The operation failed; `message` defaults to `'Unknown error'` and `code` to `''`. */
  onError?: (message: string, code?: string) => void

  /**
   * The operation was cancelled: either the `AbortSignal` handed to
   * `parseSSEStream` fired, or the server sent a `cancelled` event.
   */
  onCancelled?: () => void
}
 
/**
 * Decoded JSON payload of a single `data:` line.
 *
 * `type` names the event when no `event:` line preceded the data; all other
 * fields are event-specific and are read defensively by the dispatcher,
 * hence the `unknown` index signature.
 */
interface SSEEvent {
  [key: string]: unknown
  type: string
}
 
// ============================================================================
// Main Parser Function
// ============================================================================
 
/**
 * Parse an SSE stream from a worksheet parsing endpoint
 *
 * Handles both:
 * - Initial parse streams (/parse/stream)
 * - Selective re-parse streams (/parse-selected/stream)
 *
 * Wire-format handling:
 * - An `event:` line names the event for the next `data:` line only; a blank
 *   line (event boundary) also clears it. Without an `event:` line, the
 *   payload's own `type` field names the event.
 * - Each `data:` line is decoded as one JSON event. `[DONE]`, malformed JSON
 *   and non-object payloads are skipped.
 * - The space after the field colon is optional and CRLF line endings are
 *   accepted, as the SSE format allows. Comment (`:`), `id:` and `retry:`
 *   lines are ignored.
 * - When the stream ends, a final line lacking a trailing newline is still
 *   processed (more lenient than the SSE spec, so that a last `complete` event
 *   is not lost if the server closes without one).
 *
 * @param response - The fetch Response with SSE body
 * @param callbacks - Event callbacks to invoke
 * @param signal - Optional AbortSignal for cancellation. It is checked between
 *   reads. Once it has fired, `onCancelled` is invoked, no further events are
 *   dispatched, and the response body is cancelled.
 * @throws Error `'No response body'` when `response.body` is null. Errors
 *   thrown by callbacks or by the body reader propagate to the caller (the body
 *   is cancelled and its lock released first).
 *
 * @example
 * ```typescript
 * const response = await fetch('/api/.../parse/stream', { method: 'POST', signal })
 * await parseSSEStream(response, {
 *   onReasoning: (text) => setReasoningText(prev => prev + text),
 *   onComplete: (result) => dispatch({ type: 'PARSE_COMPLETE', result }),
 *   onError: (message) => dispatch({ type: 'PARSE_FAILED', error: message }),
 * }, signal)
 * ```
 */
export async function parseSSEStream(
  response: Response,
  callbacks: SSECallbacks,
  signal?: AbortSignal
): Promise<void> {
  const reader = response.body?.getReader()
  if (!reader) {
    throw new Error('No response body')
  }

  const decoder = new TextDecoder()
  let buffer = ''
  // Name from the most recent `event:` line; consumed by the next `data:` line.
  let currentEvent: string | null = null
  // True once the body has been read to the end; otherwise it is cancelled on exit.
  let drained = false

  /** Handle one line of the SSE wire format (without its trailing '\n'). */
  const processLine = (rawLine: string): void => {
    const line = rawLine.endsWith('\r') ? rawLine.slice(0, -1) : rawLine

    // Blank line = event boundary: a dangling `event:` name must not leak
    // into the next, unrelated event.
    if (line === '') {
      currentEvent = null
      return
    }

    // Event type line
    if (line.startsWith('event:')) {
      currentEvent = line.slice('event:'.length).trim()
      return
    }

    // Anything other than a data line (comments, id:, retry:) is not used here.
    if (!line.startsWith('data:')) return

    const data = line.slice('data:'.length).trim()
    const eventName = currentEvent
    currentEvent = null // An `event:` name applies to exactly one data line

    // Skip [DONE] marker
    if (data === '[DONE]') return

    let payload: unknown
    try {
      payload = JSON.parse(data)
    } catch {
      return // Ignore malformed JSON
    }
    // `null` or primitive payloads cannot carry event fields.
    if (typeof payload !== 'object' || payload === null) return

    const event = payload as SSEEvent
    // Dispatch outside the try above so that exceptions thrown by callbacks
    // surface instead of being mistaken for malformed JSON.
    handleSSEEvent(eventName ?? event.type, event, callbacks)
  }

  try {
    while (true) {
      // Check for cancellation
      if (signal?.aborted) {
        callbacks.onCancelled?.()
        break
      }

      const { done, value } = await reader.read()

      // The signal may have fired while we were waiting: drop this chunk and
      // let the check at the top of the loop report the cancellation.
      if (signal?.aborted) continue

      if (done) {
        drained = true
        // Flush bytes the decoder held back, then the last unterminated line.
        buffer += decoder.decode()
        if (buffer !== '') processLine(buffer)
        buffer = ''
        break
      }

      buffer += decoder.decode(value, { stream: true })

      // Parse SSE lines from buffer
      const lines = buffer.split('\n')
      buffer = lines.pop() ?? '' // Keep incomplete line in buffer
      for (const line of lines) processLine(line)
    }
  } finally {
    if (!drained) {
      // Leaving early (abort or a thrown error): cancel the body so the
      // underlying connection is released rather than left streaming.
      await reader.cancel().catch(() => undefined)
    }
    reader.releaseLock()
  }
}
 
// ============================================================================
// Event Handlers
// ============================================================================
 
/**
 * Shape of one entry in the dispatch table below: read the fields a given
 * event type carries from the decoded payload and forward them to its callback.
 */
type SSEEventHandler = (event: SSEEvent, callbacks: SSECallbacks) => void

/** Shared by `started` and `response.created`: report the response id, or `''`. */
const reportStarted: SSEEventHandler = (event, callbacks) => {
  const created = event.response as { id?: string } | undefined
  callbacks.onStarted?.(String(event.responseId ?? created?.id ?? ''))
}

/**
 * Event name -> handler. A `Map` is used instead of an object literal so that
 * inherited property names (`toString`, `__proto__`, ...) never match.
 */
const SSE_EVENT_HANDLERS = new Map<string, SSEEventHandler>([
  // Connection established
  ['started', reportStarted],
  ['response.created', reportStarted],

  // Progress updates
  [
    'progress',
    (event, callbacks) => {
      callbacks.onProgress?.(String(event.message ?? ''))
    },
  ],

  // Reasoning text; summaryIndex only forwarded when numeric
  [
    'reasoning',
    (event, callbacks) => {
      callbacks.onReasoning?.(
        String(event.text ?? ''),
        Boolean(event.isDelta),
        typeof event.summaryIndex === 'number' ? event.summaryIndex : undefined
      )
    },
  ],

  // Output delta (partial JSON), under either `text` or `delta`
  [
    'output_delta',
    (event, callbacks) => {
      callbacks.onOutputDelta?.(
        String(event.text ?? event.delta ?? ''),
        typeof event.outputIndex === 'number' ? event.outputIndex : undefined
      )
    },
  ],

  // Problem parsing started (re-parse only)
  [
    'problem_start',
    (event, callbacks) => {
      callbacks.onProblemStart?.(
        Number(event.problemIndex ?? 0),
        Number(event.problemNumber ?? 0),
        Number(event.currentIndex ?? 0),
        Number(event.totalProblems ?? 0)
      )
    },
  ],

  // Problem parsing completed
  [
    'problem_complete',
    (event, callbacks) => {
      callbacks.onProblemComplete?.(
        Number(event.problemIndex ?? 0),
        Number(event.problemNumber ?? 0),
        event.result,
        Number(event.currentIndex ?? 0),
        Number(event.totalProblems ?? 0)
      )
    },
  ],

  // Individual problem error (re-parse only)
  [
    'problem_error',
    (event, callbacks) => {
      callbacks.onProblemError?.(
        Number(event.problemIndex ?? 0),
        String(event.message ?? 'Unknown error')
      )
    },
  ],

  // Operation completed successfully
  [
    'complete',
    (event, callbacks) => {
      // Initial parse and re-parse completions may use either key; `result` wins.
      const finalResult = (event.result ?? event.updatedResult) as WorksheetParsingResult
      const finalStats = event.stats as ParsingStats | undefined
      const finalStatus = String(event.status ?? '')
      callbacks.onComplete?.(finalResult, finalStats, finalStatus)
    },
  ],

  // Operation cancelled
  [
    'cancelled',
    (_event, callbacks) => {
      callbacks.onCancelled?.()
    },
  ],

  // Error occurred
  [
    'error',
    (event, callbacks) => {
      callbacks.onError?.(String(event.message ?? 'Unknown error'), String(event.code ?? ''))
    },
  ],
])

/**
 * Route one decoded SSE event to the matching callback.
 * Event names without an entry in the dispatch table are ignored.
 */
function handleSSEEvent(eventType: string, event: SSEEvent, callbacks: SSECallbacks): void {
  const handler = SSE_EVENT_HANDLERS.get(eventType)
  handler?.(event, callbacks)
}
 
// ============================================================================
// Helper: Extract Completed Problems from Partial JSON
// ============================================================================
 
/**
 * Extract completed problems from partial JSON output for progressive highlighting
 *
 * Scans a possibly truncated JSON string for the first `"problems": [` array
 * and returns every element object that has already been closed. Braces and
 * brackets inside string literals (including escaped quotes) are ignored, and
 * scanning stops at the array's closing `]`. An element is kept only when it
 * parses as JSON, has a numeric `problemNumber`, and has a `problemBoundingBox`
 * object with a numeric `x`.
 *
 * @param partialJson - Incomplete JSON string from streaming output
 * @returns Completed problems in stream order; empty when the `"problems"`
 *   array has not started yet
 */
export function extractCompletedProblemsFromPartialJson(partialJson: string): CompletedProblem[] {
  const problems: CompletedProblem[] = []

  // Find the problems array start. `exec` (unlike String#match) types `index`
  // as a number, so no non-null assertion is needed.
  const problemsMatch = /"problems"\s*:\s*\[/.exec(partialJson)
  if (problemsMatch === null) return problems

  const startIndex = problemsMatch.index + problemsMatch[0].length
  let depth = 1 // Brace depth; 1 = inside the array but outside any element
  let objectStart = -1 // Index of the current element's opening `{`, or -1
  let inString = false
  let escaped = false

  for (let i = startIndex; i < partialJson.length; i++) {
    const char = partialJson[i]

    // The character after a backslash is literal (e.g. `\"` does not end a string)
    if (escaped) {
      escaped = false
      continue
    }
    if (char === '\\') {
      escaped = true
      continue
    }

    // Quotes toggle string mode; structural characters inside strings are ignored
    if (char === '"') {
      inString = !inString
      continue
    }
    if (inString) continue

    if (char === '{') {
      if (depth === 1 && objectStart === -1) {
        objectStart = i
      }
      depth++
    } else if (char === '}') {
      depth--
      if (depth === 1 && objectStart !== -1) {
        // A top-level element of the problems array just closed
        const problem = parseCompletedProblem(partialJson.slice(objectStart, i + 1))
        objectStart = -1
        if (problem !== undefined) problems.push(problem)
      }
    } else if (char === ']' && depth === 1) {
      // End of problems array
      break
    }
  }

  return problems
}

/**
 * Parse one closed element of the problems array. Returns `undefined` when the
 * text is not valid JSON, or lacks a numeric `problemNumber` or a
 * `problemBoundingBox` object with a numeric `x`.
 */
function parseCompletedProblem(objectStr: string): CompletedProblem | undefined {
  let parsed: unknown
  try {
    parsed = JSON.parse(objectStr)
  } catch {
    return undefined // Not a valid JSON object yet
  }
  if (typeof parsed !== 'object' || parsed === null) return undefined

  const { problemNumber, problemBoundingBox } = parsed as Record<string, unknown>
  if (typeof problemNumber !== 'number') return undefined
  if (typeof problemBoundingBox !== 'object' || problemBoundingBox === null) return undefined
  if (typeof (problemBoundingBox as Record<string, unknown>).x !== 'number') return undefined

  return {
    problemNumber,
    problemBoundingBox: problemBoundingBox as BoundingBox,
  }
}