Skip to content

Commit 829fa9a

Browse files
committed
dynamolock: add context enabled calls
Addresses #106
1 parent 0c8bdc7 commit 829fa9a

File tree

4 files changed

+106
-38
lines changed

4 files changed

+106
-38
lines changed

client.go

Lines changed: 81 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -251,6 +251,12 @@ func WithSessionMonitor(safeTime time.Duration, callback func()) AcquireLockOpti
251251

252252
// AcquireLock holds the defined lock.
253253
func (c *Client) AcquireLock(key string, opts ...AcquireLockOption) (*Lock, error) {
254+
return c.AcquireLockWithContext(context.Background(), key, opts...)
255+
}
256+
257+
// AcquireLockWithContext holds the defined lock. The given context is passed
258+
// down to the underlying dynamoDB call.
259+
func (c *Client) AcquireLockWithContext(ctx context.Context, key string, opts ...AcquireLockOption) (*Lock, error) {
254260
if c.isClosed() {
255261
return nil, ErrClientClosed
256262
}
@@ -260,10 +266,10 @@ func (c *Client) AcquireLock(key string, opts ...AcquireLockOption) (*Lock, erro
260266
for _, opt := range opts {
261267
opt(req)
262268
}
263-
return c.acquireLock(req)
269+
return c.acquireLock(ctx, req)
264270
}
265271

266-
func (c *Client) acquireLock(opt *acquireLockOptions) (*Lock, error) {
272+
func (c *Client) acquireLock(ctx context.Context, opt *acquireLockOptions) (*Lock, error) {
267273
// Hold the read lock when acquiring locks. This prevents us from
268274
// acquiring a lock while the Client is being closed as we hold the
269275
// write lock during close.
@@ -311,7 +317,10 @@ func (c *Client) acquireLock(opt *acquireLockOptions) (*Lock, error) {
311317
}
312318

313319
for {
314-
l, err := c.storeLock(&getLockOptions)
320+
if err := ctx.Err(); err != nil {
321+
return nil, err
322+
}
323+
l, err := c.storeLock(ctx, &getLockOptions)
315324
if err != nil {
316325
return nil, err
317326
} else if l != nil {
@@ -322,10 +331,10 @@ func (c *Client) acquireLock(opt *acquireLockOptions) (*Lock, error) {
322331
}
323332
}
324333

325-
func (c *Client) storeLock(getLockOptions *getLockOptions) (*Lock, error) {
334+
func (c *Client) storeLock(ctx context.Context, getLockOptions *getLockOptions) (*Lock, error) {
326335
c.logger.Println("Call GetItem to see if the lock for ",
327336
c.partitionKeyName, " =", getLockOptions.partitionKeyName, " exists in the table")
328-
existingLock, err := c.getLockFromDynamoDB(*getLockOptions)
337+
existingLock, err := c.getLockFromDynamoDB(ctx, *getLockOptions)
329338
if err != nil {
330339
return nil, err
331340
}
@@ -369,6 +378,7 @@ func (c *Client) storeLock(getLockOptions *getLockOptions) (*Lock, error) {
369378
//if the existing lock does not exist or exists and is released
370379
if existingLock == nil || existingLock.isReleased {
371380
l, err := c.upsertAndMonitorNewOrReleasedLock(
381+
ctx,
372382
getLockOptions.additionalAttributes,
373383
getLockOptions.partitionKeyName,
374384
getLockOptions.deleteLockOnRelease,
@@ -406,6 +416,7 @@ func (c *Client) storeLock(getLockOptions *getLockOptions) (*Lock, error) {
406416
} else if getLockOptions.lockTryingToBeAcquired.recordVersionNumber == existingLock.recordVersionNumber && getLockOptions.lockTryingToBeAcquired.isExpired() {
407417
/* If the version numbers match, then we can acquire the lock, assuming it has already expired */
408418
l, err := c.upsertAndMonitorExpiredLock(
419+
ctx,
409420
getLockOptions.additionalAttributes,
410421
getLockOptions.partitionKeyName,
411422
getLockOptions.deleteLockOnRelease,
@@ -434,6 +445,7 @@ func (c *Client) storeLock(getLockOptions *getLockOptions) (*Lock, error) {
434445
}
435446

436447
func (c *Client) upsertAndMonitorExpiredLock(
448+
ctx context.Context,
437449
additionalAttributes map[string]*dynamodb.AttributeValue,
438450
key string,
439451
deleteLockOnRelease bool,
@@ -459,11 +471,12 @@ func (c *Client) upsertAndMonitorExpiredLock(
459471
c.logger.Println("Acquiring an existing lock whose revisionVersionNumber did not change for ",
460472
c.partitionKeyName, " partitionKeyName=", key)
461473
return c.putLockItemAndStartSessionMonitor(
462-
additionalAttributes, key, deleteLockOnRelease, newLockData,
474+
ctx, additionalAttributes, key, deleteLockOnRelease, newLockData,
463475
recordVersionNumber, sessionMonitor, putItemRequest)
464476
}
465477

466478
func (c *Client) upsertAndMonitorNewOrReleasedLock(
479+
ctx context.Context,
467480
additionalAttributes map[string]*dynamodb.AttributeValue,
468481
key string,
469482
deleteLockOnRelease bool,
@@ -494,12 +507,13 @@ func (c *Client) upsertAndMonitorNewOrReleasedLock(
494507
// expire sooner than it actually will, so they start counting towards
495508
// its expiration before the Put succeeds
496509
c.logger.Println("Acquiring a new lock or an existing yet released lock on ", c.partitionKeyName, "=", key)
497-
return c.putLockItemAndStartSessionMonitor(additionalAttributes, key,
510+
return c.putLockItemAndStartSessionMonitor(ctx, additionalAttributes, key,
498511
deleteLockOnRelease, newLockData,
499512
recordVersionNumber, sessionMonitor, req)
500513
}
501514

502515
func (c *Client) putLockItemAndStartSessionMonitor(
516+
ctx context.Context,
503517
additionalAttributes map[string]*dynamodb.AttributeValue,
504518
key string,
505519
deleteLockOnRelease bool,
@@ -510,7 +524,7 @@ func (c *Client) putLockItemAndStartSessionMonitor(
510524

511525
lastUpdatedTime := time.Now()
512526

513-
_, err := c.dynamoDB.PutItem(putItemRequest)
527+
_, err := c.dynamoDB.PutItemWithContext(ctx, putItemRequest)
514528
if err != nil {
515529
return nil, parseDynamoDBError(err, "cannot store lock item: lock already acquired by other client")
516530
}
@@ -533,8 +547,8 @@ func (c *Client) putLockItemAndStartSessionMonitor(
533547
return lockItem, nil
534548
}
535549

536-
func (c *Client) getLockFromDynamoDB(opt getLockOptions) (*Lock, error) {
537-
res, err := c.readFromDynamoDB(opt.partitionKeyName)
550+
func (c *Client) getLockFromDynamoDB(ctx context.Context, opt getLockOptions) (*Lock, error) {
551+
res, err := c.readFromDynamoDB(ctx, opt.partitionKeyName)
538552
if err != nil {
539553
return nil, err
540554
}
@@ -547,11 +561,11 @@ func (c *Client) getLockFromDynamoDB(opt getLockOptions) (*Lock, error) {
547561
return c.createLockItem(opt, item)
548562
}
549563

550-
func (c *Client) readFromDynamoDB(key string) (*dynamodb.GetItemOutput, error) {
564+
func (c *Client) readFromDynamoDB(ctx context.Context, key string) (*dynamodb.GetItemOutput, error) {
551565
dynamoDBKey := map[string]*dynamodb.AttributeValue{
552566
c.partitionKeyName: {S: aws.String(key)},
553567
}
554-
return c.dynamoDB.GetItem(&dynamodb.GetItemInput{
568+
return c.dynamoDB.GetItemWithContext(ctx, &dynamodb.GetItemInput{
555569
ConsistentRead: aws.Bool(true),
556570
TableName: aws.String(c.tableName),
557571
Key: dynamoDBKey,
@@ -648,6 +662,15 @@ func (c *Client) heartbeat(ctx context.Context) {
648662
// takes a few minutes for DynamoDB to provision a new instance. Also, if the
649663
// table already exists, it will return an error.
650664
func (c *Client) CreateTable(tableName string, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error) {
665+
return c.CreateTableWithContext(context.Background(), tableName, opts...)
666+
}
667+
668+
// CreateTableWithContext prepares a DynamoDB table with the right schema for it
669+
// to be used by this locking library. The table should be set up in advance,
670+
// because it takes a few minutes for DynamoDB to provision a new instance.
671+
// Also, if the table already exists, it will return an error. The given context
672+
// is passed down to the underlying dynamoDB call.
673+
func (c *Client) CreateTableWithContext(ctx context.Context, tableName string, opts ...CreateTableOption) (*dynamodb.CreateTableOutput, error) {
651674
if c.isClosed() {
652675
return nil, ErrClientClosed
653676
}
@@ -659,7 +682,7 @@ func (c *Client) CreateTable(tableName string, opts ...CreateTableOption) (*dyna
659682
for _, opt := range opts {
660683
opt(createTableOptions)
661684
}
662-
return c.createTable(createTableOptions)
685+
return c.createTable(ctx, createTableOptions)
663686
}
664687

665688
// CreateTableOption is an options type for the CreateTable method in the lock
@@ -692,7 +715,7 @@ func WithProvisionedThroughput(provisionedThroughput *dynamodb.ProvisionedThroug
692715
}
693716
}
694717

695-
func (c *Client) createTable(opt *createDynamoDBTableOptions) (*dynamodb.CreateTableOutput, error) {
718+
func (c *Client) createTable(ctx context.Context, opt *createDynamoDBTableOptions) (*dynamodb.CreateTableOutput, error) {
696719
keySchema := []*dynamodb.KeySchemaElement{
697720
{
698721
AttributeName: aws.String(opt.partitionKeyName),
@@ -722,18 +745,26 @@ func (c *Client) createTable(opt *createDynamoDBTableOptions) (*dynamodb.CreateT
722745
createTableInput.Tags = opt.tags
723746
}
724747

725-
return c.dynamoDB.CreateTable(createTableInput)
748+
return c.dynamoDB.CreateTableWithContext(ctx, createTableInput)
726749
}
727750

728751
// ReleaseLock releases the given lock if the current user still has it,
729752
// returning true if the lock was successfully released, and false if someone
730753
// else already stole the lock or a problem happened. Deletes the lock item if
731754
// it is released and deleteLockItemOnClose is set.
732755
func (c *Client) ReleaseLock(lockItem *Lock, opts ...ReleaseLockOption) (bool, error) {
756+
return c.ReleaseLockWithContext(context.Background(), lockItem, opts...)
757+
}
758+
759+
// ReleaseLockWithContext releases the given lock if the current user still has it,
760+
// returning true if the lock was successfully released, and false if someone
761+
// else already stole the lock or a problem happened. Deletes the lock item if
762+
// it is released and deleteLockItemOnClose is set.
763+
func (c *Client) ReleaseLockWithContext(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) (bool, error) {
733764
if c.isClosed() {
734765
return false, ErrClientClosed
735766
}
736-
err := c.releaseLock(lockItem, opts...)
767+
err := c.releaseLock(ctx, lockItem, opts...)
737768
return err == nil, err
738769
}
739770

@@ -771,7 +802,7 @@ func ownershipLockCondition(partitionKeyName, recordVersionNumber, ownerName str
771802
return cond
772803
}
773804

774-
func (c *Client) releaseLock(lockItem *Lock, opts ...ReleaseLockOption) error {
805+
func (c *Client) releaseLock(ctx context.Context, lockItem *Lock, opts ...ReleaseLockOption) error {
775806
options := &releaseLockOptions{
776807
lockItem: lockItem,
777808
}
@@ -801,12 +832,12 @@ func (c *Client) releaseLock(lockItem *Lock, opts ...ReleaseLockOption) error {
801832
key := c.getItemKeys(lockItem)
802833
ownershipLockCond := ownershipLockCondition(c.partitionKeyName, lockItem.recordVersionNumber, lockItem.ownerName)
803834
if deleteLock {
804-
err := c.deleteLock(ownershipLockCond, key)
835+
err := c.deleteLock(ctx, ownershipLockCond, key)
805836
if err != nil {
806837
return err
807838
}
808839
} else {
809-
err := c.updateLock(data, ownershipLockCond, key)
840+
err := c.updateLock(ctx, data, ownershipLockCond, key)
810841
if err != nil {
811842
return err
812843
}
@@ -815,7 +846,7 @@ func (c *Client) releaseLock(lockItem *Lock, opts ...ReleaseLockOption) error {
815846
return nil
816847
}
817848

818-
func (c *Client) deleteLock(ownershipLockCond expression.ConditionBuilder, key map[string]*dynamodb.AttributeValue) error {
849+
func (c *Client) deleteLock(ctx context.Context, ownershipLockCond expression.ConditionBuilder, key map[string]*dynamodb.AttributeValue) error {
819850
delExpr, _ := expression.NewBuilder().WithCondition(ownershipLockCond).Build()
820851
deleteItemRequest := &dynamodb.DeleteItemInput{
821852
TableName: aws.String(c.tableName),
@@ -824,14 +855,14 @@ func (c *Client) deleteLock(ownershipLockCond expression.ConditionBuilder, key m
824855
ExpressionAttributeNames: delExpr.Names(),
825856
ExpressionAttributeValues: delExpr.Values(),
826857
}
827-
_, err := c.dynamoDB.DeleteItem(deleteItemRequest)
858+
_, err := c.dynamoDB.DeleteItemWithContext(ctx, deleteItemRequest)
828859
if err != nil {
829860
return err
830861
}
831862
return nil
832863
}
833864

834-
func (c *Client) updateLock(data []byte, ownershipLockCond expression.ConditionBuilder, key map[string]*dynamodb.AttributeValue) error {
865+
func (c *Client) updateLock(ctx context.Context, data []byte, ownershipLockCond expression.ConditionBuilder, key map[string]*dynamodb.AttributeValue) error {
835866
update := expression.Set(isReleasedAttr, isReleasedAttrVal)
836867
if len(data) > 0 {
837868
update = update.Set(dataAttr, expression.Value(data))
@@ -847,14 +878,14 @@ func (c *Client) updateLock(data []byte, ownershipLockCond expression.ConditionB
847878
ExpressionAttributeValues: updateExpr.Values(),
848879
}
849880

850-
_, err := c.dynamoDB.UpdateItem(updateItemRequest)
881+
_, err := c.dynamoDB.UpdateItemWithContext(ctx, updateItemRequest)
851882
return err
852883
}
853884

854-
func (c *Client) releaseAllLocks() error {
885+
func (c *Client) releaseAllLocks(ctx context.Context) error {
855886
var err error
856887
c.locks.Range(func(key interface{}, value interface{}) bool {
857-
err = c.releaseLock(value.(*Lock))
888+
err = c.releaseLock(ctx, value.(*Lock))
858889
return err == nil
859890
})
860891
return err
@@ -875,22 +906,37 @@ func (c *Client) getItemKeys(lockItem *Lock) map[string]*dynamodb.AttributeValue
875906
// should check lockItem.isExpired() to figure out if it currently has the
876907
// lock.)
877908
func (c *Client) Get(key string) (*Lock, error) {
909+
return c.GetWithContext(context.Background(), key)
910+
}
911+
912+
// GetWithContext finds out who owns the given lock, but does not acquire the
913+
// lock. It returns the metadata currently associated with the given lock. If
914+
// the client currently has the lock, it will return the lock, and operations
915+
// such as releaseLock will work. However, if the client does not have the lock,
916+
// then operations like releaseLock will not work (after calling Get, the caller
917+
// should check lockItem.isExpired() to figure out if it currently has the
918+
// lock.) If the context is canceled, it is going to return the context error
919+
// on local cache hit. The given context is passed down to the underlying
920+
// dynamoDB call.
921+
func (c *Client) GetWithContext(ctx context.Context, key string) (*Lock, error) {
878922
if c.isClosed() {
879923
return nil, ErrClientClosed
880924
}
881925

926+
if err := ctx.Err(); err != nil {
927+
return nil, err
928+
}
929+
882930
getLockOption := getLockOptions{
883931
partitionKeyName: key,
884932
}
885-
886933
keyName := getLockOption.partitionKeyName
887-
888934
v, ok := c.locks.Load(keyName)
889935
if ok {
890936
return v.(*Lock), nil
891937
}
892938

893-
lockItem, err := c.getLockFromDynamoDB(getLockOption)
939+
lockItem, err := c.getLockFromDynamoDB(ctx, getLockOption)
894940
if err != nil {
895941
return nil, err
896942
}
@@ -916,13 +962,19 @@ func (c *Client) isClosed() bool {
916962

917963
// Close releases all of the locks.
918964
func (c *Client) Close() error {
965+
return c.CloseWithContext(context.Background())
966+
}
967+
968+
// CloseWithContext releases all of the locks. The given context is passed down
969+
// to the underlying dynamoDB calls.
970+
func (c *Client) CloseWithContext(ctx context.Context) error {
919971
err := ErrClientClosed
920972
c.closeOnce.Do(func() {
921973
// Hold the write lock for the duration of the close operation
922974
// to prevent new locks from being acquired.
923975
c.mu.Lock()
924976
defer c.mu.Unlock()
925-
err = c.releaseAllLocks()
977+
err = c.releaseAllLocks(context.Background())
926978
c.stopHeartbeat()
927979
c.closed = true
928980
})

client_heartbeat.go

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ limitations under the License.
1717
package dynamolock
1818

1919
import (
20+
"context"
2021
"time"
2122

2223
"github.com/aws/aws-sdk-go/aws"
@@ -49,12 +50,22 @@ func ReplaceHeartbeatData(data []byte) SendHeartbeatOption {
4950
}
5051
}
5152

52-
// SendHeartbeat indicatee that the given lock is still being worked on. If
53+
// SendHeartbeat indicates that the given lock is still being worked on. If
5354
// using WithHeartbeatPeriod > 0 when setting up this object, then this method
5455
// is unnecessary, because the background thread will be periodically calling it
5556
// and sending heartbeats. However, if WithHeartbeatPeriod = 0, then this method
5657
// must be called to instruct DynamoDB that the lock should not be expired.
5758
func (c *Client) SendHeartbeat(lockItem *Lock, opts ...SendHeartbeatOption) error {
59+
return c.SendHeartbeatWithContext(context.Background(), lockItem, opts...)
60+
}
61+
62+
// SendHeartbeatWithContext indicates that the given lock is still being worked
63+
// on. If using WithHeartbeatPeriod > 0 when setting up this object, then this
64+
// method is unnecessary, because the background thread will be periodically
65+
// calling it and sending heartbeats. However, if WithHeartbeatPeriod = 0, then
66+
// this method must be called to instruct DynamoDB that the lock should not be
67+
// expired. The given context is passed down to the underlying dynamoDB call.
68+
func (c *Client) SendHeartbeatWithContext(ctx context.Context, lockItem *Lock, opts ...SendHeartbeatOption) error {
5869
if c.isClosed() {
5970
return ErrClientClosed
6071
}
@@ -64,10 +75,10 @@ func (c *Client) SendHeartbeat(lockItem *Lock, opts ...SendHeartbeatOption) erro
6475
for _, opt := range opts {
6576
opt(sho)
6677
}
67-
return c.sendHeartbeat(sho)
78+
return c.sendHeartbeat(ctx, sho)
6879
}
6980

70-
func (c *Client) sendHeartbeat(options *sendHeartbeatOptions) error {
81+
func (c *Client) sendHeartbeat(ctx context.Context, options *sendHeartbeatOptions) error {
7182
leaseDuration := c.leaseDuration
7283

7384
lockItem := options.lockItem
@@ -109,7 +120,7 @@ func (c *Client) sendHeartbeat(options *sendHeartbeatOptions) error {
109120

110121
lastUpdateOfLock := time.Now()
111122

112-
_, err := c.dynamoDB.UpdateItem(updateItemInput)
123+
_, err := c.dynamoDB.UpdateItemWithContext(ctx, updateItemInput)
113124
if err != nil {
114125
err := parseDynamoDBError(err, "already acquired lock, stopping heartbeats")
115126
if isLockNotGrantedError(err) {

0 commit comments

Comments
 (0)