Skip to content

Commit 03842c6

Browse files
authored
fix(electric-db-colleciton): ignore snapshot-end before up-to-date (#924)
* add failing test * fix it * changeset
1 parent 4eb7671 commit 03842c6

File tree

3 files changed

+100
-20
lines changed

3 files changed

+100
-20
lines changed
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/electric-db-collection": patch
3+
---
4+
5+
Fix eager mode incorrectly committing data on `snapshot-end` before receiving the first `up-to-date` message. The `snapshot-end` in the Electric log can be from a significant period before the stream is actually up to date, so commits should only occur on `up-to-date` (or on `snapshot-end` after the first `up-to-date` has been received). This change does not affect `on-demand` mode where `snapshot-end` correctly triggers commits, or `progressive` mode which was already protected by its buffering mechanism.

packages/electric-db-collection/src/electric.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1038,7 +1038,13 @@ function createElectricSync<T extends Row<unknown>>(
10381038
)
10391039
} else {
10401040
// Normal mode or on-demand: commit transaction if one was started
1041-
if (transactionStarted) {
1041+
// In eager mode, only commit on snapshot-end if we've already received
1042+
// the first up-to-date, because the snapshot-end in the log could be from
1043+
// a significant period before the stream is actually up to date
1044+
const shouldCommit =
1045+
hasUpToDate || syncMode === `on-demand` || hasReceivedUpToDate
1046+
1047+
if (transactionStarted && shouldCommit) {
10421048
commit()
10431049
transactionStarted = false
10441050
}

packages/electric-db-collection/tests/electric.test.ts

Lines changed: 88 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -2264,22 +2264,35 @@ describe(`Electric Integration`, () => {
22642264

22652265
// Tests for commit and ready behavior with snapshot-end and up-to-date messages
22662266
describe(`Commit and ready behavior`, () => {
2267-
it(`should commit on snapshot-end in eager mode but not mark ready`, () => {
2267+
it(`should ignore snapshot-end before first up-to-date in progressive mode`, () => {
2268+
vi.clearAllMocks()
2269+
2270+
let testSubscriber!: (messages: Array<Message<Row>>) => void
2271+
mockSubscribe.mockImplementation((callback) => {
2272+
testSubscriber = callback
2273+
return () => {}
2274+
})
2275+
mockRequestSnapshot.mockResolvedValue(undefined)
2276+
mockFetchSnapshot.mockResolvedValue({ metadata: {}, data: [] })
2277+
22682278
const config = {
2269-
id: `eager-snapshot-end-test`,
2279+
id: `progressive-ignore-snapshot-end-test`,
22702280
shapeOptions: {
22712281
url: `http://test-url`,
22722282
params: { table: `test_table` },
22732283
},
2274-
syncMode: `eager` as const,
2284+
syncMode: `progressive` as const,
22752285
getKey: (item: Row) => item.id as number,
22762286
startSync: true,
22772287
}
22782288

22792289
const testCollection = createCollection(electricCollectionOptions(config))
22802290

22812291
// Send data followed by snapshot-end (but no up-to-date)
2282-
subscriber([
2292+
// In progressive mode, these messages should be BUFFERED, and snapshot-end
2293+
// should NOT trigger a commit because the snapshot-end in the log could be
2294+
// from a significant period before the stream is actually up to date
2295+
testSubscriber([
22832296
{
22842297
key: `1`,
22852298
value: { id: 1, name: `Test User` },
@@ -2295,21 +2308,74 @@ describe(`Electric Integration`, () => {
22952308
},
22962309
])
22972310

2298-
// Data should be committed (available in state)
2311+
// Data should NOT be visible yet (snapshot-end should be ignored before up-to-date)
2312+
expect(testCollection.has(1)).toBe(false)
2313+
expect(testCollection.status).toBe(`loading`)
2314+
2315+
// Now send up-to-date (triggers atomic swap)
2316+
testSubscriber([
2317+
{
2318+
headers: { control: `up-to-date` },
2319+
},
2320+
])
2321+
2322+
// Now data should be visible after atomic swap
22992323
expect(testCollection.has(1)).toBe(true)
23002324
expect(testCollection.get(1)).toEqual({ id: 1, name: `Test User` })
2325+
expect(testCollection.status).toBe(`ready`)
2326+
})
23012327

2302-
// But collection should NOT be marked as ready yet in eager mode
2303-
expect(testCollection.status).toBe(`loading`)
2328+
it(`should commit on snapshot-end in eager mode AFTER first up-to-date`, () => {
2329+
const config = {
2330+
id: `eager-snapshot-end-test`,
2331+
shapeOptions: {
2332+
url: `http://test-url`,
2333+
params: { table: `test_table` },
2334+
},
2335+
syncMode: `eager` as const,
2336+
getKey: (item: Row) => item.id as number,
2337+
startSync: true,
2338+
}
23042339

2305-
// Now send up-to-date
2340+
const testCollection = createCollection(electricCollectionOptions(config))
2341+
2342+
// First send up-to-date (with initial data) to establish the connection
23062343
subscriber([
2344+
{
2345+
key: `1`,
2346+
value: { id: 1, name: `Test User` },
2347+
headers: { operation: `insert` },
2348+
},
23072349
{
23082350
headers: { control: `up-to-date` },
23092351
},
23102352
])
23112353

2312-
// Now it should be ready
2354+
// Data should be committed and collection ready
2355+
expect(testCollection.has(1)).toBe(true)
2356+
expect(testCollection.status).toBe(`ready`)
2357+
2358+
// Now send more data followed by snapshot-end (simulating incremental snapshot)
2359+
// After the first up-to-date, snapshot-end SHOULD commit
2360+
subscriber([
2361+
{
2362+
key: `2`,
2363+
value: { id: 2, name: `Second User` },
2364+
headers: { operation: `insert` },
2365+
},
2366+
{
2367+
headers: {
2368+
control: `snapshot-end`,
2369+
xmin: `100`,
2370+
xmax: `110`,
2371+
xip_list: [],
2372+
},
2373+
},
2374+
])
2375+
2376+
// Data should be committed since we've already received up-to-date
2377+
expect(testCollection.has(2)).toBe(true)
2378+
expect(testCollection.get(2)).toEqual({ id: 2, name: `Second User` })
23132379
expect(testCollection.status).toBe(`ready`)
23142380
})
23152381

@@ -2415,7 +2481,7 @@ describe(`Electric Integration`, () => {
24152481
expect(testCollection.status).toBe(`ready`)
24162482
})
24172483

2418-
it(`should commit multiple snapshot-end messages before up-to-date in eager mode`, () => {
2484+
it(`should NOT commit multiple snapshot-end messages before up-to-date in eager mode`, () => {
24192485
const config = {
24202486
id: `eager-multiple-snapshots-test`,
24212487
shapeOptions: {
@@ -2429,7 +2495,7 @@ describe(`Electric Integration`, () => {
24292495

24302496
const testCollection = createCollection(electricCollectionOptions(config))
24312497

2432-
// First snapshot with data
2498+
// First snapshot with data - snapshot-end should be ignored before up-to-date
24332499
subscriber([
24342500
{
24352501
key: `1`,
@@ -2446,11 +2512,11 @@ describe(`Electric Integration`, () => {
24462512
},
24472513
])
24482514

2449-
// First data should be committed
2450-
expect(testCollection.has(1)).toBe(true)
2515+
// First data should NOT be committed yet (snapshot-end ignored before up-to-date)
2516+
expect(testCollection.has(1)).toBe(false)
24512517
expect(testCollection.status).toBe(`loading`)
24522518

2453-
// Second snapshot with more data
2519+
// Second snapshot with more data - still before up-to-date, so should be ignored
24542520
subscriber([
24552521
{
24562522
key: `2`,
@@ -2467,19 +2533,22 @@ describe(`Electric Integration`, () => {
24672533
},
24682534
])
24692535

2470-
// Second data should also be committed
2471-
expect(testCollection.has(2)).toBe(true)
2472-
expect(testCollection.size).toBe(2)
2536+
// Second data should also NOT be committed yet
2537+
expect(testCollection.has(2)).toBe(false)
2538+
expect(testCollection.size).toBe(0)
24732539
expect(testCollection.status).toBe(`loading`)
24742540

2475-
// Finally send up-to-date
2541+
// Finally send up-to-date - this commits all the pending data
24762542
subscriber([
24772543
{
24782544
headers: { control: `up-to-date` },
24792545
},
24802546
])
24812547

2482-
// Now should be ready
2548+
// Now all data should be committed and collection ready
2549+
expect(testCollection.has(1)).toBe(true)
2550+
expect(testCollection.has(2)).toBe(true)
2551+
expect(testCollection.size).toBe(2)
24832552
expect(testCollection.status).toBe(`ready`)
24842553
})
24852554

0 commit comments

Comments
 (0)