Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 2 additions & 30 deletions tests/robustness/model/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,6 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Du
}
isRead := request.IsRead()
if !isRead {
// Failed writes can still be persisted, setting -1 for now as don't know when request has took effect.
op.Return = -1
// Operations of single client needs to be sequential.
// As we don't know return time of failed operations, all new writes need to be done with new stream id.
h.streamID = h.idProvider.NewStreamID()
Expand All @@ -300,7 +298,7 @@ func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Du
}

func (h *AppendableHistory) append(op porcupine.Operation) {
if op.Return != -1 && op.Call >= op.Return {
if op.Call >= op.Return {
panic(fmt.Sprintf("Invalid operation, call(%d) >= return(%d)", op.Call, op.Return))
}
if len(h.operations) > 0 {
Expand Down Expand Up @@ -488,36 +486,10 @@ func (h History) Len() int {

func (h History) Operations() []porcupine.Operation {
operations := make([]porcupine.Operation, 0, len(h.operations))
maxTime := h.lastObservedTime()
Copy link
Contributor Author

@henrybear327 henrybear327 Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@serathius Because of this removal, the unit tests would have to be updated, due to this pre-condition failing.

But if I am not mistaken, each client sends its requests sequentially, so the requests in the tests can be non-overlapping, sequentially executed requests, right?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, updating unit tests here makes sense.

for _, op := range h.operations {
// Failed requests don't have a known return time.
if op.Return == -1 {
// Simulate Infinity by using last observed time.
op.Return = maxTime + time.Second.Nanoseconds()
}
operations = append(operations, op)
}
operations = append(operations, h.operations...)
return operations
}

// lastObservedTime reports the largest timestamp found in the history.
//
// For an operation whose Return is -1 (a failed request with no known return
// time) the Call time is used; for every other operation the Return time is
// used. The result is 0 when the history is empty or no timestamp is positive.
func (h History) lastObservedTime() int64 {
	var latest int64
	for i := range h.operations {
		// Pick the one timestamp this operation contributes.
		observed := h.operations[i].Return
		if observed == -1 {
			// No return time was recorded, fall back to the call time.
			observed = h.operations[i].Call
		}
		if observed > latest {
			latest = observed
		}
	}
	return latest
}

func (h History) MaxRevision() int64 {
var maxRevision int64
for _, op := range h.operations {
Expand Down
17 changes: 17 additions & 0 deletions tests/robustness/validate/operations.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package validate
import (
"errors"
"fmt"
"math"
"time"

"github.com/anishathalye/porcupine"
Expand Down Expand Up @@ -53,6 +54,8 @@ func validateLinearizableOperationsAndVisualize(
operations []porcupine.Operation,
timeout time.Duration,
) (results Results) {
operations = patchResponseTimeOfFailedOperations(operations)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should happen before we patch operations, because then patching will not have the opportunity to modify the return time.


lg.Info("Validating linearizable operations", zap.Duration("timeout", timeout))
start := time.Now()
result, info := porcupine.CheckOperationsVerbose(model.NonDeterministicModel, operations, timeout)
Expand All @@ -75,6 +78,20 @@ func validateLinearizableOperationsAndVisualize(
}
}

// patchResponseTimeOfFailedOperations returns a new slice holding every
// operation from operations, in the same order, with the Return time of each
// failed write replaced by math.MaxInt64.
//
// A failed write can still be persisted, but we don't know when the request
// took effect, so its return time is overridden with the largest possible
// value. An operation counts as a failed write when its response carries a
// non-empty Error and its request is not a read. All other operations are
// copied through unchanged. The elements of the input slice are not modified.
//
// It panics if an operation's Input is not a model.EtcdRequest or its Output
// is not a model.MaybeEtcdResponse.
func patchResponseTimeOfFailedOperations(operations []porcupine.Operation) []porcupine.Operation {
	patchedOperations := make([]porcupine.Operation, 0, len(operations))
	for _, op := range operations {
		request := op.Input.(model.EtcdRequest)
		resp := op.Output.(model.MaybeEtcdResponse)
		if resp.Error != "" && !request.IsRead() {
			op.Return = math.MaxInt64
		}
		// Append unconditionally: successful operations and reads must stay in
		// the history, only failed writes get a patched return time.
		patchedOperations = append(patchedOperations, op)
	}
	return patchedOperations
}

func validateSerializableOperations(lg *zap.Logger, operations []porcupine.Operation, replay *model.EtcdReplay) (lastErr error) {
lg.Info("Validating serializable operations")
for _, read := range operations {
Expand Down
58 changes: 21 additions & 37 deletions tests/robustness/validate/patch_history.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func patchLinearizableOperations(reports []report.ClientReport, persistedRequest
putRevision := putRevision(reports)
persistedPutCount := countPersistedPuts(persistedRequests)
clientPutCount := countClientPuts(reports)
putReturnTime := uniquePutReturnTime(allOperations, reports, persistedRequests, clientPutCount)
putReturnTime := uniquePutReturnTime(allOperations, reports, clientPutCount)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please revert

return patchOperations(allOperations, putRevision, putReturnTime, clientPutCount, persistedPutCount)
}

Expand Down Expand Up @@ -96,7 +96,7 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
txnRevision = revision
}
if returnTime, ok := putReturnTime[kv]; ok {
op.Return = min(op.Return, returnTime)
op.Return = returnTime
}
case model.DeleteOperation:
case model.RangeOperation:
Expand All @@ -110,9 +110,9 @@ func patchOperations(operations []porcupine.Operation, watchRevision, putReturnT
continue
}
if txnRevision != 0 {
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision}
op.Output = model.MaybeEtcdResponse{Persisted: true, PersistedRevision: txnRevision, Error: resp.Error}
} else {
op.Output = model.MaybeEtcdResponse{Persisted: true}
op.Output = model.MaybeEtcdResponse{Persisted: true, Error: resp.Error}
}
}
// Leave operation as it is as we cannot discard it.
Expand Down Expand Up @@ -155,11 +155,13 @@ func hasUniqueWriteOperation(ops []model.EtcdOperation, clientRequestCount map[k
return false
}

func uniquePutReturnTime(allOperations []porcupine.Operation, reports []report.ClientReport, persistedRequests []model.EtcdRequest, clientPutCount map[keyValue]int64) map[keyValue]int64 {
func uniquePutReturnTime(allOperations []porcupine.Operation, reports []report.ClientReport, clientPutCount map[keyValue]int64) map[keyValue]int64 {
earliestReturnTime := map[keyValue]int64{}
var lastReturnTime int64
Copy link
Contributor Author

@henrybear327 henrybear327 Mar 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@serathius can you help me understand what the purpose of this variable is? :) I am not sure what case this variable is trying to cover, even after reading through the code and experimenting with it during bug fixing.

Thank you!

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was not expecting a need to change this code. Please revert.

failedRequest := map[keyValue]bool{}

for _, op := range allOperations {
request := op.Input.(model.EtcdRequest)
resp := op.Output.(model.MaybeEtcdResponse)
switch request.Type {
case model.Txn:
for _, etcdOp := range append(request.Txn.OperationsOnSuccess, request.Txn.OperationsOnFailure...) {
Expand All @@ -170,10 +172,14 @@ func uniquePutReturnTime(allOperations []porcupine.Operation, reports []report.C
if count := clientPutCount[kv]; count > 1 {
continue
}
if resp.Error != "" {
failedRequest[kv] = true
}
if returnTime, ok := earliestReturnTime[kv]; !ok || returnTime > op.Return {
// The time that the response is received
// This doesn't mean anything for the failed request, as the request might have actually succeeded (e.g. connection dropped, etc.)
earliestReturnTime[kv] = op.Return
}
earliestReturnTime[kv] = op.Return
}
case model.Range:
case model.LeaseGrant:
Expand All @@ -182,11 +188,11 @@ func uniquePutReturnTime(allOperations []porcupine.Operation, reports []report.C
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
if op.Return > lastReturnTime {
lastReturnTime = op.Return
}
}

// for failed requests, we use the time that the watch is received as the return time
// since the values returned by the watch are the values that are actually written in the database
// notice that the time that the value is sent by the watch might be earlier or later than the response received time due to issues such as network problems
for _, client := range reports {
for _, watch := range client.Watch {
for _, resp := range watch.Responses {
Expand All @@ -198,8 +204,12 @@ func uniquePutReturnTime(allOperations []porcupine.Operation, reports []report.C
if count := clientPutCount[kv]; count > 1 {
continue
}
if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() {
if _, ok := failedRequest[kv]; ok {
earliestReturnTime[kv] = resp.Time.Nanoseconds()
} else {
if t, ok := earliestReturnTime[kv]; !ok || t > resp.Time.Nanoseconds() {
earliestReturnTime[kv] = resp.Time.Nanoseconds()
}
}
case model.DeleteOperation:
default:
Expand All @@ -210,32 +220,6 @@ func uniquePutReturnTime(allOperations []porcupine.Operation, reports []report.C
}
}

for i := len(persistedRequests) - 1; i >= 0; i-- {
request := persistedRequests[i]
switch request.Type {
case model.Txn:
lastReturnTime--
for _, op := range request.Txn.OperationsOnSuccess {
if op.Type != model.PutOperation {
continue
}
kv := keyValue{Key: op.Put.Key, Value: op.Put.Value}
if count := clientPutCount[kv]; count > 1 {
continue
}
returnTime, ok := earliestReturnTime[kv]
if ok {
lastReturnTime = min(returnTime, lastReturnTime)
earliestReturnTime[kv] = lastReturnTime
}
}
case model.LeaseGrant:
case model.LeaseRevoke:
case model.Compact:
default:
panic(fmt.Sprintf("Unknown request type: %q", request.Type))
}
}
return earliestReturnTime
}

Expand Down
Loading