Skip to content

Commit c2ce156

Browse files
Merge pull request #1772 from roboflow/tweak-data-channel-frames-processing
Tweaks to improve data channel frames processing
2 parents 035d78f + 13ecc24 commit c2ce156

File tree

1 file changed

+22
-10
lines changed
  • inference/core/interfaces/webrtc_worker

1 file changed

+22
-10
lines changed

inference/core/interfaces/webrtc_worker/webrtc.py

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,11 @@ def send_chunked_data(
162162
f"Sending response for frame {frame_id}: {total_chunks} chunk(s), {len(payload_bytes)} bytes"
163163
)
164164

165+
view = memoryview(payload_bytes)
165166
for chunk_index in range(total_chunks):
166167
start = chunk_index * chunk_size
167168
end = min(start + chunk_size, len(payload_bytes))
168-
chunk_data = payload_bytes[start:end]
169+
chunk_data = view[start:end]
169170

170171
message = create_chunked_binary_message(
171172
frame_id, chunk_index, total_chunks, chunk_data
@@ -305,7 +306,9 @@ async def _send_data_output(
305306

306307
if self._data_mode == DataOutputMode.NONE:
307308
# Even empty responses use binary protocol
308-
json_bytes = json.dumps(webrtc_output.model_dump()).encode("utf-8")
309+
json_bytes = await asyncio.to_thread(
310+
lambda: json.dumps(webrtc_output.model_dump()).encode("utf-8")
311+
)
309312
send_chunked_data(self.data_channel, self._received_frames, json_bytes)
310313
return
311314

@@ -371,15 +374,22 @@ async def _handle_data_channel_frame(self, message: bytes) -> None:
371374
f"Received frame {frame_id}: {total_chunks} chunk(s), {len(jpeg_bytes)} bytes JPEG"
372375
)
373376

374-
nparr = np.frombuffer(jpeg_bytes, np.uint8)
375-
np_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
377+
def _decode_to_frame(jpeg_bytes: bytes) -> VideoFrame:
378+
nparr = np.frombuffer(jpeg_bytes, np.uint8)
379+
np_image = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
380+
381+
if np_image is None:
382+
raise ValueError("cv2.imdecode returned None")
383+
384+
return VideoFrame.from_ndarray(np_image, format="bgr24")
376385

377-
if np_image is None:
378-
logger.error(f"Failed to decode JPEG for frame {frame_id}")
386+
try:
387+
video_frame = await asyncio.to_thread(_decode_to_frame, jpeg_bytes)
388+
except Exception as e:
389+
logger.error(f"Failed to decode JPEG for frame {frame_id}: {e}")
379390
return
380391

381-
video_frame = VideoFrame.from_ndarray(np_image, format="bgr24")
382-
await self._data_frame_queue.put((frame_id, video_frame))
392+
self._data_frame_queue.put_nowait((frame_id, video_frame))
383393

384394
if frame_id % 100 == 1:
385395
logger.info(f"Queued frame {frame_id}")
@@ -440,8 +450,10 @@ async def process_frames_data_only(self):
440450
)
441451

442452
# Send data via data channel
443-
await self._send_data_output(
444-
workflow_output, frame_timestamp, frame, errors
453+
asyncio.create_task(
454+
self._send_data_output(
455+
workflow_output, frame_timestamp, frame, errors
456+
)
445457
)
446458

447459
except asyncio.CancelledError:

0 commit comments

Comments
 (0)