diff --git a/crates/sui-e2e-tests/tests/authenticated_events_api_tests.rs b/crates/sui-e2e-tests/tests/authenticated_events_api_tests.rs index cb18d8241dcdf..1896a6ab3666e 100644 --- a/crates/sui-e2e-tests/tests/authenticated_events_api_tests.rs +++ b/crates/sui-e2e-tests/tests/authenticated_events_api_tests.rs @@ -104,17 +104,53 @@ async fn emit_multiple_test_events( test_cluster: &TestCluster, package_id: ObjectID, sender: SuiAddress, - values: Vec, + start_value: u64, + count: u64, ) -> sui_json_rpc_types::SuiTransactionBlockResponse { let rgp = test_cluster.get_reference_gas_price().await; let mut ptb = ProgrammableTransactionBuilder::new(); - let vals = ptb.pure(values).unwrap(); + let start = ptb.pure(start_value).unwrap(); + let cnt = ptb.pure(count).unwrap(); ptb.programmable_move_call( package_id, move_core_types::identifier::Identifier::new("events").unwrap(), move_core_types::identifier::Identifier::new("emit_multiple").unwrap(), vec![], - vec![vals], + vec![start, cnt], + ); + let gas_object = test_cluster + .wallet + .get_one_gas_object_owned_by_address(sender) + .await + .unwrap() + .unwrap(); + let tx_data = TransactionData::new( + sui_types::transaction::TransactionKind::ProgrammableTransaction(ptb.finish()), + sender, + gas_object, + 50_000_000_000, + rgp, + ); + test_cluster.sign_and_execute_transaction(&tx_data).await +} + +async fn emit_large_test_event( + test_cluster: &TestCluster, + package_id: ObjectID, + sender: SuiAddress, + value: u64, + size: u64, +) -> sui_json_rpc_types::SuiTransactionBlockResponse { + let rgp = test_cluster.get_reference_gas_price().await; + let mut ptb = ProgrammableTransactionBuilder::new(); + let val = ptb.pure(value).unwrap(); + let sz = ptb.pure(size).unwrap(); + ptb.programmable_move_call( + package_id, + move_core_types::identifier::Identifier::new("events").unwrap(), + move_core_types::identifier::Identifier::new("emit_large").unwrap(), + vec![], + vec![val, sz], ); let gas_object = test_cluster .wallet @@ -157,6 +193,140 @@ async fn query_authenticated_events( .map(|r| r.into_inner()) } +async fn list_authenticated_events( + rpc_url: &str, + stream_id: &str, + start_checkpoint: u64, + page_size: Option, +) -> Vec { + let mut event_client = EventServiceClient::connect(rpc_url.to_owned()) + .await + .unwrap(); + + let mut all_events = Vec::new(); + let mut page_token: Option> = None; + let page_size_value = page_size.unwrap_or(1000); + + loop { + let mut req = ListAuthenticatedEventsRequest::default(); + req.stream_id = Some(stream_id.to_string()); + req.start_checkpoint = Some(start_checkpoint); + req.page_size = page_size; + req.page_token = page_token.clone(); + + let response = event_client + .list_authenticated_events(req) + .await + .unwrap() + .into_inner(); + + let event_count = response.events.len(); + all_events.extend(response.events); + + if response.next_page_token.is_none() + || response.next_page_token.as_ref().unwrap().is_empty() + { + break; + } + + page_token = response.next_page_token; + + assert!( + event_count <= page_size_value as usize, + "Page should not exceed page_size. Got {} events, expected <= {}", + event_count, + page_size_value + ); + } + + all_events +} + +async fn verify_events_with_stream_head( + test_cluster: &TestCluster, + package_id: ObjectID, + events: &[AuthenticatedEvent], + expected_event_count: u64, +) { + let stream_id = sui_types::base_types::SuiAddress::from(package_id); + let event_stream_head_id = get_event_stream_head_object_id(stream_id).unwrap(); + + let mut proof_client = ProofServiceClient::connect(test_cluster.rpc_url().to_owned()) + .await + .unwrap(); + + let mut ledger_client = LedgerServiceClient::connect(test_cluster.rpc_url().to_owned()) + .await + .unwrap(); + + let current_epoch = test_cluster + .fullnode_handle + .sui_node + .state() + .epoch_store_for_testing() + .epoch(); + let genesis_committee = get_genesis_committee(test_cluster).await.unwrap(); + let epoch_cache = build_epoch_cache(&mut ledger_client, genesis_committee, current_epoch) + .await + .expect("Failed to build epoch cache"); + + let first_event_checkpoint = events[0].checkpoint.unwrap(); + let last_event_checkpoint = events.last().unwrap().checkpoint.unwrap(); + + let first_stream_head = fetch_and_verify_event_stream_head( + &mut proof_client, + &mut ledger_client, + &epoch_cache, + event_stream_head_id, + first_event_checkpoint, + ) + .await; + + let last_stream_head = fetch_and_verify_event_stream_head( + &mut proof_client, + &mut ledger_client, + &epoch_cache, + event_stream_head_id, + last_event_checkpoint, + ) + .await; + + assert_eq!( + last_stream_head.value.num_events, expected_event_count, + "expected {} events in final stream head", + expected_event_count + ); + + let events_by_checkpoint: BTreeMap> = + events.iter().fold(BTreeMap::new(), |mut map, event| { + let commitment = convert_grpc_event_to_commitment(event) + .expect("should convert event to commitment"); + map.entry(commitment.checkpoint_seq) + .or_default() + .push(commitment); + map + }); + + let checkpoints_with_events: Vec> = events_by_checkpoint + .iter() + .filter(|(cp, _)| **cp > first_event_checkpoint) + .map(|(_cp, events)| events.clone()) + .collect(); + + let calculated_stream_head = + apply_stream_updates(&first_stream_head.value, checkpoints_with_events); + + assert_eq!( + calculated_stream_head.num_events, last_stream_head.value.num_events, + "Calculated event count should match actual event count" + ); + + assert_eq!( + calculated_stream_head.mmr, last_stream_head.value.mmr, + "Calculated MMR should match actual MMR from EventStreamHead" + ); +} + fn proto_object_ref_to_sui_object_ref( object_ref_proto: &sui_rpc::proto::sui::rpc::v2::ObjectReference, ) -> Result<(ObjectID, SequenceNumber, ObjectDigest), String> { @@ -569,25 +739,13 @@ async fn list_authenticated_events_end_to_end() { emit_test_event(&test_cluster, package_id, sender, 100 + i).await; } - let mut event_client = EventServiceClient::connect(test_cluster.rpc_url().to_owned()) - .await - .unwrap(); + let all_events = + list_authenticated_events(test_cluster.rpc_url(), &package_id.to_string(), 0, None).await; - let mut req = ListAuthenticatedEventsRequest::default(); - req.stream_id = Some(package_id.to_string()); - req.start_checkpoint = Some(0); - req.page_size = None; - req.page_token = None; - let response = event_client - .list_authenticated_events(req) - .await - .unwrap() - .into_inner(); - - let count = response.events.len(); + let count = all_events.len(); assert_eq!(count, 10, "expected 10 authenticated events, got {count}"); - let found = response.events.iter().any(|event| match &event.event { + let found = all_events.iter().any(|event| match &event.event { Some(Event { contents: Some(bcs), .. @@ -596,87 +754,7 @@ async fn list_authenticated_events_end_to_end() { }); assert!(found, "expected authenticated event for the stream"); - let first_event_checkpoint = response.events[0].checkpoint.unwrap(); - let last_event_checkpoint = response.events.last().unwrap().checkpoint.unwrap(); - - let stream_id = sui_types::base_types::SuiAddress::from(package_id); - let event_stream_head_id = get_event_stream_head_object_id(stream_id).unwrap(); - - let mut proof_client = ProofServiceClient::connect(test_cluster.rpc_url().to_owned()) - .await - .unwrap(); - - let mut ledger_client = LedgerServiceClient::connect(test_cluster.rpc_url().to_owned()) - .await - .unwrap(); - - // Build committee cache once by doing trust ratcheting - let current_epoch = test_cluster - .fullnode_handle - .sui_node - .state() - .epoch_store_for_testing() - .epoch(); - let genesis_committee = get_genesis_committee(&test_cluster).await.unwrap(); - let epoch_cache = build_epoch_cache(&mut ledger_client, genesis_committee, current_epoch) - .await - .expect("Failed to build epoch cache"); - - let stream_head = fetch_and_verify_event_stream_head( - &mut proof_client, - &mut ledger_client, - &epoch_cache, - event_stream_head_id, - first_event_checkpoint, - ) - .await; - - assert!(!stream_head.value.mmr.is_empty(), "MMR should not be empty"); - - let last_stream_head = fetch_and_verify_event_stream_head( - &mut proof_client, - &mut ledger_client, - &epoch_cache, - event_stream_head_id, - last_event_checkpoint, - ) - .await; - - assert_eq!( - last_stream_head.value.num_events, 10, - "expected 10 events in final stream head" - ); - - let events_by_checkpoint: BTreeMap> = - response - .events - .iter() - .fold(BTreeMap::new(), |mut map, event| { - let commitment = convert_grpc_event_to_commitment(event) - .expect("should convert event to commitment"); - map.entry(commitment.checkpoint_seq) - .or_default() - .push(commitment); - map - }); - - let checkpoints_with_events: Vec> = events_by_checkpoint - .iter() - .filter(|(cp, _)| **cp > first_event_checkpoint) - .map(|(_cp, events)| events.clone()) - .collect(); - - let calculated_stream_head = apply_stream_updates(&stream_head.value, checkpoints_with_events); - - assert_eq!( - calculated_stream_head.num_events, last_stream_head.value.num_events, - "Calculated event count should match actual event count" - ); - - assert_eq!( - calculated_stream_head.mmr, last_stream_head.value.mmr, - "Calculated MMR should match actual MMR from EventStreamHead" - ); + verify_events_with_stream_head(&test_cluster, package_id, &all_events, 10).await; } #[sim_test] @@ -862,8 +940,7 @@ async fn authenticated_events_multiple_events_per_transaction() { let package_id = publish_test_package(&test_cluster).await; let sender = test_cluster.wallet.config.keystore.addresses()[0]; - let _response = - emit_multiple_test_events(&test_cluster, package_id, sender, vec![100, 200, 300]).await; + let _response = emit_multiple_test_events(&test_cluster, package_id, sender, 100, 3).await; let mut event_client = EventServiceClient::connect(test_cluster.rpc_url().to_owned()) .await @@ -907,8 +984,8 @@ async fn authenticated_events_multiple_events_per_transaction() { assert_eq!(values.len(), 3, "should extract 3 event values"); assert!(values.contains(&100), "should contain event with value 100"); - assert!(values.contains(&200), "should contain event with value 200"); - assert!(values.contains(&300), "should contain event with value 300"); + assert!(values.contains(&101), "should contain event with value 101"); + assert!(values.contains(&102), "should contain event with value 102"); let tx_indices: std::collections::HashSet = response .events @@ -922,3 +999,193 @@ async fn authenticated_events_multiple_events_per_transaction() { "all events should be from the same transaction" ); } + +#[sim_test] +async fn test_pagination() { + let _guard: sui_protocol_config::OverrideGuard = + ProtocolConfig::apply_overrides_for_testing(|_, mut cfg| { + cfg.enable_authenticated_event_streams_for_testing(); + cfg + }); + + let rpc_config = create_rpc_config_with_authenticated_events(); + + let test_cluster = TestClusterBuilder::new() + .disable_fullnode_pruning() + .with_rpc_config(rpc_config) + .build() + .await; + + let package_id = publish_test_package(&test_cluster).await; + let sender = test_cluster.wallet.config.keystore.addresses()[0]; + + for i in 0..5 { + emit_multiple_test_events(&test_cluster, package_id, sender, i * 5, 5).await; + } + + let all_events = + list_authenticated_events(test_cluster.rpc_url(), &package_id.to_string(), 0, Some(7)) + .await; + + assert_eq!( + all_events.len(), + 25, + "expected 25 total events across all pages, got {}", + all_events.len() + ); + + verify_events_with_stream_head(&test_cluster, package_id, &all_events, 25).await; +} + +#[sim_test] +async fn test_object_inclusion_proof_error_code() { + let _guard: sui_protocol_config::OverrideGuard = + ProtocolConfig::apply_overrides_for_testing(|_, mut cfg| { + cfg.enable_authenticated_event_streams_for_testing(); + cfg + }); + + let rpc_config = create_rpc_config_with_authenticated_events(); + + let test_cluster = TestClusterBuilder::new() + .disable_fullnode_pruning() + .with_rpc_config(rpc_config) + .build() + .await; + + let package_id = publish_test_package(&test_cluster).await; + let sender = test_cluster.wallet.config.keystore.addresses()[0]; + + emit_test_event(&test_cluster, package_id, sender, 100).await; + + let stream_id = sui_types::base_types::SuiAddress::from(package_id); + let event_stream_head_id = get_event_stream_head_object_id(stream_id).unwrap(); + + let mut event_client = EventServiceClient::connect(test_cluster.rpc_url().to_owned()) + .await + .unwrap(); + + let mut req = ListAuthenticatedEventsRequest::default(); + req.stream_id = Some(package_id.to_string()); + req.start_checkpoint = Some(0); + req.page_size = None; + req.page_token = None; + + let response = event_client + .list_authenticated_events(req) + .await + .unwrap() + .into_inner(); + + assert_eq!(response.events.len(), 1, "should have exactly one event"); + + let event_checkpoint = response.events[0].checkpoint.unwrap(); + + let mut proof_client = ProofServiceClient::connect(test_cluster.rpc_url().to_owned()) + .await + .unwrap(); + + let mut req = GetObjectInclusionProofRequest::default(); + req.object_id = Some(event_stream_head_id.to_string()); + req.checkpoint = Some(event_checkpoint); + + let result = proof_client.get_object_inclusion_proof(req.clone()).await; + assert!( + result.is_ok(), + "Should get inclusion proof at checkpoint where object was modified" + ); + + let checkpoint_without_modification = event_checkpoint + 5; + + req.checkpoint = Some(checkpoint_without_modification); + let result = proof_client.get_object_inclusion_proof(req).await; + + assert!( + result.is_err(), + "Should fail to get inclusion proof at checkpoint where object was not modified" + ); + + let error = result.unwrap_err(); + assert_eq!( + error.code(), + tonic::Code::FailedPrecondition, + "Expected FailedPrecondition error code, got {:?}", + error.code() + ); + + assert!( + error.message().contains("was not written at checkpoint"), + "Error message should explain object was not written at checkpoint. Got: {}", + error.message() + ); +} + +#[sim_test] +async fn test_size_based_pagination() { + let _guard: sui_protocol_config::OverrideGuard = + ProtocolConfig::apply_overrides_for_testing(|_, mut cfg| { + cfg.enable_authenticated_event_streams_for_testing(); + cfg + }); + + let rpc_config = create_rpc_config_with_authenticated_events(); + + let test_cluster = TestClusterBuilder::new() + .disable_fullnode_pruning() + .with_rpc_config(rpc_config) + .build() + .await; + + let package_id = publish_test_package(&test_cluster).await; + let sender = test_cluster.wallet.config.keystore.addresses()[0]; + + emit_large_test_event(&test_cluster, package_id, sender, 1, 200_000).await; + emit_large_test_event(&test_cluster, package_id, sender, 2, 200_000).await; + emit_large_test_event(&test_cluster, package_id, sender, 3, 200_000).await; + emit_large_test_event(&test_cluster, package_id, sender, 4, 200_000).await; + + let mut event_client = EventServiceClient::connect(test_cluster.rpc_url().to_owned()) + .await + .unwrap(); + + let mut req = ListAuthenticatedEventsRequest::default(); + req.stream_id = Some(package_id.to_string()); + req.start_checkpoint = Some(0); + req.page_size = Some(10); + req.page_token = None; + + let first_response = event_client + .list_authenticated_events(req) + .await + .unwrap() + .into_inner(); + + assert!( + first_response.events.len() < 4, + "Should get fewer than 4 events due to size limit (512 KiB). Got {} events", + first_response.events.len() + ); + + assert!( + !first_response.events.is_empty(), + "Should get at least 1 event (forward progress guarantee)" + ); + + assert!( + first_response.next_page_token.is_some(), + "Should have next_page_token since not all events were returned" + ); + + let all_events = + list_authenticated_events(test_cluster.rpc_url(), &package_id.to_string(), 0, Some(10)) + .await; + + assert_eq!( + all_events.len(), + 4, + "expected 4 total events across all pages, got {}", + all_events.len() + ); + + verify_events_with_stream_head(&test_cluster, package_id, &all_events, 4).await; +} diff --git a/crates/sui-e2e-tests/tests/data/auth_event/sources/auth_event.move b/crates/sui-e2e-tests/tests/data/auth_event/sources/auth_event.move index 851ca0e6f1e93..278ab37ead606 100644 --- a/crates/sui-e2e-tests/tests/data/auth_event/sources/auth_event.move +++ b/crates/sui-e2e-tests/tests/data/auth_event/sources/auth_event.move @@ -7,14 +7,29 @@ use sui::event; public struct E has copy, drop { value: u64 } +public struct LargeE has copy, drop { + value: u64, + data: vector, +} + public entry fun emit(value: u64) { event::emit_authenticated(E { value }); } -public entry fun emit_multiple(values: vector) { +public entry fun emit_multiple(start_value: u64, count: u64) { + let mut i = 0; + while (i < count) { + event::emit_authenticated(E { value: start_value + i }); + i = i + 1; + }; +} + +public entry fun emit_large(value: u64, size: u64) { + let mut data = vector::empty(); let mut i = 0; - while (i < values.length()) { - event::emit_authenticated(E { value: values[i] }); + while (i < size) { + data.push_back(0); i = i + 1; }; + event::emit_authenticated(LargeE { value, data }); } diff --git a/crates/sui-e2e-tests/tests/get_object_inclusion_proof_tests.rs b/crates/sui-e2e-tests/tests/get_object_inclusion_proof_tests.rs index d7dd66d247444..592f49964070b 100644 --- a/crates/sui-e2e-tests/tests/get_object_inclusion_proof_tests.rs +++ b/crates/sui-e2e-tests/tests/get_object_inclusion_proof_tests.rs @@ -211,8 +211,8 @@ async fn test_object_not_found_in_checkpoint() { assert!(result.is_err()); let err = result.unwrap_err(); - assert_eq!(err.code(), tonic::Code::NotFound); - assert!(err.message().contains("not found in checkpoint")); + assert_eq!(err.code(), tonic::Code::FailedPrecondition); + assert!(err.message().contains("was not written at checkpoint")); } #[sim_test] diff --git a/crates/sui-rpc-api/src/grpc/alpha/list_authenticated_events.rs b/crates/sui-rpc-api/src/grpc/alpha/list_authenticated_events.rs index 7ca0e5500d408..23eb6e0731c3c 100644 --- a/crates/sui-rpc-api/src/grpc/alpha/list_authenticated_events.rs +++ b/crates/sui-rpc-api/src/grpc/alpha/list_authenticated_events.rs @@ -21,10 +21,9 @@ const MAX_PAGE_SIZE_BYTES: usize = 512 * 1024; // 512KiB #[derive(serde::Serialize, serde::Deserialize)] struct PageToken { stream_id: SuiAddress, - start_checkpoint: u64, - last_event_checkpoint: u64, - last_event_transaction_idx: u32, - last_event_index: u32, + next_checkpoint: u64, + next_transaction_idx: u32, + next_event_idx: u32, } fn to_grpc_event(ev: &sui_types::event::Event) -> Event { @@ -153,59 +152,66 @@ pub fn list_authenticated_events( return Ok(response); } - let start_transaction_idx = page_token.as_ref().map(|t| t.last_event_transaction_idx); - let start_event_idx = page_token.as_ref().map(|t| t.last_event_index); + let (actual_start, start_transaction_idx, start_event_idx) = if let Some(token) = &page_token { + ( + token.next_checkpoint, + Some(token.next_transaction_idx), + Some(token.next_event_idx), + ) + } else { + (start, None, None) + }; let iter = indexes .authenticated_event_iter( stream_addr, - start, + actual_start, start_transaction_idx, start_event_idx, highest_indexed, - page_size, + page_size + 1, ) .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?; let mut events = Vec::new(); let mut size_bytes = 0; - let mut events_processed: u32 = 0; - let mut last_event_info: Option<(u64, u32, u32)> = None; + let mut next_page_token = None; - for event_result in iter { + for (i, event_result) in iter.enumerate() { let (cp, transaction_idx, event_idx, ev) = event_result.map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?; - // Check if we've reached our size limit - if size_bytes >= MAX_PAGE_SIZE_BYTES { + if i >= page_size as usize { + next_page_token = Some(encode_page_token(PageToken { + stream_id: stream_addr, + next_checkpoint: cp, + next_transaction_idx: transaction_idx, + next_event_idx: event_idx, + })); break; } let authenticated_event = to_authenticated_event(&stream_id, cp, transaction_idx, event_idx, &ev); - size_bytes += authenticated_event.encoded_len(); - events.push(authenticated_event); - last_event_info = Some((cp, transaction_idx, event_idx)); - events_processed += 1; - } + let event_size = authenticated_event.encoded_len(); - let next_page_token = if events_processed == page_size { - last_event_info.map(|(last_cp, last_tx_idx, last_ev_idx)| { - encode_page_token(PageToken { + if i > 0 && size_bytes + event_size > MAX_PAGE_SIZE_BYTES { + next_page_token = Some(encode_page_token(PageToken { stream_id: stream_addr, - start_checkpoint: start, - last_event_checkpoint: last_cp, - last_event_transaction_idx: last_tx_idx, - last_event_index: last_ev_idx, - }) - }) - } else { - None - }; + next_checkpoint: cp, + next_transaction_idx: transaction_idx, + next_event_idx: event_idx, + })); + break; + } + + size_bytes += event_size; + events.push(authenticated_event); + } let mut response = ListAuthenticatedEventsResponse::default(); response.events = events; response.highest_indexed_checkpoint = Some(highest_indexed); - response.next_page_token = next_page_token.map(|token| token.to_vec()); + response.next_page_token = next_page_token.map(|t| t.into()); Ok(response) } diff --git a/crates/sui-rpc-api/src/grpc/alpha/proof_service.rs b/crates/sui-rpc-api/src/grpc/alpha/proof_service.rs index a3ced0c9cf223..c5885af1136df 100644 --- a/crates/sui-rpc-api/src/grpc/alpha/proof_service.rs +++ b/crates/sui-rpc-api/src/grpc/alpha/proof_service.rs @@ -42,6 +42,7 @@ impl ProofService for ProofServiceImpl { fn build_ocs_inclusion_proof( checkpoint: &sui_types::full_checkpoint_content::Checkpoint, object_id: ObjectID, + checkpoint_seq: u64, ) -> Result<(OcsInclusionProof, ObjectRef), RpcError> { let effects_refs: Vec<&_> = checkpoint .transactions @@ -57,8 +58,11 @@ fn build_ocs_inclusion_proof( let object_ref_from_checkpoint = object_states.get(&object_id).ok_or_else(|| { RpcError::new( - tonic::Code::NotFound, - format!("Object {} not found in checkpoint", object_id), + tonic::Code::FailedPrecondition, + format!( + "Object {} was not written at checkpoint {}", + object_id, checkpoint_seq + ), ) })?; @@ -199,7 +203,7 @@ fn get_object_inclusion_proof_impl( .map_err(|e| RpcError::new(tonic::Code::Internal, e.to_string()))?; let (proto_inclusion_proof, object_ref) = - build_ocs_inclusion_proof(&checkpoint_data, object_id)?; + build_ocs_inclusion_proof(&checkpoint_data, object_id, checkpoint_seq)?; let object = reader .get_object_by_key(&object_id, object_ref.1)