1818using MongoDB . Bson ;
1919using MongoDB . Bson . Serialization . Serializers ;
2020using MongoDB . Driver . Core . Bindings ;
21+ using MongoDB . Driver . Core . Connections ;
2122using MongoDB . Driver . Core . Events ;
2223using MongoDB . Driver . Core . Misc ;
2324using MongoDB . Driver . Core . WireProtocol . Messages . Encoders ;
2627
2728namespace MongoDB . Driver . Core . Operations
2829{
29- internal sealed class CreateCollectionOperation : IWriteOperation < BsonDocument >
30+ internal sealed class CreateCollectionOperation : IWriteOperation < BsonDocument > , IRetryableWriteOperation < BsonDocument >
3031 {
3132 #region static
3233
@@ -211,7 +212,7 @@ public BsonDocument ClusteredIndex
211212 set => _clusteredIndex = value ;
212213 }
213214
214- internal BsonDocument CreateCommand ( OperationContext operationContext , ICoreSessionHandle session )
215+ internal BsonDocument CreateCommand ( OperationContext operationContext , ICoreSessionHandle session , ConnectionDescription connectionDescription , long ? transactionNumber )
215216 {
216217 var writeConcern = WriteConcernHelper . GetEffectiveWriteConcern ( operationContext , session , _writeConcern ) ;
217218 return new BsonDocument
@@ -232,49 +233,76 @@ internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSess
232233 { "expireAfterSeconds" , ( ) => _expireAfter . Value . TotalSeconds , _expireAfter . HasValue } ,
233234 { "timeseries" , ( ) => _timeSeriesOptions . ToBsonDocument ( ) , _timeSeriesOptions != null } ,
234235 { "encryptedFields" , _encryptedFields , _encryptedFields != null } ,
235- { "changeStreamPreAndPostImages" , _changeStreamPreAndPostImages , _changeStreamPreAndPostImages != null }
236+ { "changeStreamPreAndPostImages" , _changeStreamPreAndPostImages , _changeStreamPreAndPostImages != null } ,
237+ { "txnNumber" , ( ) => transactionNumber . Value , transactionNumber . HasValue }
236238 } ;
237239 }
238240
239241 public BsonDocument Execute ( OperationContext operationContext , IWriteBinding binding )
240242 {
241- Ensure . IsNotNull ( binding , nameof ( binding ) ) ;
243+ using ( BeginOperation ( ) )
244+ {
245+ return RetryableWriteOperationExecutor . Execute ( operationContext , this , binding , retryRequested : false ) ;
246+ }
247+ }
242248
249+ public BsonDocument Execute ( OperationContext operationContext , RetryableWriteContext context )
250+ {
243251 using ( BeginOperation ( ) )
244- using ( var channelSource = binding . GetWriteChannelSource ( operationContext ) )
245- using ( var channel = channelSource . GetChannel ( operationContext ) )
246252 {
247- EnsureServerIsValid ( channel . ConnectionDescription . MaxWireVersion ) ;
248- using ( var channelBinding = new ChannelReadWriteBinding ( channelSource . Server , channel , binding . Session . Fork ( ) ) )
249- {
250- var operation = CreateOperation ( operationContext , channelBinding . Session ) ;
251- return operation . Execute ( operationContext , channelBinding ) ;
252- }
253+ return RetryableWriteOperationExecutor . Execute ( operationContext , this , context ) ;
253254 }
254255 }
255256
256- public async Task < BsonDocument > ExecuteAsync ( OperationContext operationContext , IWriteBinding binding )
257+ public Task < BsonDocument > ExecuteAsync ( OperationContext operationContext , IWriteBinding binding )
257258 {
258- Ensure . IsNotNull ( binding , nameof ( binding ) ) ;
259+ using ( BeginOperation ( ) )
260+ {
261+ return RetryableWriteOperationExecutor . ExecuteAsync ( operationContext , this , binding , retryRequested : false ) ;
262+ }
263+ }
259264
265+ public Task < BsonDocument > ExecuteAsync ( OperationContext operationContext , RetryableWriteContext context )
266+ {
260267 using ( BeginOperation ( ) )
261- using ( var channelSource = await binding . GetWriteChannelSourceAsync ( operationContext ) . ConfigureAwait ( false ) )
262- using ( var channel = await channelSource . GetChannelAsync ( operationContext ) . ConfigureAwait ( false ) )
263268 {
264- EnsureServerIsValid ( channel . ConnectionDescription . MaxWireVersion ) ;
265- using ( var channelBinding = new ChannelReadWriteBinding ( channelSource . Server , channel , binding . Session . Fork ( ) ) )
266- {
267- var operation = CreateOperation ( operationContext , channelBinding . Session ) ;
268- return await operation . ExecuteAsync ( operationContext , channelBinding ) . ConfigureAwait ( false ) ;
269- }
269+ return RetryableWriteOperationExecutor . ExecuteAsync ( operationContext , this , context ) ;
270+ }
271+ }
272+
273+ public BsonDocument ExecuteAttempt ( OperationContext operationContext , RetryableWriteContext context , int attempt , long ? transactionNumber )
274+ {
275+ var binding = context . Binding ;
276+ var channelSource = context . ChannelSource ;
277+ var channel = context . Channel ;
278+
279+ EnsureServerIsValid ( channel . ConnectionDescription . MaxWireVersion ) ;
280+ using ( var channelBinding = new ChannelReadWriteBinding ( channelSource . Server , channel , binding . Session . Fork ( ) ) )
281+ {
282+ var operation = CreateOperation ( operationContext , channelBinding . Session , channel . ConnectionDescription , transactionNumber ) ;
283+ return operation . Execute ( operationContext , channelBinding ) ;
284+ }
285+ }
286+
287+ public async Task < BsonDocument > ExecuteAttemptAsync ( OperationContext operationContext , RetryableWriteContext context , int attempt , long ? transactionNumber )
288+ {
289+ var binding = context . Binding ;
290+ var channelSource = context . ChannelSource ;
291+ var channel = context . Channel ;
292+
293+ EnsureServerIsValid ( channel . ConnectionDescription . MaxWireVersion ) ;
294+ using ( var channelBinding = new ChannelReadWriteBinding ( channelSource . Server , channel , binding . Session . Fork ( ) ) )
295+ {
296+ var operation = CreateOperation ( operationContext , channelBinding . Session , channel . ConnectionDescription , transactionNumber ) ;
297+ return await operation . ExecuteAsync ( operationContext , channelBinding ) . ConfigureAwait ( false ) ;
270298 }
271299 }
272300
273301 private IDisposable BeginOperation ( ) => EventContext . BeginOperation ( "create" ) ;
274302
275- private WriteCommandOperation < BsonDocument > CreateOperation ( OperationContext operationContext , ICoreSessionHandle session )
303+ private WriteCommandOperation < BsonDocument > CreateOperation ( OperationContext operationContext , ICoreSessionHandle session , ConnectionDescription connectionDescription , long ? transactionNumber )
276304 {
277- var command = CreateCommand ( operationContext , session ) ;
305+ var command = CreateCommand ( operationContext , session , connectionDescription , transactionNumber ) ;
278306 return new WriteCommandOperation < BsonDocument > ( _collectionNamespace . DatabaseNamespace , command , BsonDocumentSerializer . Instance , _messageEncoderSettings ) ;
279307 }
280308
0 commit comments