Skip to content

Commit d217a6c

Browse files
committed
Remove the default set of schemas from the Compactor
These defaults are effectively never used in prod — in practice we always have at least one set of schemas available — and once we stop asserting the schemas on handle creation, keeping a default set would let us write batches that don't match the shard schema.
1 parent da55b89 commit d217a6c

File tree

5 files changed

+38
-48
lines changed

5 files changed

+38
-48
lines changed

src/persist-client/src/cli/admin.rs

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -754,12 +754,7 @@ pub async fn dangerous_force_compaction_and_break_pushdown<K, V, T, D>(
754754
);
755755
machine.applier.metrics.compaction.requested.inc();
756756
let start = Instant::now();
757-
let res = Compactor::<K, V, T, D>::compact_and_apply(
758-
&machine,
759-
req,
760-
write.write_schemas.clone(),
761-
)
762-
.await;
757+
let res = Compactor::<K, V, T, D>::compact_and_apply(&machine, req).await;
763758
let apply_maintenance = match res {
764759
Ok(x) => x,
765760
Err(err) => {

src/persist-client/src/internal/compact.rs

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,6 @@ where
180180
pub fn new(
181181
cfg: PersistConfig,
182182
metrics: Arc<Metrics>,
183-
write_schemas: Schemas<K, V>,
184183
gc: GarbageCollector<K, V, T, D>,
185184
) -> Self {
186185
let (compact_req_sender, mut compact_req_receiver) = mpsc::channel::<(
@@ -242,14 +241,12 @@ where
242241
.queued_seconds
243242
.inc_by(enqueued.elapsed().as_secs_f64());
244243

245-
let write_schemas = write_schemas.clone();
246-
247244
let compact_span =
248245
debug_span!(parent: None, "compact::apply", shard_id=%machine.shard_id());
249246
compact_span.follows_from(&Span::current());
250247
let gc = gc.clone();
251248
mz_ore::task::spawn(|| "PersistCompactionWorker", async move {
252-
let res = Self::compact_and_apply(&machine, req, write_schemas)
249+
let res = Self::compact_and_apply(&machine, req)
253250
.instrument(compact_span)
254251
.await;
255252
if let Ok(maintenance) = res {
@@ -328,7 +325,6 @@ where
328325
pub(crate) async fn compact_and_apply(
329326
machine: &Machine<K, V, T, D>,
330327
req: CompactReq<T>,
331-
write_schemas: Schemas<K, V>,
332328
) -> Result<RoutineMaintenance, anyhow::Error> {
333329
let metrics = Arc::clone(&machine.applier.metrics);
334330
metrics.compaction.started.inc();
@@ -349,38 +345,38 @@ where
349345
);
350346
// always use most recent schema from all the Runs we're compacting to prevent Compactors
351347
// created before the schema was evolved, from trying to "de-evolve" a Part.
352-
let compaction_schema_id = req
348+
let Some(compaction_schema_id) = req
353349
.inputs
354350
.iter()
355351
.flat_map(|batch| batch.batch.run_meta.iter())
356352
.filter_map(|run_meta| run_meta.schema)
357353
// It's an invariant that SchemaIds are ordered.
358-
.max();
359-
let maybe_compaction_schema = match compaction_schema_id {
360-
Some(id) => machine
361-
.get_schema(id)
362-
.map(|(key_schema, val_schema)| (id, key_schema, val_schema)),
363-
None => None,
354+
.max()
355+
else {
356+
metrics.compaction.schema_selection.no_schema.inc();
357+
metrics.compaction.failed.inc();
358+
return Err(anyhow!(
359+
"compacting {shard_id} and spine ids {spine_ids}: could not determine schema id from inputs",
360+
shard_id = req.shard_id,
361+
spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
362+
));
364363
};
365-
let use_most_recent_schema = COMPACTION_USE_MOST_RECENT_SCHEMA.get(&machine.applier.cfg);
366-
367-
let compaction_schema = match maybe_compaction_schema {
368-
Some((id, key_schema, val_schema)) if use_most_recent_schema => {
369-
metrics.compaction.schema_selection.recent_schema.inc();
370-
Schemas {
371-
id: Some(id),
372-
key: Arc::new(key_schema),
373-
val: Arc::new(val_schema),
374-
}
375-
}
376-
Some(_) => {
377-
metrics.compaction.schema_selection.disabled.inc();
378-
write_schemas
379-
}
380-
None => {
381-
metrics.compaction.schema_selection.no_schema.inc();
382-
write_schemas
383-
}
364+
let Some((key_schema, val_schema)) = machine.get_schema(compaction_schema_id) else {
365+
metrics.compaction.schema_selection.no_schema.inc();
366+
metrics.compaction.failed.inc();
367+
return Err(anyhow!(
368+
"compacting {shard_id} and spine ids {spine_ids}: schema id {compaction_schema_id} not present in machine state",
369+
shard_id = req.shard_id,
370+
spine_ids = mz_ore::str::separated(", ", req.inputs.iter().map(|i| i.id))
371+
));
372+
};
373+
374+
metrics.compaction.schema_selection.recent_schema.inc();
375+
376+
let compaction_schema = Schemas {
377+
id: Some(compaction_schema_id),
378+
key: Arc::new(key_schema),
379+
val: Arc::new(val_schema),
384380
};
385381

386382
trace!(

src/persist-client/src/internal/metrics.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -955,15 +955,13 @@ impl CompactionStepTimings {
955955
pub struct CompactionSchemaSelection {
956956
pub(crate) recent_schema: Counter,
957957
pub(crate) no_schema: Counter,
958-
pub(crate) disabled: Counter,
959958
}
960959

961960
impl CompactionSchemaSelection {
962961
fn new(schema_selection: CounterVec) -> CompactionSchemaSelection {
963962
CompactionSchemaSelection {
964963
recent_schema: schema_selection.with_label_values(&["recent"]),
965964
no_schema: schema_selection.with_label_values(&["none"]),
966-
disabled: schema_selection.with_label_values(&["disabled"]),
967965
}
968966
}
969967
}

src/persist-client/src/internal/trace.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@
4949
5050
use std::cmp::Ordering;
5151
use std::collections::{BTreeMap, BTreeSet};
52-
use std::fmt::Debug;
52+
use std::fmt::{Debug, Display};
5353
use std::mem;
5454
use std::ops::Range;
5555
use std::sync::Arc;
@@ -726,6 +726,12 @@ pub enum CompactionInput {
726726
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
727727
pub struct SpineId(pub usize, pub usize);
728728

729+
impl Display for SpineId {
730+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
731+
write!(f, "[{}, {})", self.0, self.1)
732+
}
733+
}
734+
729735
impl Serialize for SpineId {
730736
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
731737
where

src/persist-client/src/write.rs

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -160,14 +160,9 @@ where
160160
write_schemas: Schemas<K, V>,
161161
) -> Self {
162162
let isolated_runtime = Arc::clone(&machine.isolated_runtime);
163-
let compact = cfg.compaction_enabled.then(|| {
164-
Compactor::new(
165-
cfg.clone(),
166-
Arc::clone(&metrics),
167-
write_schemas.clone(),
168-
gc.clone(),
169-
)
170-
});
163+
let compact = cfg
164+
.compaction_enabled
165+
.then(|| Compactor::new(cfg.clone(), Arc::clone(&metrics), gc.clone()));
171166
let debug_state = HandleDebugState {
172167
hostname: cfg.hostname.to_owned(),
173168
purpose: purpose.to_owned(),

0 commit comments

Comments (0)