forked from open-webui/open-webui
		
	feat: switch OpenAI SSE parsing to eventsource-parser
This commit is contained in:
		
							parent
							
								
									3f0fae1d10
								
							
						
					
					
						commit
						e8bf596959
					
				
					 5 changed files with 40 additions and 44 deletions
				
			
		
							
								
								
									
										9
									
								
								package-lock.json
									
										
									
										generated
									
									
									
								
							
							
						
						
									
										9
									
								
								package-lock.json
									
										
									
										generated
									
									
									
								
							|  | @ -12,6 +12,7 @@ | ||||||
| 				"async": "^3.2.5", | 				"async": "^3.2.5", | ||||||
| 				"bits-ui": "^0.19.7", | 				"bits-ui": "^0.19.7", | ||||||
| 				"dayjs": "^1.11.10", | 				"dayjs": "^1.11.10", | ||||||
|  | 				"eventsource-parser": "^1.1.2", | ||||||
| 				"file-saver": "^2.0.5", | 				"file-saver": "^2.0.5", | ||||||
| 				"highlight.js": "^11.9.0", | 				"highlight.js": "^11.9.0", | ||||||
| 				"i18next": "^23.10.0", | 				"i18next": "^23.10.0", | ||||||
|  | @ -3167,6 +3168,14 @@ | ||||||
| 			"integrity": "sha512-tYUSVOGeQPKt/eC1ABfhHy5Xd96N3oIijJvN3O9+TsC28T5V9yX9oEfEK5faP0EFSNVOG97qtAS68GBrQB2hDg==", | 			"integrity": "sha512-tYUSVOGeQPKt/eC1ABfhHy5Xd96N3oIijJvN3O9+TsC28T5V9yX9oEfEK5faP0EFSNVOG97qtAS68GBrQB2hDg==", | ||||||
| 			"dev": true | 			"dev": true | ||||||
| 		}, | 		}, | ||||||
|  | 		"node_modules/eventsource-parser": { | ||||||
|  | 			"version": "1.1.2", | ||||||
|  | 			"resolved": "https://registry.npmjs.org/eventsource-parser/-/eventsource-parser-1.1.2.tgz", | ||||||
|  | 			"integrity": "sha512-v0eOBUbiaFojBu2s2NPBfYUoRR9GjcDNvCXVaqEf5vVfpIAh9f8RCo4vXTP8c63QRKCFwoLpMpTdPwwhEKVgzA==", | ||||||
|  | 			"engines": { | ||||||
|  | 				"node": ">=14.18" | ||||||
|  | 			} | ||||||
|  | 		}, | ||||||
| 		"node_modules/execa": { | 		"node_modules/execa": { | ||||||
| 			"version": "4.1.0", | 			"version": "4.1.0", | ||||||
| 			"resolved": "https://registry.npmjs.org/execa/-/execa-4.1.0.tgz", | 			"resolved": "https://registry.npmjs.org/execa/-/execa-4.1.0.tgz", | ||||||
|  |  | ||||||
|  | @ -49,6 +49,7 @@ | ||||||
| 		"async": "^3.2.5", | 		"async": "^3.2.5", | ||||||
| 		"bits-ui": "^0.19.7", | 		"bits-ui": "^0.19.7", | ||||||
| 		"dayjs": "^1.11.10", | 		"dayjs": "^1.11.10", | ||||||
|  | 		"eventsource-parser": "^1.1.2", | ||||||
| 		"file-saver": "^2.0.5", | 		"file-saver": "^2.0.5", | ||||||
| 		"highlight.js": "^11.9.0", | 		"highlight.js": "^11.9.0", | ||||||
| 		"i18next": "^23.10.0", | 		"i18next": "^23.10.0", | ||||||
|  |  | ||||||
|  | @ -1,15 +1,22 @@ | ||||||
|  | import { EventSourceParserStream } from 'eventsource-parser/stream'; | ||||||
|  | import type { ParsedEvent } from 'eventsource-parser'; | ||||||
|  | 
 | ||||||
| type TextStreamUpdate = { | type TextStreamUpdate = { | ||||||
| 	done: boolean; | 	done: boolean; | ||||||
| 	value: string; | 	value: string; | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| // createOpenAITextStream takes a ReadableStreamDefaultReader from an SSE response,
 | // createOpenAITextStream takes a responseBody with a SSE response,
 | ||||||
| // and returns an async generator that emits delta updates with large deltas chunked into random sized chunks
 | // and returns an async generator that emits delta updates with large deltas chunked into random sized chunks
 | ||||||
| export async function createOpenAITextStream( | export async function createOpenAITextStream( | ||||||
| 	messageStream: ReadableStreamDefaultReader, | 	responseBody: ReadableStream<Uint8Array>, | ||||||
| 	splitLargeDeltas: boolean | 	splitLargeDeltas: boolean | ||||||
| ): Promise<AsyncGenerator<TextStreamUpdate>> { | ): Promise<AsyncGenerator<TextStreamUpdate>> { | ||||||
| 	let iterator = openAIStreamToIterator(messageStream); | 	const eventStream = responseBody | ||||||
|  | 		.pipeThrough(new TextDecoderStream()) | ||||||
|  | 		.pipeThrough(new EventSourceParserStream()) | ||||||
|  | 		.getReader(); | ||||||
|  | 	let iterator = openAIStreamToIterator(eventStream); | ||||||
| 	if (splitLargeDeltas) { | 	if (splitLargeDeltas) { | ||||||
| 		iterator = streamLargeDeltasAsRandomChunks(iterator); | 		iterator = streamLargeDeltasAsRandomChunks(iterator); | ||||||
| 	} | 	} | ||||||
|  | @ -17,7 +24,7 @@ export async function createOpenAITextStream( | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async function* openAIStreamToIterator( | async function* openAIStreamToIterator( | ||||||
| 	reader: ReadableStreamDefaultReader | 	reader: ReadableStreamDefaultReader<ParsedEvent> | ||||||
| ): AsyncGenerator<TextStreamUpdate> { | ): AsyncGenerator<TextStreamUpdate> { | ||||||
| 	while (true) { | 	while (true) { | ||||||
| 		const { value, done } = await reader.read(); | 		const { value, done } = await reader.read(); | ||||||
|  | @ -25,31 +32,22 @@ async function* openAIStreamToIterator( | ||||||
| 			yield { done: true, value: '' }; | 			yield { done: true, value: '' }; | ||||||
| 			break; | 			break; | ||||||
| 		} | 		} | ||||||
| 		const lines = value.split('\n'); | 		if (!value) { | ||||||
| 		for (let line of lines) { | 			continue; | ||||||
| 			if (line.endsWith('\r')) { | 		} | ||||||
| 				// Remove trailing \r
 | 		const data = value.data; | ||||||
| 				line = line.slice(0, -1); | 		if (data.startsWith('[DONE]')) { | ||||||
| 			} | 			yield { done: true, value: '' }; | ||||||
| 			if (line !== '') { | 			break; | ||||||
| 				console.log(line); | 		} | ||||||
| 				if (line === 'data: [DONE]') { |  | ||||||
| 					yield { done: true, value: '' }; |  | ||||||
| 				} else if (line.startsWith(':')) { |  | ||||||
| 					// Events starting with : are comments https://developer.mozilla.org/en-US/docs/Web/API/Server-sent_events/Using_server-sent_events#event_stream_format
 |  | ||||||
| 					// OpenRouter sends heartbeats like ": OPENROUTER PROCESSING"
 |  | ||||||
| 					continue; |  | ||||||
| 				} else { |  | ||||||
| 					try { |  | ||||||
| 						const data = JSON.parse(line.replace(/^data: /, '')); |  | ||||||
| 						console.log(data); |  | ||||||
| 
 | 
 | ||||||
| 						yield { done: false, value: data.choices?.[0]?.delta?.content ?? '' }; | 		try { | ||||||
| 					} catch (e) { | 			const parsedData = JSON.parse(data); | ||||||
| 						console.error('Error extracting delta from SSE event:', e); | 			console.log(parsedData); | ||||||
| 					} | 
 | ||||||
| 				} | 			yield { done: false, value: parsedData.choices?.[0]?.delta?.content ?? '' }; | ||||||
| 			} | 		} catch (e) { | ||||||
|  | 			console.error('Error extracting delta from SSE event:', e); | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
|  | @ -605,14 +605,8 @@ | ||||||
| 
 | 
 | ||||||
| 		scrollToBottom(); | 		scrollToBottom(); | ||||||
| 
 | 
 | ||||||
| 		if (res && res.ok) { | 		if (res && res.ok && res.body) { | ||||||
| 			const reader = res.body | 			const textStream = await createOpenAITextStream(res.body, $settings.splitLargeChunks); | ||||||
| 				.pipeThrough(new TextDecoderStream()) |  | ||||||
| 				.pipeThrough(splitStream('\n')) |  | ||||||
| 				.getReader(); |  | ||||||
| 
 |  | ||||||
| 			const textStream = await createOpenAITextStream(reader, $settings.splitLargeChunks); |  | ||||||
| 			console.log(textStream); |  | ||||||
| 
 | 
 | ||||||
| 			for await (const update of textStream) { | 			for await (const update of textStream) { | ||||||
| 				const { value, done } = update; | 				const { value, done } = update; | ||||||
|  |  | ||||||
|  | @ -617,14 +617,8 @@ | ||||||
| 
 | 
 | ||||||
| 		scrollToBottom(); | 		scrollToBottom(); | ||||||
| 
 | 
 | ||||||
| 		if (res && res.ok) { | 		if (res && res.ok && res.body) { | ||||||
| 			const reader = res.body | 			const textStream = await createOpenAITextStream(res.body, $settings.splitLargeChunks); | ||||||
| 				.pipeThrough(new TextDecoderStream()) |  | ||||||
| 				.pipeThrough(splitStream('\n')) |  | ||||||
| 				.getReader(); |  | ||||||
| 
 |  | ||||||
| 			const textStream = await createOpenAITextStream(reader, $settings.splitLargeChunks); |  | ||||||
| 			console.log(textStream); |  | ||||||
| 
 | 
 | ||||||
| 			for await (const update of textStream) { | 			for await (const update of textStream) { | ||||||
| 				const { value, done } = update; | 				const { value, done } = update; | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Jun Siang Cheah
						Jun Siang Cheah