Skip to content

Commit a7253bd

Browse files
author
Stephane Royer
committed
Refactor DbContext handling and update version to 2.3.21-beta.0
1 parent 0686bd5 commit a7253bd

File tree

11 files changed

+113
-76
lines changed

11 files changed

+113
-76
lines changed

src/Paillave.EntityFrameworkCoreExtension/EfSave/DbContextSaveExtensions.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@ namespace Paillave.EntityFrameworkCoreExtension.EfSave;
1010
public static class DbContextSaveExtensions
1111
{
1212
public static Task EfSaveAsync<T>(this DbContext context, IList<T> entities, Expression<Func<T, object>> pivotKey, bool doNotUpdateIfExists = false, bool insertOnly = false) where T : class
13-
=> EfSaveAsync(context, entities, new Expression<Func<T, object>>[] { pivotKey }, CancellationToken.None, doNotUpdateIfExists, insertOnly);
13+
=> EfSaveAsync(context, entities, [pivotKey], CancellationToken.None, doNotUpdateIfExists, insertOnly);
1414
public static Task EfSaveAsync<T>(this DbContext context, IList<T> entities, Expression<Func<T, object>> pivotKey, CancellationToken cancellationToken, bool doNotUpdateIfExists = false, bool insertOnly = false) where T : class
15-
=> EfSaveAsync(context, entities, new Expression<Func<T, object>>[] { pivotKey }, cancellationToken, doNotUpdateIfExists, insertOnly);
15+
=> EfSaveAsync(context, entities, [pivotKey], cancellationToken, doNotUpdateIfExists, insertOnly);
1616
public static Task EfSaveAsync<T>(this DbContext context, IList<T> entities, Expression<Func<T, object>>[] pivotKeys, bool doNotUpdateIfExists = false, bool insertOnly = false) where T : class
1717
=> EfSaveAsync<T>(context, entities, pivotKeys, CancellationToken.None, doNotUpdateIfExists, insertOnly);
1818
public static async Task EfSaveAsync<T>(this DbContext context, IList<T> entities, Expression<Func<T, object>>[] pivotKeys, CancellationToken? cancellationToken = null, bool doNotUpdateIfExists = false, bool insertOnly = false) where T : class
Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
using System.Linq;
22
using Microsoft.EntityFrameworkCore;
33

4-
namespace Paillave.Etl.EntityFrameworkCore
4+
namespace Paillave.Etl.EntityFrameworkCore;
5+
6+
public class DbContextWrapper(DbContext dbContext)
57
{
6-
public class DbContextWrapper
7-
{
8-
private readonly DbContext _dbContext;
9-
public DbContextWrapper(DbContext dbContext) => _dbContext = dbContext;
10-
public IQueryable<T> Set<T>() where T : class => _dbContext.Set<T>().AsNoTracking();
11-
}
12-
}
8+
public IQueryable<T> Set<T>() where T : class => dbContext.Set<T>().AsNoTracking();
9+
}

src/Paillave.Etl.EntityFrameworkCore/DeleteEntityFrameworkCoreStreamNode.cs

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ public class DeleteEntityFrameworkCoreArgs<TIn, TValue, TEntity>
8686
public IStream<TIn> InputStream { get; set; }
8787
public Expression<Func<TValue, TEntity, bool>> Match { get; set; }
8888
public Func<TIn, TValue> GetValue { get; set; }
89-
internal string KeyedConnection { get; set; } = null;
89+
internal string? KeyedConnection { get; set; } = null;
9090
}
9191
public class DeleteEntityFrameworkCoreStreamNode<TIn, TValue, TEntity> : StreamNodeBase<TIn, IStream<TIn>, DeleteEntityFrameworkCoreArgs<TIn, TValue, TEntity>>
9292
where TEntity : class
@@ -104,9 +104,8 @@ protected override IStream<TIn> CreateOutputStream(DeleteEntityFrameworkCoreArgs
104104
var matchingS = args.InputStream.Observable
105105
.Map(i =>
106106
{
107-
var ctx = args.KeyedConnection == null
108-
? this.ExecutionContext.DependencyResolver.Resolve<DbContext>()
109-
: this.ExecutionContext.DependencyResolver.Resolve<DbContext>(args.KeyedConnection);
107+
var ctx = this.ExecutionContext.DependencyResolver.ResolveDbContext<DbContext>(args.KeyedConnection)
108+
?? throw new InvalidOperationException($"No DbContext could be resolved for type '{typeof(DbContext).FullName}'. Please check your dependency injection configuration.");
110109
TValue val = args.GetValue(i);
111110
this.ExecutionContext.InvokeInDedicatedThreadAsync(ctx, async () =>
112111
{
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
using System;
2+
using Microsoft.EntityFrameworkCore;
3+
using Paillave.Etl.Core;
4+
5+
namespace Paillave.Etl.EntityFrameworkCore;
6+
7+
public static class DependencyResolverExtension
8+
{
9+
public static DisposeWrapper<DbContext>? ResolveDbContextWrapper<TDbContext>(this IDependencyResolver dependencyResolver, string? keyedConnection = null) where TDbContext : DbContext
10+
=> dependencyResolver.ResolveDbContextWrapper(typeof(TDbContext), keyedConnection);
11+
public static DisposeWrapper<DbContext>? ResolveDbContextWrapper(this IDependencyResolver dependencyResolver, Type? dbContextType, string? keyedConnection = null)
12+
{
13+
dbContextType ??= typeof(DbContext);
14+
var dbContextFactoryType = typeof(IDbContextFactory<>).MakeGenericType(dbContextType);
15+
if (keyedConnection == null)
16+
{
17+
if (dependencyResolver.TryResolve(dbContextFactoryType, out var dbContextFactory))
18+
{
19+
var contextFactory = (DbContext)dependencyResolver.Resolve(dbContextType);
20+
var dbContext = dbContextFactoryType.GetMethod(nameof(IDbContextFactory<DbContext>.CreateDbContext))?.Invoke(contextFactory, null)!;
21+
return new DisposeWrapper<DbContext>((DbContext)dbContext, true);
22+
}
23+
else if (dependencyResolver.TryResolve(dbContextType, out var dbContext))
24+
{
25+
return new DisposeWrapper<DbContext>((DbContext)dbContext, false);
26+
}
27+
return null;
28+
}
29+
else
30+
{
31+
if (dependencyResolver.TryResolve(dbContextFactoryType, keyedConnection, out var dbContextFactory))
32+
{
33+
var contextFactory = (DbContext)dependencyResolver.Resolve(dbContextType);
34+
var dbContext = dbContextFactoryType.GetMethod(nameof(IDbContextFactory<DbContext>.CreateDbContext))?.Invoke(contextFactory, null)!;
35+
return new DisposeWrapper<DbContext>((DbContext)dbContext, true);
36+
}
37+
else if (dependencyResolver.TryResolve(dbContextType, keyedConnection, out var dbContext))
38+
{
39+
return new DisposeWrapper<DbContext>((DbContext)dbContext, false);
40+
}
41+
return null;
42+
}
43+
}
44+
public static DbContext? ResolveDbContext<TDbContext>(this IDependencyResolver dependencyResolver, string? keyedConnection = null) where TDbContext : DbContext
45+
=> dependencyResolver.ResolveDbContext(typeof(TDbContext), keyedConnection);
46+
public static DbContext? ResolveDbContext(this IDependencyResolver dependencyResolver, Type? dbContextType, string? keyedConnection = null)
47+
{
48+
dbContextType ??= typeof(DbContext);
49+
var dbContextFactoryType = typeof(IDbContextFactory<>).MakeGenericType(dbContextType);
50+
if (keyedConnection == null)
51+
{
52+
if (dependencyResolver.TryResolve(dbContextFactoryType, out var dbContextFactory))
53+
{
54+
var contextFactory = (DbContext)dependencyResolver.Resolve(dbContextType);
55+
var dbContext = dbContextFactoryType.GetMethod(nameof(IDbContextFactory<DbContext>.CreateDbContext))?.Invoke(contextFactory, null)!;
56+
return (DbContext)dbContext;
57+
}
58+
else if (dependencyResolver.TryResolve(dbContextType, out var dbContext))
59+
{
60+
return (DbContext)dbContext;
61+
}
62+
return null;
63+
}
64+
else
65+
{
66+
if (dependencyResolver.TryResolve(dbContextFactoryType, keyedConnection, out var dbContextFactory))
67+
{
68+
var contextFactory = (DbContext)dependencyResolver.Resolve(dbContextType);
69+
var dbContext = dbContextFactoryType.GetMethod(nameof(IDbContextFactory<DbContext>.CreateDbContext))?.Invoke(contextFactory, null)!;
70+
return (DbContext)dbContext;
71+
}
72+
else if (dependencyResolver.TryResolve(dbContextType, keyedConnection, out var dbContext))
73+
{
74+
return (DbContext)dbContext;
75+
}
76+
return null;
77+
}
78+
}
79+
}

src/Paillave.Etl.EntityFrameworkCore/EfCoreLookupStreamNode.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,9 +200,8 @@ protected override IStream<TOut> CreateOutputStream(EfCoreLookupArgs<TIn, TValue
200200
{
201201
var matcher = this.ExecutionContext.ContextBag.Resolve(this.NodeName, () =>
202202
{
203-
var ctx = args.KeyedConnection == null
204-
? this.ExecutionContext.DependencyResolver.Resolve<DbContext>()
205-
: this.ExecutionContext.DependencyResolver.Resolve<DbContext>(args.KeyedConnection);
203+
var ctx = this.ExecutionContext.DependencyResolver.ResolveDbContext<DbContext>(args.KeyedConnection)
204+
?? throw new InvalidOperationException($"No DbContext could be resolved for type '{typeof(DbContext).FullName}'. Please check your dependency injection configuration.");
206205
return this.ExecutionContext.InvokeInDedicatedThreadAsync(ctx, () => new EfMatcher<TValue, TEntity, TKey>(new EfMatcherConfig<TValue, TEntity, TKey>
207206
{
208207
Context = ctx,

src/Paillave.Etl.EntityFrameworkCore/EfCoreSaveStreamNode.cs

Lines changed: 6 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ private TArgs UpdateArgs<TArgs>(TArgs args) where TArgs : IThroughEntityFramewor
135135
return args;
136136
}
137137
public EfCoreSaveCorrelatedArgsBuilder<TNewInEf, TIn, TNewInEf> Entity<TNewInEf>(Func<TIn, TNewInEf> getEntity) where TNewInEf : class
138-
=> new EfCoreSaveCorrelatedArgsBuilder<TNewInEf, TIn, TNewInEf>(UpdateArgs(new EfCoreSaveArgs<TNewInEf, Correlated<TIn>, Correlated<TNewInEf>>
138+
=> new(UpdateArgs(new EfCoreSaveArgs<TNewInEf, Correlated<TIn>, Correlated<TNewInEf>>
139139
{
140140
GetEntity = i => getEntity(i.Row),
141141
GetOutput = (i, j) => new Correlated<TNewInEf> { Row = j, CorrelationKeys = i.CorrelationKeys }
@@ -227,17 +227,13 @@ public enum SaveMode
227227
EntityFrameworkCore,
228228
SqlServerBulk,
229229
}
230-
public class EfCoreSaveStreamNode<TInEf, TIn, TOut> : StreamNodeBase<TOut, IStream<TOut>, EfCoreSaveArgs<TInEf, TIn, TOut>>
230+
public class EfCoreSaveStreamNode<TInEf, TIn, TOut>(string name, EfCoreSaveArgs<TInEf, TIn, TOut> args) : StreamNodeBase<TOut, IStream<TOut>, EfCoreSaveArgs<TInEf, TIn, TOut>>(name, args)
231231
where TInEf : class
232232
{
233233
public override ProcessImpact PerformanceImpact => ProcessImpact.Heavy;
234234

235235
public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
236236

237-
public EfCoreSaveStreamNode(string name, EfCoreSaveArgs<TInEf, TIn, TOut> args) : base(name, args)
238-
{
239-
}
240-
241237
protected override IStream<TOut> CreateOutputStream(EfCoreSaveArgs<TInEf, TIn, TOut> args)
242238
{
243239
var ret = args.SourceStream.Observable
@@ -250,53 +246,20 @@ protected override IStream<TOut> CreateOutputStream(EfCoreSaveArgs<TInEf, TIn, T
250246
}
251247
private void ProcessChunk(List<(TIn Input, TInEf Entity)> i)
252248
{
253-
using var dbContextWrapper = this.ResolveDbContext();
249+
using var dbContextWrapper = this.ExecutionContext.DependencyResolver.ResolveDbContextWrapper(this.Args.DbContextType, this.Args.KeyedConnection);
254250
this.ExecutionContext.InvokeInDedicatedThreadAsync(dbContextWrapper.Object, async () => await ProcessBatchAsync(i, dbContextWrapper.Object, this.Args.BulkLoadMode)).Wait();
255251
}
256-
private DisposeWrapper<DbContext> ResolveDbContext()
257-
{
258-
var dbContextType = this.Args.DbContextType ?? typeof(DbContext);
259-
var dbContextFactoryType = typeof(IDbContextFactory<>).MakeGenericType(dbContextType);
260-
// if (IDbContextFactory<ApplicationDbContext>)
261-
if (this.Args.KeyedConnection == null)
262-
{
263-
if (this.ExecutionContext.DependencyResolver.TryResolve(dbContextType, out var dbContext))
264-
{
265-
return new DisposeWrapper<DbContext>((DbContext)dbContext, false);
266-
}
267-
else if (this.ExecutionContext.DependencyResolver.TryResolve(dbContextFactoryType, out var dbContextFactory))
268-
{
269-
var contextFactory = (DbContext)this.ExecutionContext.DependencyResolver.Resolve(dbContextType);
270-
dbContext = dbContextFactoryType.GetMethod(nameof(IDbContextFactory<DbContext>.CreateDbContext))?.Invoke(contextFactory, null);
271-
return new DisposeWrapper<DbContext>((DbContext)dbContext, true);
272-
}
273-
return null;
274-
}
275-
else
276-
{
277-
if (this.ExecutionContext.DependencyResolver.TryResolve(dbContextType, this.Args.KeyedConnection, out var dbContext))
278-
{
279-
return new DisposeWrapper<DbContext>((DbContext)dbContext, false);
280-
}
281-
else if (this.ExecutionContext.DependencyResolver.TryResolve(dbContextFactoryType, this.Args.KeyedConnection, out var dbContextFactory))
282-
{
283-
var contextFactory = (DbContext)this.ExecutionContext.DependencyResolver.Resolve(dbContextType);
284-
dbContext = dbContextFactoryType.GetMethod(nameof(IDbContextFactory<DbContext>.CreateDbContext))?.Invoke(contextFactory, null);
285-
return new DisposeWrapper<DbContext>((DbContext)dbContext, true);
286-
}
287-
return null;
288-
}
289-
}
252+
290253
public async Task ProcessBatchAsync(List<(TIn Input, TInEf Entity)> items, DbContext dbContext, SaveMode bulkLoadMode)
291254
{
292255
var entities = items.Select(i => i.Item2).ToArray();
293256
if (Args.PivotCriteria != null)
294257
{
295-
dbContext.EfSaveAsync(entities, Args.PivotCriteria, Args.SourceStream.Observable.CancellationToken, Args.DoNotUpdateIfExists, Args.InsertOnly).Wait();
258+
dbContext.EfSaveAsync(entities, Args.PivotCriteria, Args.SourceStream.Observable.CancellationToken, Args.DoNotUpdateIfExists, Args.InsertOnly).Wait();
296259
}
297260
else
298261
{
299-
var pivotKeys = Args.PivotKeys == null ? (Expression<Func<TInEf, object>>[])null : Args.PivotKeys.ToArray();
262+
var pivotKeys = Args.PivotKeys == null ? null : Args.PivotKeys.ToArray();
300263
if (bulkLoadMode == SaveMode.EntityFrameworkCore)
301264
{
302265
dbContext.EfSaveAsync(entities, pivotKeys, Args.SourceStream.Observable.CancellationToken, Args.DoNotUpdateIfExists, Args.InsertOnly).Wait();

src/Paillave.Etl.EntityFrameworkCore/EfCoreValuesProvider.cs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -32,9 +32,9 @@ public class EfCoreValuesProvider<TIn, TOut> : ValuesProviderBase<TIn, TOut>
3232
public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
3333
public override void PushValues(TIn input, Action<TOut> push, CancellationToken cancellationToken, IExecutionContext context)
3434
{
35-
var dbContext = _args.ConnectionKey == null
36-
? context.DependencyResolver.Resolve<DbContext>()
37-
: context.DependencyResolver.Resolve<DbContext>(_args.ConnectionKey);
35+
var dbContext = context.DependencyResolver.ResolveDbContext<DbContext>(_args.ConnectionKey)
36+
?? throw new InvalidOperationException($"No DbContext could be resolved for type '{typeof(DbContext).FullName}'. Please check your dependency injection configuration.");
37+
3838
if (_args.StreamMode)
3939
{
4040
context.InvokeInDedicatedThreadAsync(dbContext, () =>
@@ -63,9 +63,8 @@ public class EfCoreSingleValueProvider<TIn, TOut> : ValuesProviderBase<TIn, TOut
6363
public override ProcessImpact MemoryFootPrint => ProcessImpact.Light;
6464
public override void PushValues(TIn input, Action<TOut> push, CancellationToken cancellationToken, IExecutionContext context)
6565
{
66-
var dbContext = _args.ConnectionKey == null
67-
? context.DependencyResolver.Resolve<DbContext>()
68-
: context.DependencyResolver.Resolve<DbContext>(_args.ConnectionKey);
66+
var dbContext = context.DependencyResolver.ResolveDbContext<DbContext>(_args.ConnectionKey)
67+
?? throw new InvalidOperationException($"No DbContext could be resolved for type '{typeof(DbContext).FullName}'. Please check your dependency injection configuration.");
6968
var res = context.InvokeInDedicatedThreadAsync(dbContext, async () => await _args.GetQuery(new DbContextWrapper(dbContext), input).FirstOrDefaultAsync()).Result;
7069
push(res);
7170
}
@@ -87,11 +86,9 @@ protected override ISingleStream<TOut> CreateOutputStream(EfCoreSingleValueProvi
8786
{
8887
var obs = args.Stream.Observable.Map(input =>
8988
{
90-
var resolver = args.Stream.SourceNode.ExecutionContext.DependencyResolver;
89+
var dbContext = args.Stream.SourceNode.ExecutionContext.DependencyResolver.ResolveDbContext<DbContext>(args.ConnectionKey)
90+
?? throw new InvalidOperationException($"No DbContext could be resolved for type '{typeof(DbContext).FullName}'. Please check your dependency injection configuration.");
9191
var invoker = args.Stream.SourceNode.ExecutionContext;
92-
var dbContext = args.ConnectionKey == null
93-
? resolver.Resolve<DbContext>()
94-
: resolver.Resolve<DbContext>(args.ConnectionKey);
9592
var res = invoker.InvokeInDedicatedThreadAsync(dbContext, async () => await args.GetQuery(new DbContextWrapper(dbContext), input).FirstOrDefaultAsync()).Result;
9693
return res;
9794
});

src/Paillave.Etl.EntityFrameworkCore/EntityFrameworkCoreUpdate.Stream.ex.cs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ namespace Paillave.Etl.EntityFrameworkCore
66
{
77
public static class EntityFrameworkCoreUpdateEx
88
{
9-
public static IStream<TIn> EfCoreUpdate<TIn, TEntity>(this IStream<TIn> stream, string name, Expression<Func<TIn, TEntity>> updateKey, Expression<Func<TIn, TEntity>> updateValues, UpdateMode updateMode = UpdateMode.SqlServerBulk, int chunkSize = 10000)
9+
public static IStream<TIn> EfCoreUpdate<TIn, TEntity>(this IStream<TIn> stream, string name, Expression<Func<TIn, TEntity>> updateKey, Expression<Func<TIn, TEntity>> updateValues, UpdateMode updateMode = UpdateMode.SqlServerBulk, int chunkSize = 10000, string? connectionKey = null)
1010
where TEntity : class
1111
{
1212
return new UpdateEntityFrameworkCoreStreamNode<TEntity, TIn>(name, new UpdateEntityFrameworkCoreArgs<TEntity, TIn>
@@ -15,8 +15,9 @@ public static IStream<TIn> EfCoreUpdate<TIn, TEntity>(this IStream<TIn> stream,
1515
BatchSize = chunkSize,
1616
BulkLoadMode = updateMode,
1717
UpdateKey = updateKey,
18-
UpdateValues = updateValues
18+
UpdateValues = updateValues,
19+
ConnectionKey = connectionKey
1920
}).Output;
2021
}
21-
}
22+
}
2223
}

src/Paillave.Etl.EntityFrameworkCore/UpdateEntityFrameworkCoreStreamNode.cs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ public class UpdateEntityFrameworkCoreArgs<TEntity, TSource>
1313
where TEntity : class
1414
{
1515
public IStream<TSource> SourceStream { get; set; }
16+
public string? ConnectionKey { get; set; }
1617
public int BatchSize { get; set; } = 10000;
1718
public UpdateMode BulkLoadMode { get; set; } = UpdateMode.SqlServerBulk;
1819
public Expression<Func<TSource, TEntity>> UpdateKey { get; set; }
@@ -40,7 +41,9 @@ protected override IStream<TSource> CreateOutputStream(UpdateEntityFrameworkCore
4041
.Chunk(args.BatchSize)
4142
.Do(i =>
4243
{
43-
var dbContext = this.ExecutionContext.DependencyResolver.Resolve<DbContext>();
44+
var dbContext = this.ExecutionContext.DependencyResolver.ResolveDbContext<DbContext>(args.ConnectionKey)
45+
?? throw new InvalidOperationException($"No DbContext could be resolved for type '{typeof(DbContext).FullName}'. Please check your dependency injection configuration.");
46+
4447
this.ExecutionContext.InvokeInDedicatedThreadAsync(dbContext, () => ProcessBatch(i.ToList(), dbContext, args.BulkLoadMode)).Wait();
4548
})
4649
.FlatMap((i, ct) => PushObservable.FromEnumerable(i, ct));

src/Paillave.Etl/Core/FileReference.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
using System.Collections.Generic;
22
using System.IO;
3-
using System.Threading;
43

54
namespace Paillave.Etl.Core;
65

0 commit comments

Comments
 (0)