[Feature] Ensure MultiSyncDataCollectors returns data ordered by worker id
#3243
base: main
Changes from 3 commits
@@ -3760,8 +3760,7 @@ def iterator(self) -> Iterator[TensorDictBase]:
     cat_results = self.cat_results
     if cat_results is None:
         cat_results = "stack"

-    self.buffers = {}
+    self.buffers = [None for _ in range(self.num_workers)]
     dones = [False for _ in range(self.num_workers)]
     workers_frames = [0 for _ in range(self.num_workers)]
     same_device = None
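For context, the motivation for replacing the dict with a pre-sized list can be illustrated outside of torchrl: a dict filled in arrival order reflects whichever worker happens to finish first, whereas a list indexed by worker id always yields results in worker order. A minimal sketch (illustrative toy values, not torchrl code):

```python
# Results arriving out of order from three hypothetical workers: (worker_id, payload).
arrivals = [(2, "data_w2"), (0, "data_w0"), (1, "data_w1")]

# Dict filled in arrival order: iteration follows insertion order (2, 0, 1).
by_arrival = {}
for worker_id, payload in arrivals:
    by_arrival[worker_id] = payload
print(list(by_arrival))  # [2, 0, 1]

# Pre-sized list indexed by worker id: the position itself encodes the worker,
# so downstream stacking/concatenation always sees workers in a fixed order.
ordered = [None] * 3
for worker_id, payload in arrivals:
    ordered[worker_id] = payload
print(ordered)  # ['data_w0', 'data_w1', 'data_w2']
```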
@@ -3781,7 +3780,6 @@ def iterator(self) -> Iterator[TensorDictBase]:
         msg = "continue_random"
     else:
         msg = "continue"
-    # Debug: sending 'continue'
     self.pipes[idx].send((None, msg))

     self._iter += 1
@@ -3844,16 +3842,16 @@ def iterator(self) -> Iterator[TensorDictBase]:
     if preempt:
         # mask buffers if cat, and create a mask if stack
         if cat_results != "stack":
-            buffers = {}
-            for worker_idx, buffer in self.buffers.items():
+            buffers = [None] * self.num_workers
+            for idx, buffer in enumerate(filter(None.__ne__, self.buffers)):
                 valid = buffer.get(("collector", "traj_ids")) != -1
                 if valid.ndim > 2:
                     valid = valid.flatten(0, -2)
                 if valid.ndim == 2:
                     valid = valid.any(0)
-                buffers[worker_idx] = buffer[..., valid]
+                buffers[idx] = buffer[..., valid]
         else:
-            for buffer in self.buffers.values():
+            for buffer in filter(None.__ne__, self.buffers):
                 with buffer.unlock_():
                     buffer.set(
                         ("collector", "mask"),
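The masking logic in this hunk can be exercised on a plain tensor to see what `valid.any(0)` and `buffer[..., valid]` do. This is a toy illustration with made-up shapes, not the collector's actual data layout:

```python
import torch

# Toy stand-in for one worker's buffer: 2 envs x 5 time steps of trajectory ids,
# where -1 marks steps that were cut short by preemption.
traj_ids = torch.tensor([[0, 0, 1, -1, -1],
                         [2, 2, 2, 2, -1]])

valid = traj_ids != -1           # per-env, per-step validity mask
valid = valid.any(0)             # keep a time step if any env has valid data there
print(valid)                     # tensor([ True,  True,  True,  True, False])

# Indexing the trailing (time) dimension keeps only the valid steps.
print(traj_ids[..., valid].shape)  # torch.Size([2, 4])
```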
@@ -3863,11 +3861,6 @@ def iterator(self) -> Iterator[TensorDictBase]:
     else:
         buffers = self.buffers

-    # Skip frame counting if this worker didn't send data this iteration
-    # (happens when reusing buffers or on first iteration with some workers)
-    if idx not in buffers:
-        continue

     workers_frames[idx] = workers_frames[idx] + buffers[idx].numel()

     if workers_frames[idx] >= self.total_frames:
@@ -3876,17 +3869,15 @@ def iterator(self) -> Iterator[TensorDictBase]:
     if self.replay_buffer is not None:
         yield
         self._frames += sum(
-            [
-                self.frames_per_batch_worker(worker_idx)
-                for worker_idx in range(self.num_workers)
-            ]
+            self.frames_per_batch_worker(worker_idx)
+            for worker_idx in range(self.num_workers)
         )
         continue

     # we have to correct the traj_ids to make sure that they don't overlap
     # We can count the number of frames collected for free in this loop
     n_collected = 0
-    for idx in buffers.keys():
+    for idx in range(self.num_workers):
         buffer = buffers[idx]
         traj_ids = buffer.get(("collector", "traj_ids"))
         if preempt:
@@ -3901,7 +3892,7 @@ def iterator(self) -> Iterator[TensorDictBase]:
     if same_device is None:
         prev_device = None
         same_device = True
-        for item in self.buffers.values():
+        for item in filter(None.__ne__, self.buffers):
             if prev_device is None:
                 prev_device = item.device
             else:
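The loop being modified here implements a simple "are all buffers on the same device?" check; the body of the else branch is cut off in the hunk, but the pattern is roughly the following (a sketch under that assumption, not the exact torchrl code):

```python
import torch

buffers = [torch.zeros(2), torch.ones(2)]  # hypothetical per-worker outputs

prev_device = None
same_device = True
for item in buffers:
    if prev_device is None:
        prev_device = item.device          # remember the first device seen
    else:
        same_device = same_device and (item.device == prev_device)
print(same_device)  # True: both tensors live on CPU in this toy example
```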
@@ -3912,33 +3903,30 @@ def iterator(self) -> Iterator[TensorDictBase]:
         torch.stack if self._use_buffers else TensorDict.maybe_dense_stack
     )
     if same_device:
-        self.out_buffer = stack(list(buffers.values()), 0)
+        self.out_buffer = stack(
+            [item for item in buffers if item is not None], 0
+        )
     else:
         self.out_buffer = stack(
-            [item.cpu() for item in buffers.values()], 0
+            [item.cpu() for item in buffers if item is not None], 0
         )
 else:
     if self._use_buffers is None:
-        torchrl_logger.warning(
-            "use_buffer not specified and not yet inferred from data, assuming `True`."
-        )
+        torchrl_logger.warning("use_buffer not specified and not yet inferred from data, assuming `True`.")
     elif not self._use_buffers:
-        raise RuntimeError(
-            "Cannot concatenate results with use_buffers=False"
-        )
+        raise RuntimeError("Cannot concatenate results with use_buffers=False")
     try:
         if same_device:
-            self.out_buffer = torch.cat(list(buffers.values()), cat_results)
+            self.out_buffer = torch.cat(
+                [item for item in buffers if item is not None], cat_results
+            )
         else:
             self.out_buffer = torch.cat(
-                [item.cpu() for item in buffers.values()], cat_results
+                [item.cpu() for item in buffers if item is not None],
+                cat_results,
             )
     except RuntimeError as err:
-        if (
-            preempt
-            and cat_results != -1
-            and "Sizes of tensors must match" in str(err)
-        ):
+        if preempt and cat_results != -1 and "Sizes of tensors must match" in str(err):
             raise RuntimeError(
                 "The value provided to cat_results isn't compatible with the collectors outputs. "
                 "Consider using `cat_results=-1`."
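The two branches above correspond to the two modes of `cat_results`: with "stack" the per-worker batches get a new leading worker dimension, while an integer dim concatenates them along an existing dimension. A small, self-contained illustration with plain tensors (the real code stacks or concatenates TensorDicts):

```python
import torch

# Two hypothetical per-worker batches: 3 frames with a 4-dimensional observation each.
w0 = torch.zeros(3, 4)
w1 = torch.ones(3, 4)

# cat_results == "stack": a new leading dimension indexes the worker, so keeping
# the buffer list ordered by worker id fixes which slice belongs to which worker.
print(torch.stack([w0, w1], 0).shape)  # torch.Size([2, 3, 4])

# cat_results == 0 (an integer dim): batches are concatenated end to end,
# and there is no explicit worker dimension in the output.
print(torch.cat([w0, w1], 0).shape)    # torch.Size([6, 4])
```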
@@ -3956,11 +3944,7 @@ def iterator(self) -> Iterator[TensorDictBase]:
     self._frames += n_collected

     if self.postprocs:
-        self.postprocs = (
-            self.postprocs.to(out.device)
-            if hasattr(self.postprocs, "to")
-            else self.postprocs
-        )
+        self.postprocs = self.postprocs.to(out.device) if hasattr(self.postprocs, "to") else self.postprocs
         out = self.postprocs(out)
     if self._exclude_private_keys:
         excluded_keys = [key for key in out.keys() if key.startswith("_")]
We need to filter out buffers that did not return their experience: I use the `enumerate(filter(None.__ne__, self.buffers))` idiom to make this compact and hopefully readable; I'm open to better ideas.
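For readers unfamiliar with the idiom: `None.__ne__` is the bound "not equal" method of None, so `filter(None.__ne__, seq)` keeps exactly the non-None entries. A quick demonstration with toy values (not collector buffers):

```python
buffers = ["buf0", None, "buf2"]   # hypothetical per-worker slots; worker 1 sent nothing

# None.__ne__(x) is False when x is None and truthy otherwise (it actually returns
# NotImplemented for non-None operands, whose truthiness Python 3.9+ flags with a
# DeprecationWarning -- one more argument for the explicit lambda adopted later).
print(list(filter(None.__ne__, buffers)))  # ['buf0', 'buf2']
```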
I think it's ok
But you define `idx`, which was already defined earlier (LoC 3829 or 3840). It should be `worker_idx`, I believe.
See my comment below.
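The renaming matters beyond shadowing: `enumerate` over the filtered iterator restarts counting at 0, so if any slot is None the compacted index no longer matches the worker id. A toy comparison of the two loop shapes (hypothetical values, not the final PR code):

```python
received = ["buf0", None, "buf2"]      # worker 1 delivered nothing this iteration

# Enumerating the unfiltered list keeps the loop index equal to the worker id.
out = [None] * len(received)
for worker_idx, buf in enumerate(received):
    if buf is None:
        continue
    out[worker_idx] = buf.upper()      # stand-in for the masking step
print(out)                             # ['BUF0', None, 'BUF2']

# Enumerating after filter() renumbers from 0: worker 2's result lands in slot 1.
out_filtered = [None] * len(received)
for idx, buf in enumerate(filter(None.__ne__, received)):
    out_filtered[idx] = buf.upper()
print(out_filtered)                    # ['BUF0', 'BUF2', None]
```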
Thanks, fixed. Also, following some internal comments, I am switching to `lambda x: x is not None` for readability.
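For completeness, the lambda-based spelling (and the equivalent comprehension already used elsewhere in the diff) behaves the same as the `None.__ne__` version; a minimal check with toy values:

```python
received = ["buf0", None, "buf2"]

print(list(filter(lambda x: x is not None, received)))  # ['buf0', 'buf2']
print([x for x in received if x is not None])           # ['buf0', 'buf2']
```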