Skip to content

Commit c8a2c16

Browse files
authored
fix for schedular bug with lazy left-join sources (#898)
* add failing test for scheduler bug * additional test * fix bug * changeset * address feedback
1 parent 4121723 commit c8a2c16

File tree

4 files changed

+283
-2
lines changed

4 files changed

+283
-2
lines changed

.changeset/soft-lazy-deps.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"@tanstack/db": patch
3+
---
4+
5+
Fix scheduler handling of lazy left-join/live-query dependencies: treat non-enqueued lazy deps as satisfied to avoid unresolved-dependency deadlocks, and block only when a dep actually has pending work.

packages/db/src/query/live/collection-config-builder.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -406,6 +406,16 @@ export class CollectionConfigBuilder<
406406
return Array.from(deps)
407407
})()
408408

409+
// Ensure dependent builders are actually scheduled in this context so that
410+
// dependency edges always point to a real job (or a deduped no-op if already scheduled).
411+
if (contextId) {
412+
for (const dep of dependentBuilders) {
413+
if (typeof dep.scheduleGraphRun === `function`) {
414+
dep.scheduleGraphRun(undefined, { contextId })
415+
}
416+
}
417+
}
418+
409419
// We intentionally scope deduplication to the builder instance. Each instance
410420
// owns caches and compiled pipelines, so sharing work across instances that
411421
// merely reuse the same string id would execute the wrong builder's graph.
@@ -451,6 +461,13 @@ export class CollectionConfigBuilder<
451461
this.pendingGraphRuns.delete(contextId)
452462
}
453463

464+
/**
465+
* Returns true if this builder has a pending graph run for the given context.
466+
*/
467+
hasPendingGraphRun(contextId: SchedulerContextId): boolean {
468+
return this.pendingGraphRuns.has(contextId)
469+
}
470+
454471
/**
455472
* Executes a pending graph run. Called by the scheduler when dependencies are satisfied.
456473
* Clears the pending state BEFORE execution so that any re-schedules during the run

packages/db/src/scheduler.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ interface SchedulerContextState {
2525
completed: Set<unknown>
2626
}
2727

28+
interface PendingAwareJob {
29+
hasPendingGraphRun: (contextId: SchedulerContextId) => boolean
30+
}
31+
32+
function isPendingAwareJob(dep: any): dep is PendingAwareJob {
33+
return (
34+
typeof dep === `object` &&
35+
dep !== null &&
36+
typeof dep.hasPendingGraphRun === `function`
37+
)
38+
}
39+
2840
/**
2941
* Scoped scheduler that coalesces work by context and job.
3042
*
@@ -119,7 +131,19 @@ export class Scheduler {
119131
if (deps) {
120132
ready = true
121133
for (const dep of deps) {
122-
if (dep !== jobId && !completed.has(dep)) {
134+
if (dep === jobId) continue
135+
136+
const depHasPending =
137+
isPendingAwareJob(dep) && dep.hasPendingGraphRun(contextId)
138+
139+
// Treat dependencies as blocking if the dep has a pending run in this
140+
// context or if it's enqueued and not yet complete. If the dep is
141+
// neither pending nor enqueued, consider it satisfied to avoid deadlocks
142+
// on lazy sources that never schedule work.
143+
if (
144+
(jobs.has(dep) && !completed.has(dep)) ||
145+
(!jobs.has(dep) && depHasPending)
146+
) {
123147
ready = false
124148
break
125149
}

packages/db/tests/query/scheduler.test.ts

Lines changed: 236 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
import { afterEach, describe, expect, it, vi } from "vitest"
22
import { createCollection } from "../../src/collection/index.js"
3-
import { createLiveQueryCollection, eq } from "../../src/query/index.js"
3+
import { createLiveQueryCollection, eq, isNull } from "../../src/query/index.js"
44
import { createTransaction } from "../../src/transactions.js"
5+
import { createOptimisticAction } from "../../src/optimistic-action.js"
56
import { transactionScopedScheduler } from "../../src/scheduler.js"
67
import { CollectionConfigBuilder } from "../../src/query/live/collection-config-builder.js"
8+
import { mockSyncCollectionOptions } from "../utils.js"
79
import type { FullSyncState } from "../../src/query/live/types.js"
810
import type { SyncConfig } from "../../src/types.js"
911

@@ -524,4 +526,237 @@ describe(`live query scheduler`, () => {
524526

525527
maybeRunGraphSpy.mockRestore()
526528
})
529+
530+
it(`should handle optimistic mutations with nested left joins without scheduler errors`, async () => {
531+
// This test verifies that optimistic mutations on collections with nested live query
532+
// collections using left joins complete successfully without scheduler errors.
533+
//
534+
// Expected behavior:
535+
// 1. Collections are pre-populated with initialData (via mockSyncCollectionOptions)
536+
// 2. Nested live query collections use left joins
537+
// 3. An optimistic action updates an existing item using draft mutations
538+
// 4. The scheduler should flush the transaction successfully without detecting unresolved dependencies
539+
540+
interface Account {
541+
id: string
542+
user_id: string
543+
name: string
544+
}
545+
546+
interface UserProfile {
547+
id: string
548+
profile: string
549+
}
550+
551+
interface Team {
552+
id: string
553+
account_id: string
554+
deleted_ts: string | null
555+
}
556+
557+
// Use mockSyncCollectionOptions with initialData to match the failing test
558+
// Note: mockSyncCollectionOptions already sets startSync: true internally
559+
const accounts = createCollection<Account>(
560+
mockSyncCollectionOptions({
561+
id: `left-join-bug-accounts`,
562+
getKey: (account) => account.id,
563+
initialData: [
564+
{ id: `account-1`, user_id: `user-1`, name: `Account 1` },
565+
],
566+
})
567+
)
568+
569+
const users = createCollection<UserProfile>(
570+
mockSyncCollectionOptions({
571+
id: `left-join-bug-users`,
572+
getKey: (user) => user.id,
573+
initialData: [{ id: `user-1`, profile: `Profile 1` }],
574+
})
575+
)
576+
577+
const teams = createCollection<Team>(
578+
mockSyncCollectionOptions({
579+
id: `left-join-bug-teams`,
580+
getKey: (team) => team.id,
581+
initialData: [
582+
{
583+
id: `team-1`,
584+
account_id: `account-1`,
585+
deleted_ts: null as string | null,
586+
},
587+
],
588+
})
589+
)
590+
591+
// Create nested live query collections similar to the bug report
592+
const accountsWithUsers = createLiveQueryCollection({
593+
id: `left-join-bug-accounts-with-users`,
594+
startSync: true,
595+
query: (q) =>
596+
q
597+
.from({ account: accounts })
598+
.join({ user: users }, ({ user, account }) =>
599+
eq(user.id, account.user_id)
600+
)
601+
.select(({ account, user }) => ({
602+
account: account,
603+
profile: user?.profile,
604+
})),
605+
})
606+
607+
const activeTeams = createLiveQueryCollection({
608+
id: `left-join-bug-active-teams`,
609+
startSync: true,
610+
query: (q) =>
611+
q
612+
.from({ team: teams })
613+
.where(({ team }) => isNull(team.deleted_ts))
614+
.select(({ team }) => ({ team })),
615+
})
616+
617+
const accountsWithTeams = createLiveQueryCollection({
618+
id: `left-join-bug-accounts-with-teams`,
619+
startSync: true,
620+
query: (q) =>
621+
q
622+
.from({ accountWithUser: accountsWithUsers })
623+
.leftJoin({ team: activeTeams }, ({ accountWithUser, team }) =>
624+
eq(team.team.account_id, accountWithUser.account.id)
625+
)
626+
.select(({ accountWithUser, team }) => ({
627+
account: accountWithUser.account,
628+
profile: accountWithUser.profile,
629+
team: team?.team,
630+
})),
631+
})
632+
633+
// Wait for all queries to be ready
634+
await Promise.all([
635+
accountsWithUsers.preload(),
636+
activeTeams.preload(),
637+
accountsWithTeams.preload(),
638+
])
639+
640+
// Create an optimistic action that mutates using draft
641+
const testAction = createOptimisticAction<string>({
642+
onMutate: (id) => {
643+
// Update existing data using draft mutation
644+
accounts.update(id, (draft) => {
645+
draft.name = `new name here`
646+
})
647+
},
648+
mutationFn: (_id, _params) => {
649+
return Promise.resolve({ txid: 0 })
650+
},
651+
})
652+
653+
// Execute the optimistic action and flush - this should complete without scheduler errors
654+
let error: Error | undefined
655+
let transaction: any
656+
657+
try {
658+
transaction = testAction(`account-1`)
659+
660+
// Wait for the transaction to process
661+
await new Promise((resolve) => setTimeout(resolve, 10))
662+
663+
// The scheduler should flush successfully without detecting unresolved dependencies
664+
transactionScopedScheduler.flushAll()
665+
} catch (e) {
666+
error = e as Error
667+
}
668+
669+
// The scheduler should not throw unresolved dependency errors
670+
expect(error).toBeUndefined()
671+
672+
// Verify the transaction was created successfully
673+
expect(transaction).toBeDefined()
674+
})
675+
676+
it(`should prevent stale data when lazy source also depends on modified collection`, async () => {
677+
interface BaseItem {
678+
id: string
679+
value: number
680+
}
681+
682+
// Base collection
683+
const baseCollection = createCollection<BaseItem>(
684+
mockSyncCollectionOptions({
685+
id: `race-base`,
686+
getKey: (item) => item.id,
687+
initialData: [{ id: `1`, value: 10 }],
688+
})
689+
)
690+
691+
// QueryA: depends on base
692+
const queryA = createLiveQueryCollection({
693+
id: `race-queryA`,
694+
startSync: true,
695+
query: (q) =>
696+
q.from({ item: baseCollection }).select(({ item }) => ({
697+
id: item.id,
698+
value: item.value,
699+
})),
700+
})
701+
702+
// QueryB: also depends on base (independent from queryA)
703+
const queryB = createLiveQueryCollection({
704+
id: `race-queryB`,
705+
startSync: true,
706+
query: (q) =>
707+
q.from({ item: baseCollection }).select(({ item }) => ({
708+
id: item.id,
709+
value: item.value,
710+
})),
711+
})
712+
713+
// QueryC: depends on queryA, left joins queryB (lazy)
714+
const queryC = createLiveQueryCollection({
715+
id: `race-queryC`,
716+
startSync: true,
717+
query: (q) =>
718+
q
719+
.from({ a: queryA })
720+
.leftJoin({ b: queryB }, ({ a, b }) => eq(a.id, b.id))
721+
.select(({ a, b }) => ({
722+
id: a.id,
723+
aValue: a.value,
724+
bValue: b?.value ?? null,
725+
})),
726+
})
727+
728+
// Wait for initial sync
729+
await Promise.all([queryA.preload(), queryB.preload(), queryC.preload()])
730+
731+
// Verify initial state
732+
const initialC = [...queryC.values()][0]
733+
expect(initialC?.aValue).toBe(10)
734+
expect(initialC?.bValue).toBe(10)
735+
736+
// Mutate the base collection
737+
const action = createOptimisticAction<string>({
738+
autoCommit: false,
739+
onMutate: (id) => {
740+
baseCollection.update(id, (draft) => {
741+
draft.value = 100
742+
})
743+
},
744+
mutationFn: (_id) => Promise.resolve({ txid: 0 }),
745+
})
746+
747+
let error: Error | undefined
748+
try {
749+
action(`1`)
750+
await new Promise((resolve) => setTimeout(resolve, 10))
751+
transactionScopedScheduler.flushAll()
752+
} catch (e) {
753+
error = e as Error
754+
}
755+
756+
expect(error).toBeUndefined()
757+
758+
const finalC = [...queryC.values()][0]
759+
expect(finalC?.aValue).toBe(100)
760+
expect(finalC?.bValue).toBe(100)
761+
})
527762
})

0 commit comments

Comments
 (0)