Skip to content

Commit b9abcc0

Browse files
committed
fixes
1 parent 3b01892 commit b9abcc0

File tree

6 files changed

+222
-7
lines changed

6 files changed

+222
-7
lines changed

lib/src/constants.dart

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,15 @@ class Timeouts {
1616
final Duration connection;
1717
final Duration debounce;
1818
final Duration publish;
19+
final Duration subscribe;
1920
final Duration peerConnection;
2021
final Duration iceRestart;
2122

2223
const Timeouts({
2324
required this.connection,
2425
required this.debounce,
2526
required this.publish,
27+
required this.subscribe,
2628
required this.peerConnection,
2729
required this.iceRestart,
2830
});
@@ -31,6 +33,7 @@ class Timeouts {
3133
connection: Duration(seconds: 10),
3234
debounce: Duration(milliseconds: 100),
3335
publish: Duration(seconds: 10),
36+
subscribe: Duration(seconds: 10),
3437
peerConnection: Duration(seconds: 10),
3538
iceRestart: Duration(seconds: 10),
3639
);

lib/src/core/engine.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
102102
String? url;
103103
String? token;
104104

105-
late ConnectOptions connectOptions;
105+
ConnectOptions connectOptions;
106106
RoomOptions roomOptions;
107107
FastConnectOptions? fastConnectOptions;
108108

@@ -188,6 +188,7 @@ class Engine extends Disposable with EventsEmittable<EngineEvent> {
188188
}
189189

190190
Engine({
191+
required this.connectOptions,
191192
required this.roomOptions,
192193
SignalClient? signalClient,
193194
PeerConnectionCreate? peerConnectionCreate,
Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
// Copyright 2024 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
16+
import 'package:meta/meta.dart';
17+
18+
import '../events.dart';
19+
import '../logger.dart';
20+
import '../types/other.dart';
21+
22+
typedef PendingTrackSubscriber = Future<bool> Function(PendingTrack entry);
23+
typedef TrackExceptionEmitter = void Function(TrackSubscriptionExceptionEvent event);
24+
25+
/// Helper that queues subscriber tracks when participant metadata isn't ready yet.
26+
@internal
27+
class PendingTrackQueue {
28+
final int maxSize;
29+
final Duration ttl;
30+
final TrackExceptionEmitter emitException;
31+
32+
// keyed by participant sid
33+
final Map<String, List<PendingTrack>> _pending = {};
34+
35+
PendingTrackQueue({
36+
required this.ttl,
37+
required this.emitException,
38+
this.maxSize = 100,
39+
});
40+
41+
void enqueue({
42+
required rtc.MediaStreamTrack track,
43+
required rtc.MediaStream stream,
44+
required rtc.RTCRtpReceiver? receiver,
45+
required String participantSid,
46+
required String trackSid,
47+
required ConnectionState connectionState,
48+
}) {
49+
// If we're already disconnected, drop immediately.
50+
if (connectionState == ConnectionState.disconnected) {
51+
final event = TrackSubscriptionExceptionEvent(
52+
participant: null,
53+
sid: trackSid,
54+
reason: TrackSubscribeFailReason.noParticipantFound,
55+
);
56+
logger.warning('Dropping pending track while disconnected trackSid:$trackSid participantSid:$participantSid');
57+
emitException(event);
58+
return;
59+
}
60+
61+
_removeExpired();
62+
63+
final totalPending = _pending.values.fold<int>(0, (sum, list) => sum + list.length);
64+
if (totalPending >= maxSize) {
65+
final event = TrackSubscriptionExceptionEvent(
66+
participant: null,
67+
sid: trackSid,
68+
reason: TrackSubscribeFailReason.noParticipantFound,
69+
);
70+
logger.severe('Pending track queue full, dropping trackSid:$trackSid participantSid:$participantSid');
71+
emitException(event);
72+
return;
73+
}
74+
75+
final expiresAt = DateTime.now().add(ttl);
76+
logger.fine('Queueing pending trackSid:$trackSid participantSid:$participantSid until metadata is ready');
77+
final entry = PendingTrack(
78+
track: track,
79+
stream: stream,
80+
receiver: receiver,
81+
participantSid: participantSid,
82+
trackSid: trackSid,
83+
expiresAt: expiresAt,
84+
);
85+
final list = _pending.putIfAbsent(participantSid, () => []);
86+
list.add(entry);
87+
}
88+
89+
@internal
90+
Future<void> flush({
91+
required bool isConnected,
92+
String? participantSid,
93+
required PendingTrackSubscriber subscriber,
94+
}) async {
95+
_removeExpired();
96+
if (!isConnected) return;
97+
98+
final Iterable<PendingTrack> source = participantSid != null
99+
? List<PendingTrack>.from(_pending[participantSid] ?? const [])
100+
: _pending.values.expand((e) => e).toList();
101+
102+
for (final item in source) {
103+
final success = await subscriber(item);
104+
if (success) {
105+
_pending[item.participantSid]?.remove(item);
106+
}
107+
}
108+
}
109+
110+
void _removeExpired() {
111+
final now = DateTime.now();
112+
_pending.forEach((sid, list) {
113+
final expired = list.where((p) => p.expiresAt.isBefore(now)).toList();
114+
for (final item in expired) {
115+
list.remove(item);
116+
final event = TrackSubscriptionExceptionEvent(
117+
participant: null,
118+
sid: item.trackSid,
119+
reason: TrackSubscribeFailReason.noParticipantFound,
120+
);
121+
logger.warning('Pending track expired waiting for participant metadata: $event');
122+
emitException(event);
123+
}
124+
});
125+
}
126+
}
127+
128+
@internal
129+
class PendingTrack {
130+
final rtc.MediaStreamTrack track;
131+
final rtc.MediaStream stream;
132+
final rtc.RTCRtpReceiver? receiver;
133+
final String participantSid;
134+
final String trackSid;
135+
final DateTime expiresAt;
136+
137+
PendingTrack({
138+
required this.track,
139+
required this.stream,
140+
required this.receiver,
141+
required this.participantSid,
142+
required this.trackSid,
143+
required this.expiresAt,
144+
});
145+
}

lib/src/core/room.dart

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import '../types/rpc.dart';
5454
import '../types/transcription_segment.dart';
5555
import '../utils.dart' show unpackStreamId;
5656
import 'engine.dart';
57+
import 'pending_track_queue.dart';
5758

5859
/// Room is the primary construct for LiveKit conferences. It contains a
5960
/// group of [Participant]s, each publishing and subscribing to [Track]s.
@@ -135,6 +136,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
135136
@internal
136137
late final PreConnectAudioBuffer preConnectAudioBuffer;
137138

139+
// Pending subscriber tracks keyed by participantSid, for tracks arriving before metadata or before the room connected.
140+
late final PendingTrackQueue _pendingTrackQueue;
141+
138142
// for testing
139143
@internal
140144
Map<String, RpcRequestHandler> get rpcHandlers => _rpcHandlers;
@@ -152,6 +156,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
152156
Engine? engine,
153157
}) : engine = engine ??
154158
Engine(
159+
connectOptions: connectOptions,
155160
roomOptions: roomOptions,
156161
) {
157162
//
@@ -161,11 +166,21 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
161166
_signalListener = this.engine.signalClient.createListener();
162167
_setUpSignalListeners();
163168

169+
_pendingTrackQueue = PendingTrackQueue(
170+
ttl: this.engine.connectOptions.timeouts.subscribe,
171+
emitException: (event) => events.emit(event),
172+
);
173+
164174
// Any event emitted will trigger ChangeNotifier
165175
events.listen((event) {
166176
logger.finer('[RoomEvent] $event, will notifyListeners()');
167177
notifyListeners();
168178
});
179+
events.on<InternalParticipantAvailableEvent>(
180+
(event) => _flushPendingTracks(participant: event.participant),
181+
);
182+
// Keep a connected flush as a fallback in case tracks arrive pre-connected but before metadata.
183+
events.on<RoomConnectedEvent>((event) => _flushPendingTracks());
169184

170185
_setupRpcListeners();
171186

@@ -596,12 +611,18 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
596611
reason: TrackSubscribeFailReason.invalidServerResponse,
597612
);
598613
}
599-
if (participant == null) {
600-
throw TrackSubscriptionExceptionEvent(
601-
participant: participant,
602-
sid: trackSid,
603-
reason: TrackSubscribeFailReason.noParticipantFound,
614+
615+
final shouldDefer = connectionState != ConnectionState.connected || participant == null;
616+
if (shouldDefer) {
617+
_pendingTrackQueue.enqueue(
618+
track: event.track,
619+
stream: event.stream,
620+
receiver: event.receiver,
621+
participantSid: participantSid,
622+
trackSid: trackSid,
623+
connectionState: connectionState,
604624
);
625+
return;
605626
}
606627
await participant.addSubscribedMediaTrack(
607628
event.track,
@@ -678,6 +699,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
678699

679700
_remoteParticipants[result.participant.identity] = result.participant;
680701
_sidToIdentity[result.participant.sid] = result.participant.identity;
702+
events.emit(InternalParticipantAvailableEvent(participant: result.participant));
681703
return result;
682704
}
683705

@@ -722,10 +744,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
722744
}
723745
}
724746
_sidToIdentity[info.sid] = info.identity;
747+
events.emit(InternalParticipantAvailableEvent(participant: result.participant));
725748
} else {
726749
final wasUpdated = await result.participant.updateFromInfo(info);
727750
if (wasUpdated) {
728751
_sidToIdentity[info.sid] = info.identity;
752+
events.emit(InternalParticipantAvailableEvent(participant: result.participant));
729753
}
730754
}
731755
}
@@ -760,6 +784,32 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
760784
emitWhenConnected(ActiveSpeakersChangedEvent(speakers: activeSpeakers));
761785
}
762786

787+
Future<void> _flushPendingTracks({RemoteParticipant? participant}) => _pendingTrackQueue.flush(
788+
isConnected: connectionState == ConnectionState.connected,
789+
participantSid: participant?.sid,
790+
subscriber: (pending) async {
791+
final target = participant ?? _getRemoteParticipantBySid(pending.participantSid);
792+
if (target == null) return false;
793+
try {
794+
await target.addSubscribedMediaTrack(
795+
pending.track,
796+
pending.stream,
797+
pending.trackSid,
798+
receiver: pending.receiver,
799+
audioOutputOptions: roomOptions.defaultAudioOutputOptions,
800+
);
801+
return true;
802+
} on TrackSubscriptionExceptionEvent catch (event) {
803+
logger.severe('Track subscription failed during flush: ${event}');
804+
events.emit(event);
805+
return true;
806+
} catch (exception) {
807+
logger.warning('Unknown exception during pending track flush: ${exception}');
808+
return false;
809+
}
810+
},
811+
);
812+
763813
// from data channel
764814
// updates are sent only when there's a change to speaker ordering
765815
void _onEngineActiveSpeakersUpdateEvent(List<lk_models.SpeakerInfo> speakers) {

lib/src/internal/events.dart

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import 'package:connectivity_plus/connectivity_plus.dart';
1616
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
1717
import 'package:meta/meta.dart';
1818

19+
import 'package:livekit_client/src/participant/remote.dart' show RemoteParticipant;
1920
import '../e2ee/options.dart';
2021
import '../events.dart';
2122
import '../proto/livekit_models.pb.dart' as lk_models;
@@ -459,6 +460,20 @@ class EngineActiveSpeakersUpdateEvent with EngineEvent, InternalEvent {
459460
String toString() => '${runtimeType}(speakers: ${speakers})';
460461
}
461462

463+
/// Internal event fired when a participant becomes available (added to _sidToIdentity map).
464+
/// Used by EngineTrackAddedEvent handler to wait/flush tracks that arrived before metadata.
465+
@internal
466+
class InternalParticipantAvailableEvent with RoomEvent, InternalEvent {
467+
final RemoteParticipant participant;
468+
469+
const InternalParticipantAvailableEvent({required this.participant});
470+
471+
String get participantSid => participant.sid;
472+
473+
@override
474+
String toString() => '${runtimeType}(participant: ${participant.sid})';
475+
}
476+
462477
@internal
463478
class SignalLeaveEvent with SignalEvent, InternalEvent {
464479
bool get canReconnect => request.canReconnect;

test/mock/e2e_container.dart

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,10 @@ class E2EContainer {
2929
wsConnector = MockWebSocketConnector();
3030
client = SignalClient(wsConnector.connect);
3131
engine = Engine(
32+
connectOptions: const ConnectOptions(),
33+
roomOptions: const RoomOptions(),
3234
signalClient: client,
3335
peerConnectionCreate: MockPeerConnection.create,
34-
roomOptions: const RoomOptions(),
3536
);
3637
room = Room(engine: engine);
3738
}

0 commit comments

Comments
 (0)