Utils
Stream
11/14/2025, 3:30:43 AM modified by Marvin
A library to stream functions
Installation
Loading...
Preview
Nothing to show.
import type { z } from 'zod';

/** Shared UTF-8 decoder, exported for callers that need one-shot decoding. */
export const textDecoder = new TextDecoder();
/** Shared UTF-8 encoder, exported for callers that need to build byte payloads. */
export const textEncoder = new TextEncoder();

/** Optional `data:` field prefix at the very start of a line (leading whitespace tolerated). */
const SSE_DATA_PREFIX = /^\s*data:\s*/;

/** Line terminators allowed by the event-stream format: CRLF, LF, or a lone CR. */
const SSE_LINE_BREAK = /\r\n|\n|\r/;

/**
 * Strip the leading `data:` prefix (if any) and surrounding whitespace from one line.
 * Only a prefix at the start of the line is removed, so a `data:` substring that
 * appears later in the line is left intact.
 */
function toPayload(line: string): string {
  return line.replace(SSE_DATA_PREFIX, '').trim();
}

/**
 * Generate a stream of messages from a SSE readable stream.
 *
 * Chunks are accumulated and split on line breaks, so a message whose line is
 * split across several chunks is yielded once, in full, as soon as its line
 * terminator arrives. Each non-empty line is yielded with its leading `data:`
 * prefix removed and surrounding whitespace trimmed; lines without that prefix
 * are yielded trimmed but otherwise unchanged. Text that is not followed by a
 * line break is held back and yielded when the stream ends.
 *
 * Binary chunks (`ArrayBuffer` or any typed array / `DataView`) are decoded as
 * UTF-8 in streaming mode, so a multi-byte character split across two chunks is
 * decoded correctly.
 *
 * @param stream - The readable stream to generate messages from. `null` yields nothing.
 * @returns A generator of messages
 */
export async function* SSEMessageGenerator(
  stream: ReadableStream<string | ArrayBuffer | ArrayBufferView> | null,
): AsyncGenerator<string, void, unknown> {
  if (!stream) {
    return;
  }

  // A per-call decoder is required: streaming decode keeps state between calls,
  // which must not leak between independent streams via the shared `textDecoder`.
  const decoder = new TextDecoder();
  const reader = stream.getReader();
  // Text received so far that has not yet been terminated by a line break.
  let pending = '';

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) {
        break;
      }

      pending += typeof value === 'string' ? value : decoder.decode(value, { stream: true });

      const lines = pending.split(SSE_LINE_BREAK);
      // The last element is whatever follows the final line break: an incomplete
      // line (or '') that must wait for more data.
      pending = lines.pop() ?? '';

      for (const line of lines) {
        const payload = toPayload(line);
        if (payload.length > 0) {
          yield payload;
        }
      }
    }

    // Flush bytes still buffered inside the decoder, then emit the remaining
    // data left when the stream ended without a trailing line break.
    pending += decoder.decode();
    const lastPayload = toPayload(pending);
    if (lastPayload.length > 0) {
      yield lastPayload;
    }
  } finally {
    // Always release the lock so the caller can acquire another reader or cancel the stream.
    reader.releaseLock();
  }
}

/**
 * Parse a message as JSON and validate it against a zod schema.
 *
 * @param message - The raw JSON text of one message.
 * @param schema - The zod schema the parsed value must satisfy.
 * @returns The validated data, or `null` when the text is not valid JSON or
 * does not satisfy the schema.
 */
export function MessageParser<T extends z.ZodSchema>(message: string, schema: T): z.infer<T> | null {
  try {
    const json: unknown = JSON.parse(message);
    const result = schema.safeParse(json);
    return result.success ? result.data : null;
  } catch {
    // JSON.parse (or the schema itself) threw: treat the message as unparseable.
    return null;
  }
}

// Tests for SSEMessageGenerator and MessageParser (separate test module that
// imports the implementation from '.').
import { expect, test } from 'vitest';
import z from 'zod';
import { MessageParser, SSEMessageGenerator } from '.';

test('SSEMessageGenerator', async () => {
  const readableStream = new ReadableStream({
    start(controller) {
      controller.enqueue('data: {"message": "Hello, "}\n\ndata: {"message": "world!"}\n\n');
      controller.close();
    },
  });
  const generator = SSEMessageGenerator(readableStream);
  const messages: string[] = [];
  for await (const message of generator) {
    messages.push(message);
  }
  expect(messages).toEqual(['{"message": "Hello, "}', '{"message": "world!"}']);
});

test('SSEMessageGenerator reassembles a message split across chunks', async () => {
  const readableStream = new ReadableStream({
    start(controller) {
      controller.enqueue('data: {"mess');
      controller.enqueue('age": "Hi"}\n\n');
      controller.close();
    },
  });
  const messages: string[] = [];
  for await (const message of SSEMessageGenerator(readableStream)) {
    messages.push(message);
  }
  expect(messages).toEqual(['{"message": "Hi"}']);
});

test('SSEMessageGenerator decodes multi-byte characters split across byte chunks', async () => {
  const bytes = new TextEncoder().encode('data: {"message": "你好"}\n\n');
  const readableStream = new ReadableStream({
    start(controller) {
      // Byte 20 falls inside the first 3-byte character.
      controller.enqueue(bytes.slice(0, 20));
      controller.enqueue(bytes.slice(20));
      controller.close();
    },
  });
  const messages: string[] = [];
  for await (const message of SSEMessageGenerator(readableStream)) {
    messages.push(message);
  }
  expect(messages).toEqual(['{"message": "你好"}']);
});

test('MessageParser', async () => {
  const readableStream = new ReadableStream({
    start(controller) {
      controller.enqueue('data: {"message": "Hello, "}\n\ndata: {"message": "world!"}\n\n');
      controller.close();
    },
  });
  const generator = SSEMessageGenerator(readableStream);
  const results = [];
  const schema = z.object({ message: z.string() });
  for await (const message of generator) {
    const result = MessageParser(message, schema);
    results.push(result);
  }
  expect(results).toEqual([{ message: 'Hello, ' }, { message: 'world!' }]);
});

// Git Commit History(1 commits)
feat: build
Marvin
11月14日 03:30
e50053c2