Skip to content

Commit 615f0f3

Browse files
authored
Organize participant map (#931)
refactor: organize remote participant map collection
1 parent 24c2005 commit 615f0f3

File tree

2 files changed

+85
-50
lines changed

2 files changed

+85
-50
lines changed
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
// Copyright 2024 LiveKit, Inc.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
import 'dart:collection';
16+
17+
import '../participant/participant.dart';
18+
19+
/// Small helper to keep participant lookups by identity and sid in sync.
20+
class ParticipantCollection<T extends Participant> extends IterableBase<T> {
21+
final Map<String, T> _byIdentity = {};
22+
final Map<String, T> _bySid = {};
23+
24+
// Read-only views of the collections
25+
UnmodifiableMapView<String, T> get byIdentity => UnmodifiableMapView(_byIdentity);
26+
UnmodifiableMapView<String, T> get bySid => UnmodifiableMapView(_bySid);
27+
28+
// Update methods
29+
void set(T participant) {
30+
_byIdentity[participant.identity] = participant;
31+
_bySid[participant.sid] = participant;
32+
}
33+
34+
// Contains methods
35+
bool containsIdentity(String identity) => _byIdentity.containsKey(identity);
36+
bool containsSid(String sid) => _bySid.containsKey(sid);
37+
38+
void clear() {
39+
_byIdentity.clear();
40+
_bySid.clear();
41+
}
42+
43+
T? removeByIdentity(String identity) {
44+
final participant = _byIdentity.remove(identity);
45+
if (participant == null) return null;
46+
return _bySid.remove(participant.sid);
47+
}
48+
49+
@override
50+
Iterator<T> get iterator => _byIdentity.values.iterator;
51+
}

lib/src/core/room.dart

Lines changed: 34 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ import '../types/rpc.dart';
5454
import '../types/transcription_segment.dart';
5555
import '../utils.dart' show unpackStreamId;
5656
import 'engine.dart';
57+
import 'participant_collection.dart';
5758
import 'pending_track_queue.dart';
5859

5960
/// Room is the primary construct for LiveKit conferences. It contains a
@@ -72,10 +73,9 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
7273
ConnectOptions get connectOptions => engine.connectOptions;
7374
RoomOptions get roomOptions => engine.roomOptions;
7475

75-
/// map of identity: [[RemoteParticipant]]
76-
UnmodifiableMapView<String, RemoteParticipant> get remoteParticipants => UnmodifiableMapView(_remoteParticipants);
77-
final _remoteParticipants = <String, RemoteParticipant>{};
78-
final Map<String, String> _sidToIdentity = <String, String>{};
76+
final ParticipantCollection<RemoteParticipant> _remoteParticipants = ParticipantCollection();
77+
UnmodifiableMapView<String, RemoteParticipant> get remoteParticipants =>
78+
UnmodifiableMapView(_remoteParticipants.byIdentity);
7979

8080
/// the current participant
8181
LocalParticipant? get localParticipant => _localParticipant;
@@ -373,7 +373,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
373373
'allowed:${event.allowed}');
374374

375375
// find participant
376-
final participant = _getRemoteParticipantBySid(event.participantSid);
376+
final participant = _remoteParticipants.bySid[event.participantSid];
377377
if (participant == null) {
378378
return;
379379
}
@@ -519,30 +519,28 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
519519
..on<EngineFullRestartingEvent>((event) async {
520520
events.emit(const RoomReconnectingEvent());
521521

522-
// clean up RemoteParticipants
523-
final copy = _remoteParticipants.values.toList();
524-
525-
_remoteParticipants.clear();
526-
_sidToIdentity.clear();
527-
_activeSpeakers.clear();
528522
// reset params
529523
_name = null;
530524
_metadata = null;
531525
_serverVersion = null;
532526
_serverRegion = null;
533527

534-
for (final participant in copy) {
528+
final participants = _remoteParticipants.toList();
529+
_remoteParticipants.clear();
530+
_activeSpeakers.clear();
531+
for (final participant in participants) {
535532
events.emit(ParticipantDisconnectedEvent(participant: participant));
536533
await participant.removeAllPublishedTracks(notify: false);
537534
await participant.dispose();
538535
}
536+
539537
notifyListeners();
540538
})
541539
..on<EngineRestartedEvent>((event) async {
542540
// re-publish all tracks
543541
await localParticipant?.rePublishAllTracks();
544542

545-
for (var participant in remoteParticipants.values) {
543+
for (var participant in _remoteParticipants) {
546544
for (var pub in participant.trackPublications.values) {
547545
if (pub.subscribed) {
548546
pub.sendUpdateTrackSettings();
@@ -601,7 +599,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
601599
trackSid = streamId;
602600
}
603601

604-
final participant = _getRemoteParticipantBySid(participantSid);
602+
final participant = _remoteParticipants.bySid[participantSid];
605603
try {
606604
if (trackSid == null || trackSid.isEmpty) {
607605
throw TrackSubscriptionExceptionEvent(
@@ -665,23 +663,15 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
665663
if (_localParticipant?.identity == identity) {
666664
return _localParticipant;
667665
}
668-
return remoteParticipants[identity];
669-
}
670-
671-
RemoteParticipant? _getRemoteParticipantBySid(String sid) {
672-
final identity = _sidToIdentity[sid];
673-
if (identity != null) {
674-
return remoteParticipants[identity];
675-
}
676-
return null;
666+
return _remoteParticipants.byIdentity[identity];
677667
}
678668

679669
Future<ParticipantCreationResult> _getOrCreateRemoteParticipant(lk_models.ParticipantInfo info) async {
680670
if (!info.hasIdentity() || !info.hasSid()) {
681671
throw Exception('ParticipantInfo must have identity and sid');
682672
}
683673

684-
final participant = _remoteParticipants[info.identity];
674+
final participant = _remoteParticipants.byIdentity[info.identity];
685675
if (participant != null) {
686676
// Return existing participant with no new publications; caller handles updates.
687677
return ParticipantCreationResult(
@@ -695,8 +685,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
695685
info: info,
696686
);
697687

698-
_remoteParticipants[result.participant.identity] = result.participant;
699-
_sidToIdentity[result.participant.sid] = result.participant.identity;
688+
_remoteParticipants.set(result.participant);
700689
await _flushPendingTracks(participant: result.participant);
701690
return result;
702691
}
@@ -718,7 +707,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
718707
continue;
719708
}
720709

721-
final isNew = !_remoteParticipants.containsKey(info.identity);
710+
final isNew = !_remoteParticipants.containsIdentity(info.identity);
722711

723712
if (info.state == lk_models.ParticipantInfo_State.DISCONNECTED) {
724713
hasChanged = await _handleParticipantDisconnect(info.identity);
@@ -741,12 +730,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
741730
[result.participant.events, events].emit(event);
742731
}
743732
}
744-
_sidToIdentity[info.sid] = info.identity;
733+
_remoteParticipants.set(result.participant);
745734
await _flushPendingTracks(participant: result.participant);
746735
} else {
747736
final wasUpdated = await result.participant.updateFromInfo(info);
748737
if (wasUpdated) {
749-
_sidToIdentity[info.sid] = info.identity;
738+
_remoteParticipants.set(result.participant);
750739
await _flushPendingTracks(participant: result.participant);
751740
}
752741
}
@@ -763,7 +752,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
763752
};
764753

765754
for (final speaker in speakers) {
766-
Participant? p = _getRemoteParticipantBySid(speaker.sid);
755+
Participant? p = _remoteParticipants.bySid[speaker.sid];
767756
if (speaker.sid == localParticipant?.sid) p = localParticipant;
768757
if (p == null) continue;
769758

@@ -786,7 +775,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
786775
isConnected: connectionState == ConnectionState.connected,
787776
participantSid: participant?.sid,
788777
subscriber: (pending) async {
789-
final target = participant ?? _getRemoteParticipantBySid(pending.participantSid);
778+
final target = participant ?? _remoteParticipants.bySid[pending.participantSid];
790779
if (target == null) return false;
791780
try {
792781
await target.addSubscribedMediaTrack(
@@ -816,7 +805,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
816805
// localParticipant & remote participants
817806
final allParticipants = <String, Participant>{
818807
if (localParticipant != null) localParticipant!.sid: localParticipant!,
819-
..._remoteParticipants,
808+
..._remoteParticipants.bySid,
820809
};
821810

822811
for (final speaker in speakers) {
@@ -847,7 +836,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
847836
if (entry.participantSid == localParticipant?.sid) {
848837
participant = localParticipant;
849838
} else {
850-
participant = _getRemoteParticipantBySid(entry.participantSid);
839+
participant = _remoteParticipants.bySid[entry.participantSid];
851840
}
852841

853842
if (participant != null) {
@@ -859,14 +848,12 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
859848

860849
void _onSignalStreamStateUpdateEvent(List<lk_rtc.StreamStateInfo> updates) async {
861850
for (final update in updates) {
862-
final identity = _sidToIdentity[update.participantSid];
863-
if (identity == null) {
864-
logger.warning('participant not found for sid ${update.participantSid}');
851+
// try to find RemoteParticipant
852+
final participant = _remoteParticipants.bySid[update.participantSid];
853+
if (participant == null) {
854+
logger.warning('Participant not found for sid ${update.participantSid}');
865855
continue;
866856
}
867-
// try to find RemoteParticipant
868-
final participant = remoteParticipants[identity];
869-
if (participant == null) continue;
870857
// try to find RemoteTrackPublication
871858
final trackPublication = participant.trackPublications[update.trackSid];
872859
if (trackPublication == null) continue;
@@ -933,10 +920,8 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
933920
}
934921

935922
Future<bool> _handleParticipantDisconnect(String identity) async {
936-
final participant = _remoteParticipants.remove(identity);
937-
if (participant == null) {
938-
return false;
939-
}
923+
final participant = _remoteParticipants.removeByIdentity(identity);
924+
if (participant == null) return false;
940925

941926
validateParticipantHasNoActiveDataStreams(identity);
942927

@@ -952,7 +937,7 @@ class Room extends DisposableChangeNotifier with EventsEmittable<RoomEvent> {
952937
final trackSids = <String>[];
953938
final trackSidsDisabled = <String>[];
954939

955-
for (var participant in remoteParticipants.values) {
940+
for (var participant in _remoteParticipants) {
956941
for (var track in participant.trackPublications.values) {
957942
if (track.subscribed != autoSubscribe) {
958943
trackSids.add(track.sid);
@@ -981,14 +966,13 @@ extension RoomPrivateMethods on Room {
981966
logger.fine('[${objectId}] cleanUp()');
982967

983968
// clean up RemoteParticipants
984-
final participants = _remoteParticipants.values.toList();
969+
final participants = _remoteParticipants.toList();
970+
_remoteParticipants.clear();
985971
for (final participant in participants) {
986972
await participant.removeAllPublishedTracks(notify: false);
987973
// RemoteParticipant is responsible for disposing resources
988974
await participant.dispose();
989975
}
990-
_remoteParticipants.clear();
991-
_sidToIdentity.clear();
992976
_pendingTrackQueue.clear();
993977

994978
// clean up LocalParticipant
@@ -1101,11 +1085,11 @@ extension RoomHardwareManagementMethods on Room {
11011085
/// Set audio output device.
11021086
Future<void> setAudioOutputDevice(MediaDevice device) async {
11031087
if (lkPlatformIs(PlatformType.web)) {
1104-
remoteParticipants.forEach((_, participant) {
1105-
for (var audioTrack in participant.audioTrackPublications) {
1088+
for (final participant in _remoteParticipants) {
1089+
for (final audioTrack in participant.audioTrackPublications) {
11061090
audioTrack.track?.setSinkId(device.deviceId);
11071091
}
1108-
});
1092+
}
11091093
Hardware.instance.selectedAudioOutput = device;
11101094
} else {
11111095
await Hardware.instance.selectAudioOutput(device);

0 commit comments

Comments
 (0)