Skip to content

Commit ff184a0

Browse files
Merge pull request #1700 from roboflow/lean/webrtc-data-decoupling
WebRTC data decoupling
2 parents ff4db8b + dc95986 commit ff184a0

File tree

9 files changed

+948
-183
lines changed

9 files changed

+948
-183
lines changed

examples/webrtc/webrtc_worker.py

Lines changed: 409 additions & 37 deletions
Large diffs are not rendered by default.

inference/core/exceptions.py

Lines changed: 4 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -216,3 +216,7 @@ def __init__(self, message: str, inner_error: Exception):
216216
@property
217217
def inner_error(self) -> Exception:
218218
return self._inner_error
219+
220+
221+
class WebRTCConfigurationError(Exception):
222+
pass

inference/core/interfaces/http/error_handlers.py

Lines changed: 19 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
RoboflowAPITimeoutError,
2929
RoboflowAPIUnsuccessfulRequestError,
3030
ServiceConfigurationError,
31+
WebRTCConfigurationError,
3132
WorkspaceLoadError,
3233
)
3334
from inference.core.interfaces.stream_manager.api.errors import (
@@ -358,6 +359,15 @@ def wrapped_route(*args, **kwargs):
358359
"inner_error_type": error.inner_error_type,
359360
},
360361
)
362+
except WebRTCConfigurationError as error:
363+
logger.error("%s: %s", type(error).__name__, error)
364+
resp = JSONResponse(
365+
status_code=400,
366+
content={
367+
"message": str(error),
368+
"error_type": "WebRTCConfigurationError",
369+
},
370+
)
361371
except Exception as error:
362372
logger.exception("%s: %s", type(error).__name__, error)
363373
resp = JSONResponse(status_code=500, content={"message": "Internal error."})
@@ -661,6 +671,15 @@ async def wrapped_route(*args, **kwargs):
661671
"inner_error_type": error.inner_error_type,
662672
},
663673
)
674+
except WebRTCConfigurationError as error:
675+
logger.error("%s: %s", type(error).__name__, error)
676+
resp = JSONResponse(
677+
status_code=400,
678+
content={
679+
"message": str(error),
680+
"error_type": "WebRTCConfigurationError",
681+
},
682+
)
664683
except Exception as error:
665684
logger.exception("%s: %s", type(error).__name__, error)
666685
resp = JSONResponse(status_code=500, content={"message": "Internal error."})

inference/core/interfaces/http/http_api.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,7 @@
171171
MissingServiceSecretError,
172172
RoboflowAPINotAuthorizedError,
173173
RoboflowAPINotNotFoundError,
174+
WebRTCConfigurationError,
174175
WorkspaceLoadError,
175176
)
176177
from inference.core.interfaces.base import BaseInterface
@@ -1467,6 +1468,7 @@ async def initialise_webrtc_worker(
14671468
"RoboflowAPINotAuthorizedError": RoboflowAPINotAuthorizedError,
14681469
"RoboflowAPINotNotFoundError": RoboflowAPINotNotFoundError,
14691470
"ValidationError": ValidationError,
1471+
"WebRTCConfigurationError": WebRTCConfigurationError,
14701472
}
14711473
exc = expected_exceptions.get(
14721474
worker_result.exception_type, Exception

inference/core/interfaces/stream_manager/manager_app/entities.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,8 +112,8 @@ class InitialiseWebRTCPipelinePayload(InitialisePipelinePayload):
112112
WEBRTC_REALTIME_PROCESSING # this parameter controls only webrtc processing, not inference pipeline strategies
113113
)
114114
webrtc_turn_config: Optional[WebRTCTURNConfig] = None
115-
stream_output: Optional[List[Optional[str]]] = Field(default_factory=list)
116-
data_output: Optional[List[Optional[str]]] = Field(default_factory=list)
115+
stream_output: Optional[List[str]] = Field(default_factory=list)
116+
data_output: Optional[List[str]] = Field(default_factory=list)
117117
webcam_fps: Optional[float] = (
118118
None # TODO: this parameter is now passed for both webcam and video source
119119
)
@@ -124,8 +124,8 @@ class InitialiseWebRTCPipelinePayload(InitialisePipelinePayload):
124124

125125

126126
class WebRTCData(BaseModel):
127-
stream_output: Optional[str] = None
128-
data_output: Optional[str] = None
127+
stream_output: Optional[List[str]] = None
128+
data_output: Optional[List[str]] = None
129129

130130

131131
class ConsumeResultsPayload(BaseModel):

inference/core/interfaces/stream_manager/manager_app/webrtc.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -338,7 +338,7 @@ def __init__(
338338
video_transform_track: VideoTransformTrack,
339339
asyncio_loop: asyncio.AbstractEventLoop,
340340
stream_output: Optional[str] = None,
341-
data_output: Optional[str] = None,
341+
data_output: Optional[List[str]] = None,
342342
*args,
343343
**kwargs,
344344
):
@@ -347,7 +347,7 @@ def __init__(
347347
self.video_transform_track: VideoTransformTrack = video_transform_track
348348
self._consumers_signalled: bool = False
349349
self.stream_output: Optional[str] = stream_output
350-
self.data_output: Optional[str] = data_output
350+
self.data_output: Optional[List[str]] = data_output
351351
self.data_channel: Optional[RTCDataChannel] = None
352352

353353

@@ -384,7 +384,7 @@ async def init_rtc_peer_connection(
384384
webrtc_realtime_processing: bool = True,
385385
webcam_fps: Optional[float] = None,
386386
stream_output: Optional[str] = None,
387-
data_output: Optional[str] = None,
387+
data_output: Optional[List[str]] = None,
388388
) -> RTCPeerConnectionWithFPS:
389389
relay = MediaRelay()
390390
video_transform_track = VideoTransformTrack(

inference/core/interfaces/webrtc_worker/entities.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
from typing import List, Literal, Optional, Union
1+
from enum import Enum
2+
from typing import Any, Dict, List, Literal, Optional, Union
23

34
from pydantic import BaseModel, Field
45

@@ -21,8 +22,8 @@ class WebRTCWorkerRequest(BaseModel):
2122
webrtc_realtime_processing: bool = (
2223
WEBRTC_REALTIME_PROCESSING # when set to True, MediaRelay.subscribe will be called with buffered=False
2324
)
24-
stream_output: Optional[List[Optional[str]]] = Field(default_factory=list)
25-
data_output: Optional[List[Optional[str]]] = Field(default_factory=list)
25+
stream_output: Optional[List[str]] = Field(default=None)
26+
data_output: Optional[List[str]] = Field(default=None)
2627
declared_fps: Optional[float] = None
2728
rtsp_url: Optional[str] = None
2829
processing_timeout: Optional[int] = WEBRTC_MODAL_FUNCTION_TIME_LIMIT
@@ -53,8 +54,15 @@ class WebRTCVideoMetadata(BaseModel):
5354

5455

5556
class WebRTCOutput(BaseModel):
56-
output_name: Optional[str] = None
57-
serialized_output_data: Optional[str] = None
57+
"""Output sent via WebRTC data channel.
58+
59+
serialized_output_data contains a dictionary with workflow outputs:
60+
- If data_output is None or []: no data sent (only metadata)
61+
- If data_output is ["*"]: all workflow outputs (excluding images, unless explicitly named)
62+
- If data_output is ["field1", "field2"]: only those fields (including images if explicitly named)
63+
"""
64+
65+
serialized_output_data: Optional[Dict[str, Any]] = None
5866
video_metadata: Optional[WebRTCVideoMetadata] = None
5967
errors: List[str] = Field(default_factory=list)
6068

@@ -66,3 +74,15 @@ class WebRTCWorkerResult(BaseModel):
6674
error_message: Optional[str] = None
6775
error_context: Optional[str] = None
6876
inner_error: Optional[str] = None
77+
78+
79+
class StreamOutputMode(str, Enum):
80+
AUTO_DETECT = "auto_detect" # None -> auto-detect first image
81+
NO_VIDEO = "no_video" # [] -> no video track
82+
SPECIFIC_FIELD = "specific" # ["field"] -> use specific field
83+
84+
85+
class DataOutputMode(str, Enum):
86+
NONE = "none" # None or [] -> no data sent
87+
ALL = "all" # ["*"] -> send all (skip images)
88+
SPECIFIC = "specific" # ["field1", "field2"] -> send only these

inference/core/interfaces/webrtc_worker/utils.py

Lines changed: 44 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,38 @@
1515
logging.getLogger("aiortc").setLevel(logging.WARNING)
1616

1717

18+
def detect_image_output(
19+
workflow_output: Dict[str, Union[WorkflowImageData, Any]],
20+
) -> Optional[str]:
21+
"""Detect the first available image output field in workflow output."""
22+
for output_name in workflow_output.keys():
23+
if (
24+
get_frame_from_workflow_output(
25+
workflow_output=workflow_output,
26+
frame_output_key=output_name,
27+
)
28+
is not None
29+
):
30+
return output_name
31+
return None
32+
33+
1834
def process_frame(
1935
frame: VideoFrame,
2036
frame_id: int,
2137
inference_pipeline: InferencePipeline,
22-
stream_output: str,
38+
stream_output: Optional[str] = None,
39+
render_output: bool = True,
2340
include_errors_on_frame: bool = True,
24-
) -> Tuple[Dict[str, Union[WorkflowImageData, Any]], VideoFrame, List[str]]:
41+
) -> Tuple[
42+
Dict[str, Union[WorkflowImageData, Any]],
43+
Optional[VideoFrame],
44+
List[str],
45+
]:
2546
np_image = frame.to_ndarray(format="bgr24")
2647
workflow_output: Dict[str, Union[WorkflowImageData, Any]] = {}
2748
errors = []
49+
2850
try:
2951
video_frame = InferenceVideoFrame(
3052
image=np_image,
@@ -34,36 +56,40 @@ def process_frame(
3456
fps=30, # placeholder
3557
measured_fps=30, # placeholder
3658
)
37-
workflow_output: Dict[str, Union[WorkflowImageData, Any]] = (
38-
inference_pipeline._on_video_frame([video_frame])[0]
59+
workflow_output = inference_pipeline._on_video_frame([video_frame])[0]
60+
except Exception as e:
61+
logger.exception("Error in workflow processing")
62+
errors.append(str(e))
63+
64+
if not render_output:
65+
return workflow_output, None, errors
66+
67+
if stream_output is None:
68+
errors.append("stream_output is required when render_output=True")
69+
return (
70+
workflow_output,
71+
VideoFrame.from_ndarray(np_image, format="bgr24"),
72+
errors,
3973
)
40-
result_np_image: Optional[np.ndarray] = get_frame_from_workflow_output(
74+
75+
result_np_image: Optional[np.ndarray] = None
76+
try:
77+
result_np_image = get_frame_from_workflow_output(
4178
workflow_output=workflow_output,
4279
frame_output_key=stream_output,
4380
)
44-
if result_np_image is None:
45-
for k in workflow_output.keys():
46-
result_np_image = get_frame_from_workflow_output(
47-
workflow_output=workflow_output,
48-
frame_output_key=k,
49-
)
50-
if result_np_image is not None:
51-
errors.append(
52-
f"'{stream_output}' not found in workflow outputs, using '{k}' instead"
53-
)
54-
break
5581
if result_np_image is None:
5682
errors.append("Visualisation blocks were not executed")
5783
errors.append("or workflow was not configured to output visuals.")
5884
errors.append("Please try to adjust the scene so models detect objects")
5985
errors.append("or stop preview, update workflow and try again.")
6086
result_np_image = np_image
6187
except Exception as e:
62-
logger.exception("Error in inference pipeline")
88+
logger.exception("Error extracting visual output")
6389
result_np_image = np_image
6490
errors.append(str(e))
6591

66-
if include_errors_on_frame:
92+
if include_errors_on_frame and errors:
6793
result_np_image = overlay_text_on_np_frame(
6894
frame=result_np_image,
6995
text=errors,

0 commit comments

Comments (0)