diff --git a/rust_snuba/benches/processors.rs b/rust_snuba/benches/processors.rs
index 58cf9a75fca..b3ac6d8f53f 100644
--- a/rust_snuba/benches/processors.rs
+++ b/rust_snuba/benches/processors.rs
@@ -1,3 +1,4 @@
+use std::collections::HashMap;
 use std::sync::Arc;
 use std::time::{Duration, SystemTime};
 
@@ -62,6 +63,7 @@ fn create_factory(
             python_class_name: python_class_name.into(),
             python_module: "test".into(),
         },
+        writer_options: HashMap::new(),
     };
 
     let processing_concurrency =
diff --git a/rust_snuba/src/config.rs b/rust_snuba/src/config.rs
index 1e974c66e09..eae7ea4ad81 100644
--- a/rust_snuba/src/config.rs
+++ b/rust_snuba/src/config.rs
@@ -76,6 +76,7 @@ pub struct StorageConfig {
     pub clickhouse_table_name: String,
     pub clickhouse_cluster: ClickhouseConfig,
     pub message_processor: MessageProcessorConfig,
+    pub writer_options: HashMap<String, String>,
 }
 
 #[derive(Deserialize, Clone, Debug)]
diff --git a/rust_snuba/src/consumer.rs b/rust_snuba/src/consumer.rs
index 9da36d0ff22..7ec79f0eee0 100644
--- a/rust_snuba/src/consumer.rs
+++ b/rust_snuba/src/consumer.rs
@@ -107,7 +107,7 @@ pub fn consumer_impl(
 
     for storage in &consumer_config.storages {
         tracing::info!(
-            "Storage: {}, ClickHouse Table Name: {}, Message Processor: {:?}, ClickHouse host: {}, ClickHouse port: {}, ClickHouse HTTP port: {}, ClickHouse database: {}",
+            "Storage: {}, ClickHouse Table Name: {}, Message Processor: {:?}, ClickHouse host: {}, ClickHouse port: {}, ClickHouse HTTP port: {}, ClickHouse database: {}, ClickHouse writer options: {:?}",
             storage.name,
             storage.clickhouse_table_name,
             &storage.message_processor,
@@ -115,6 +115,7 @@
             storage.clickhouse_cluster.port,
             storage.clickhouse_cluster.http_port,
             storage.clickhouse_cluster.database,
+            storage.writer_options,
         );
     }
 
diff --git a/rust_snuba/src/factory.rs b/rust_snuba/src/factory.rs
index 0ac543bc1fe..2e3a9fc5437 100644
--- a/rust_snuba/src/factory.rs
+++ b/rust_snuba/src/factory.rs
@@ -117,6 +117,7 @@ impl ProcessingStrategyFactory for ConsumerStrategyFactory {
             &self.storage_config.clickhouse_cluster.password,
             self.async_inserts,
             self.batch_write_timeout,
+            &self.storage_config.writer_options,
         );
 
         let accumulator = Arc::new(
diff --git a/rust_snuba/src/strategies/clickhouse/batch.rs b/rust_snuba/src/strategies/clickhouse/batch.rs
index b43e5acc495..cb0a85a7abc 100644
--- a/rust_snuba/src/strategies/clickhouse/batch.rs
+++ b/rust_snuba/src/strategies/clickhouse/batch.rs
@@ -1,3 +1,5 @@
+use std::collections::HashMap;
+
 use reqwest::header::{HeaderMap, HeaderValue, ACCEPT_ENCODING, CONNECTION};
 use reqwest::{Client, ClientBuilder};
 use sentry_arroyo::gauge;
@@ -43,6 +45,7 @@ impl BatchFactory {
         clickhouse_password: &str,
         async_inserts: bool,
         batch_write_timeout: Option<Duration>,
+        writer_options: &HashMap<String, String>,
     ) -> Self {
         let mut headers = HeaderMap::with_capacity(5);
         headers.insert(CONNECTION, HeaderValue::from_static("keep-alive"));
@@ -69,6 +72,9 @@
                 query_params.push_str("&async_insert=1&wait_for_async_insert=1");
             }
         }
+        for (key, value) in writer_options {
+            query_params.push_str(&format!("&{key}={value}"));
+        }
 
         let url = format!("http://{hostname}:{http_port}?{query_params}");
         let query = format!("INSERT INTO {table} FORMAT JSONEachRow");
@@ -268,6 +274,7 @@ mod tests {
             "",
             false,
             None,
+            &HashMap::new(),
         );
 
         let mut batch = factory.new_batch();
@@ -303,6 +310,48 @@
             "",
             true,
             None,
+            &HashMap::new(),
+        );
+
+        let mut batch = factory.new_batch();
+
+        batch
+            .write_rows(&RowData::from_encoded_rows(vec![
+                br#"{"hello": "world"}"#.to_vec()
+            ]))
+            .unwrap();
+
+        concurrency.handle().block_on(batch.finish()).unwrap();
+
+        mock.assert();
+    }
+
+    #[test]
+    fn test_write_skip_unknown_fields() {
+        crate::testutils::initialize_python();
+        let server = MockServer::start();
+        let mock = server.mock(|when, then| {
+            when.method(POST)
+                .query_param("input_format_skip_unknown_fields", "1");
+            then.status(200).body("hi");
+        });
+
+        let concurrency = ConcurrencyConfig::new(1);
+        let writer_options = HashMap::from([(
+            "input_format_skip_unknown_fields".to_string(),
+            "1".to_string(),
+        )]);
+        let factory = BatchFactory::new(
+            &server.host(),
+            server.port(),
+            "testtable",
+            "testdb",
+            &concurrency,
+            "default",
+            "",
+            true,
+            None,
+            &writer_options,
         );
 
         let mut batch = factory.new_batch();
@@ -337,6 +386,7 @@
             "",
             false,
             None,
+            &HashMap::new(),
         );
 
         let mut batch = factory.new_batch();
@@ -369,6 +419,7 @@
             "",
             false,
             None,
+            &HashMap::new(),
         );
 
         let mut batch = factory.new_batch();
@@ -405,6 +456,7 @@
             // pass in an unreasonably short timeout
             // which prevents the client request from reaching Clickhouse
             Some(Duration::from_millis(0)),
+            &HashMap::new(),
         );
 
         let mut batch = factory.new_batch();
@@ -439,6 +491,7 @@
             true,
             // pass in a reasonable timeout
             Some(Duration::from_millis(1000)),
+            &HashMap::new(),
         );
 
         let mut batch = factory.new_batch();
diff --git a/snuba/consumers/consumer_config.py b/snuba/consumers/consumer_config.py
index 8cc7bb05384..47e12023782 100644
--- a/snuba/consumers/consumer_config.py
+++ b/snuba/consumers/consumer_config.py
@@ -33,6 +33,7 @@ class StorageConfig:
     clickhouse_table_name: str
     clickhouse_cluster: ClickhouseClusterConfig
     message_processor: MessageProcessorConfig
+    writer_options: dict[str, str]
 
 
 @dataclass(frozen=True)
@@ -171,7 +172,8 @@ def resolve_consumer_config(
 
     validate_storages([*storages.values()])
 
-    stream_loader = storages[storage_names[0]].get_table_writer().get_stream_loader()
+    table_writer = storages[storage_names[0]].get_table_writer()
+    stream_loader = table_writer.get_stream_loader()
     default_topic_spec = stream_loader.get_default_topic_spec()
 
     resolved_raw_topic = _resolve_topic_config(
@@ -273,7 +275,12 @@ def resolve_storage_config(
         database=cluster.get_database(),
     )
 
-    processor = storage.get_table_writer().get_stream_loader().get_processor()
+    table_writer = storage.get_table_writer()
+    processor = table_writer.get_stream_loader().get_processor()
+    writer_options = dict(table_writer.get_writer_options() or {})
+    # Ensure the writer options are strings
+    for k, v in writer_options.items():
+        writer_options[k] = str(v)
 
     table_schema = storage.get_schema()
     assert isinstance(table_schema, TableSchema)
@@ -285,6 +292,7 @@
             python_class_name=processor.__class__.__name__,
             python_module=processor.__class__.__module__,
         ),
+        writer_options=writer_options,
     )
 
 
diff --git a/snuba/datasets/configuration/events/storages/errors.yaml b/snuba/datasets/configuration/events/storages/errors.yaml
index 29d32270aca..74b73137853 100644
--- a/snuba/datasets/configuration/events/storages/errors.yaml
+++ b/snuba/datasets/configuration/events/storages/errors.yaml
@@ -393,6 +393,8 @@ replacer_processor:
       contexts: []
     state_name: errors
     storage_key_str: errors
+writer_options:
+  input_format_skip_unknown_fields: 1
 stream_loader:
   processor: ErrorsProcessor
   default_topic: events
diff --git a/snuba/datasets/table_storage.py b/snuba/datasets/table_storage.py
index 3a675022f46..3ddf6d17b01 100644
--- a/snuba/datasets/table_storage.py
+++ b/snuba/datasets/table_storage.py
@@ -343,6 +343,9 @@ def get_replacer_processor(self) -> Optional[ReplacerProcessor[Any]]:
         """
         return self.__replacer_processor
 
+    def get_writer_options(self) -> ClickhouseWriterOptions:
+        return self.__writer_options
+
     def __update_writer_options(
         self,
         options: ClickhouseWriterOptions = None,
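
Note (illustration, not part of the diff): the net effect of the plumbing above is that every entry in a storage's `writer_options` is appended to the ClickHouse HTTP insert URL as an extra query parameter. A minimal sketch of that assembly follows; `build_insert_url`, the host/port, and the base query parameters are hypothetical placeholders, and only the key/value loop mirrors the change to `BatchFactory::new`.

use std::collections::HashMap;

fn build_insert_url(
    hostname: &str,
    http_port: u16,
    base_query_params: &str,
    writer_options: &HashMap<String, String>,
) -> String {
    let mut query_params = String::from(base_query_params);
    // Mirrors the loop added in BatchFactory::new: each writer option
    // becomes one more ClickHouse HTTP setting on the insert URL.
    for (key, value) in writer_options {
        query_params.push_str(&format!("&{key}={value}"));
    }
    format!("http://{hostname}:{http_port}?{query_params}")
}

fn main() {
    let writer_options = HashMap::from([(
        "input_format_skip_unknown_fields".to_string(),
        "1".to_string(),
    )]);
    // Prints: http://localhost:8123?database=default&input_format_skip_unknown_fields=1
    println!(
        "{}",
        build_insert_url("localhost", 8123, "database=default", &writer_options)
    );
}

With the `errors.yaml` entry above, the insert URL gains `&input_format_skip_unknown_fields=1`, so ClickHouse silently drops JSON fields that do not match a column of the target table instead of failing the whole batch.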