Skip to content

Commit 3544bfa

Browse files
authored
fix: handle VAD stream closed error during handover and upgrade sharp (#997)
1 parent ef35009 commit 3544bfa

File tree

6 files changed

+193
-145
lines changed

6 files changed

+193
-145
lines changed

.changeset/wet-carpets-clean.md

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
---
2+
'@livekit/agents-plugins-test': patch
3+
'@livekit/agents': patch
4+
---
5+
6+
fix: handle VAD stream closed error during agent handover
7+
8+
- Fixed a race condition in `StreamAdapter` where `endInput()` could be called on an already-closed VAD stream during agent handover, causing an unrecoverable `stt_error`. This affected non-streaming STTs (like OpenAI STT) that use the StreamAdapter wrapper.
9+
- Added `isStreamClosedError()` utility function for consistent error handling.
10+
- Upgraded sharp from 0.34.3 to 0.34.5 to fix libvips version conflict (1.2.0 vs 1.2.4) that caused flaky agent behavior and ObjC class collision warnings on macOS.
11+
- Fixed pre-existing build error in test plugin (Int16Array to ArrayBuffer conversion).

agents/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
"pidusage": "^4.0.1",
7474
"pino": "^8.19.0",
7575
"pino-pretty": "^11.0.0",
76-
"sharp": "0.34.3",
76+
"sharp": "0.34.5",
7777
"uuid": "^11.1.0",
7878
"ws": "^8.18.0",
7979
"zod-to-json-schema": "^3.24.6"

agents/src/stt/stream_adapter.ts

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import type { AudioFrame } from '@livekit/rtc-node';
55
import { log } from '../log.js';
66
import type { APIConnectOptions } from '../types.js';
7+
import { isStreamClosedError } from '../utils.js';
78
import type { VAD, VADStream } from '../vad.js';
89
import { VADEventType } from '../vad.js';
910
import type { SpeechEvent } from './stt.js';
@@ -68,7 +69,17 @@ export class StreamAdapterWrapper extends SpeechStream {
6869
this.#vadStream.pushFrame(input);
6970
}
7071
}
71-
this.#vadStream.endInput();
72+
73+
// Guard against calling endInput() on already-closed stream
74+
// This happens during handover when close() is called while forwardInput is running
75+
try {
76+
this.#vadStream.endInput();
77+
} catch (e) {
78+
if (isStreamClosedError(e)) {
79+
return;
80+
}
81+
throw e;
82+
}
7283
};
7384

7485
const recognize = async () => {

agents/src/utils.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -675,6 +675,20 @@ export class InvalidErrorType extends Error {
675675
}
676676
}
677677

678+
/**
679+
* Check if an error is a stream closed error that can be safely ignored during cleanup.
680+
* This happens during handover/cleanup when close() is called while operations are still running.
681+
*
682+
* @param error - The error to check.
683+
* @returns True if the error is a stream closed error.
684+
*/
685+
export function isStreamClosedError(error: unknown): boolean {
686+
return (
687+
error instanceof Error &&
688+
(error.message === 'Stream is closed' || error.message === 'Input is closed')
689+
);
690+
}
691+
678692
/**
679693
* In JS an error can be any arbitrary value.
680694
* This function converts an unknown error to an Error and stores the original value in the error object.

plugins/test/src/stt.ts

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,14 @@ const makeTestSpeech = (targetSampleRate: number, chunkDuration?: number): Audio
140140

141141
const chunkSize = (targetSampleRate * chunkDuration) / 1000;
142142
const bstream = new AudioByteStream(targetSampleRate, channels, chunkSize);
143-
frames = bstream.write(merged.data);
143+
144+
// Convert Int16Array to ArrayBuffer
145+
const arrayBuffer = merged.data.buffer.slice(
146+
merged.data.byteOffset,
147+
merged.data.byteOffset + merged.data.byteLength,
148+
) as ArrayBuffer;
149+
150+
frames = bstream.write(arrayBuffer);
144151
frames.push(...bstream.flush());
145152
return frames;
146153
};

0 commit comments

Comments
 (0)