@@ -19,10 +19,10 @@ use std::pin::Pin;
 
 use async_trait::async_trait;
 use axum::http::HeaderMap;
-use futures::{Future, Stream, StreamExt, TryStreamExt};
+use futures::{Future, StreamExt, TryStreamExt};
 use ginepro::LoadBalancedChannel;
 use tonic::{Code, Request, Response, Status, Streaming};
-use tracing::{Span, debug, info, instrument};
+use tracing::Span;
 
 use super::{
     BoxStream, Client, Error, create_grpc_client, errors::grpc_to_http_code,
@@ -36,7 +36,7 @@ use crate::{
             BidiStreamingChunkerTokenizationTaskRequest, ChunkerTokenizationTaskRequest,
             chunkers_service_client::ChunkersServiceClient,
         },
-        caikit_data_model::nlp::{ChunkerTokenizationStreamResult, Token, TokenizationResults},
+        caikit_data_model::nlp::{ChunkerTokenizationStreamResult, TokenizationResults},
         grpc::health::v1::{HealthCheckRequest, health_client::HealthClient},
     },
     utils::trace::trace_context_from_grpc_response,
@@ -50,14 +50,12 @@ pub const DEFAULT_CHUNKER_ID: &str = "whole_doc_chunker";
 type StreamingTokenizationResult =
     Result<Response<Streaming<ChunkerTokenizationStreamResult>>, Status>;
 
-#[cfg_attr(test, faux::create)]
 #[derive(Clone)]
 pub struct ChunkerClient {
     client: ChunkersServiceClient<OtelGrpcService<LoadBalancedChannel>>,
     health_client: HealthClient<OtelGrpcService<LoadBalancedChannel>>,
 }
 
-#[cfg_attr(test, faux::methods)]
 impl ChunkerClient {
     pub async fn new(config: &ServiceConfig) -> Self {
         let client = create_grpc_client(DEFAULT_PORT, config, ChunkersServiceClient::new).await;
@@ -68,28 +66,24 @@ impl ChunkerClient {
         }
     }
 
-    #[instrument(skip_all, fields(model_id))]
     pub async fn tokenization_task_predict(
         &self,
         model_id: &str,
         request: ChunkerTokenizationTaskRequest,
     ) -> Result<TokenizationResults, Error> {
         let mut client = self.client.clone();
         let request = request_with_headers(request, model_id);
-        debug!(?request, "sending client request");
         let response = client.chunker_tokenization_task_predict(request).await?;
         let span = Span::current();
         trace_context_from_grpc_response(&span, &response);
         Ok(response.into_inner())
     }
 
-    #[instrument(skip_all, fields(model_id))]
     pub async fn bidi_streaming_tokenization_task_predict(
         &self,
         model_id: &str,
         request_stream: BoxStream<BidiStreamingChunkerTokenizationTaskRequest>,
     ) -> Result<BoxStream<Result<ChunkerTokenizationStreamResult, Error>>, Error> {
-        info!("sending client stream request");
         let mut client = self.client.clone();
         let request = request_with_headers(request_stream, model_id);
         // NOTE: this is an ugly workaround to avoid bogus higher-ranked lifetime errors.
@@ -103,7 +97,6 @@ impl ChunkerClient {
     }
 }
 
-#[cfg_attr(test, faux::methods)]
 #[async_trait]
 impl Client for ChunkerClient {
     fn name(&self) -> &str {
@@ -144,108 +137,3 @@ fn request_with_headers<T>(request: T, model_id: &str) -> Request<T> {
         .insert(MODEL_ID_HEADER_NAME, model_id.parse().unwrap());
     request
 }
-
-/// Unary tokenization result of the entire doc
-#[instrument(skip_all)]
-pub fn tokenize_whole_doc(request: ChunkerTokenizationTaskRequest) -> TokenizationResults {
-    let codepoint_count = request.text.chars().count() as i64;
-    TokenizationResults {
-        results: vec![Token {
-            start: 0,
-            end: codepoint_count,
-            text: request.text,
-        }],
-        token_count: 1, // entire doc
-    }
-}
-
-/// Streaming tokenization result for the entire doc stream
-#[instrument(skip_all)]
-pub async fn tokenize_whole_doc_stream(
-    request: impl Stream<Item = BidiStreamingChunkerTokenizationTaskRequest>,
-) -> Result<ChunkerTokenizationStreamResult, Error> {
-    let (text, index_vec): (String, Vec<i64>) = request
-        .map(|r| (r.text_stream, r.input_index_stream))
-        .collect()
-        .await;
-    let codepoint_count = text.chars().count() as i64;
-    let input_end_index = index_vec.last().copied().unwrap_or_default();
-    Ok(ChunkerTokenizationStreamResult {
-        results: vec![Token {
-            start: 0,
-            end: codepoint_count,
-            text,
-        }],
-        token_count: 1, // entire doc/stream
-        processed_index: codepoint_count,
-        start_index: 0,
-        input_start_index: 0,
-        input_end_index,
-    })
-}
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    #[test]
-    fn test_tokenize_whole_doc() {
-        let request = ChunkerTokenizationTaskRequest {
-            text: "Lorem ipsum dolor sit amet consectetur adipiscing \
-                elit sed do eiusmod tempor incididunt ut labore et dolore \
-                magna aliqua."
-                .into(),
-        };
-        let expected_response = TokenizationResults {
-            results: vec![Token {
-                start: 0,
-                end: 121,
-                text: "Lorem ipsum dolor sit amet consectetur \
-                    adipiscing elit sed do eiusmod tempor incididunt \
-                    ut labore et dolore magna aliqua."
-                    .into(),
-            }],
-            token_count: 1,
-        };
-        let response = tokenize_whole_doc(request);
-        assert_eq!(response, expected_response)
-    }
-
-    #[tokio::test]
-    async fn test_tokenize_whole_doc_stream() {
-        let request = futures::stream::iter(vec![
-            BidiStreamingChunkerTokenizationTaskRequest {
-                text_stream: "Lorem ipsum dolor sit amet ".into(),
-                input_index_stream: 0,
-            },
-            BidiStreamingChunkerTokenizationTaskRequest {
-                text_stream: "consectetur adipiscing elit ".into(),
-                input_index_stream: 1,
-            },
-            BidiStreamingChunkerTokenizationTaskRequest {
-                text_stream: "sed do eiusmod tempor incididunt ".into(),
-                input_index_stream: 2,
-            },
-            BidiStreamingChunkerTokenizationTaskRequest {
-                text_stream: "ut labore et dolore magna aliqua.".into(),
-                input_index_stream: 3,
-            },
-        ]);
-        let expected_response = ChunkerTokenizationStreamResult {
-            results: vec![Token {
-                start: 0,
-                end: 121,
-                text: "Lorem ipsum dolor sit amet consectetur adipiscing elit \
-                    sed do eiusmod tempor incididunt ut labore et dolore magna aliqua."
-                    .into(),
-            }],
-            token_count: 1,
-            processed_index: 121,
-            start_index: 0,
-            input_start_index: 0,
-            input_end_index: 3,
-        };
-        let response = tokenize_whole_doc_stream(request).await.unwrap();
-        assert_eq!(response, expected_response);
-    }
-}
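
The hunk at `@@ -103,7 +97,6 @@` ends right after the `// NOTE` comment, so the higher-ranked-lifetime workaround itself is not part of this diff. Going only by what the file keeps in scope (`std::pin::Pin`, `futures::Future`, the `StreamingTokenizationResult` alias, and the imported `StreamExt`/`TryStreamExt` extension traits), a workaround of this kind plausibly looks like the following sketch; treat every line as an assumption rather than the actual body:

```rust
// Hypothetical sketch only; the real lines fall outside the diff context.
// Boxing the call's future erases the opaque `impl Future` type whose
// inferred higher-ranked lifetime bounds confuse the compiler.
let response_fut: Pin<Box<dyn Future<Output = StreamingTokenizationResult> + Send>> =
    Box::pin(client.bidi_streaming_chunker_tokenization_task_predict(request));
let response = response_fut.await?;
// Convert each gRPC `Status` error into the crate's `Error` and box the stream
// to match the method's `BoxStream` return type.
Ok(response.into_inner().map_err(Error::from).boxed())
```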
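For orientation, here is a hypothetical caller of the unary path. `config` and `"my_chunker"` are placeholders; the model id is attached as `MODEL_ID_HEADER_NAME` request metadata by `request_with_headers`, as the last hunk shows:

```rust
// Hypothetical usage sketch; `config` and "my_chunker" are placeholders.
let client = ChunkerClient::new(&config).await;
let results = client
    .tokenization_task_predict(
        "my_chunker",
        ChunkerTokenizationTaskRequest {
            text: "Lorem ipsum dolor sit amet.".into(),
        },
    )
    .await?;
// `results.results` holds the returned tokens; `results.token_count` their count.
```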