Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 66e617e

Browse files
authored
Merge pull request #17 from LF-Engineering/remove-duplicated-events
Prevent duplicated update events
2 parents 459ac79 + 4c8ffd1 commit 66e617e

File tree

1 file changed

+102
-12
lines changed

1 file changed

+102
-12
lines changed

cmd/confluence/confluence.go

Lines changed: 102 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package main
22

33
import (
4+
"crypto/sha256"
45
"encoding/base64"
6+
"encoding/json"
57
"flag"
68
"fmt"
79
"os"
@@ -47,6 +49,7 @@ const (
4749
ConfluenceAddHistoryCreatedByRole = false
4850
// ConfluenceAddHistoryLastUpdatedByRole - should we add contributor for history->lastUpdatedBy page version edit?
4951
ConfluenceAddHistoryLastUpdatedByRole = false
52+
contentHashField = "contentHash"
5053
)
5154

5255
var (
@@ -1287,7 +1290,7 @@ func (j *DSConfluence) GetModelData(ctx *shared.Ctx, docs []interface{}) (data m
12871290
cacheID := fmt.Sprintf("content-%s", confluenceContentID)
12881291
isCreated, err := j.cacheProvider.IsKeyCreated(j.endpoint, cacheID)
12891292
if err != nil {
1290-
j.log.WithFields(logrus.Fields{"operation": "GetModelDataPullRequest"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
1293+
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
12911294
return data, err
12921295
}
12931296
key := "updated"
@@ -1337,26 +1340,37 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
13371340
case "created":
13381341
ev, _ := v[0].(insightsConf.ContentCreatedEvent)
13391342
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
1340-
for _, val := range v {
1341-
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
1342-
d = append(d, map[string]interface{}{
1343-
"id": id,
1344-
"data": "",
1345-
})
1343+
cacheData, err := j.cachedCreatedContent(v)
1344+
if err != nil {
1345+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("cachedCreatedContent error: %+v", err)
1346+
return
13461347
}
1348+
d = append(d, cacheData...)
13471349
case "updated":
1348-
ev, _ := v[0].(insightsConf.ContentUpdatedEvent)
1349-
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
1350+
updates, cacheData, err := j.preventUpdateDuplication(v)
1351+
if err != nil {
1352+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("preventUpdateDuplication error: %+v", err)
1353+
return
1354+
}
1355+
if len(cacheData) > 0 {
1356+
d = append(d, cacheData...)
1357+
}
1358+
if len(updates) > 0 {
1359+
ev, _ := updates[0].(insightsConf.ContentUpdatedEvent)
1360+
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
1361+
}
13501362
default:
13511363
err = fmt.Errorf("unknown confluence event type '%s'", k)
13521364
}
13531365
if err != nil {
13541366
break
13551367
}
13561368
}
1357-
err = j.cacheProvider.Create(j.endpoint, d)
1358-
if err != nil {
1359-
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
1369+
if len(d) > 0 {
1370+
err = j.cacheProvider.Create(j.endpoint, d)
1371+
if err != nil {
1372+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
1373+
}
13601374
}
13611375
} else {
13621376
jsonBytes, err = jsoniter.Marshal(data)
@@ -1529,3 +1543,79 @@ func (j *DSConfluence) AddCacheProvider() {
15291543
j.cacheProvider = *cacheProvider
15301544
j.endpoint = strings.ReplaceAll(strings.TrimPrefix(strings.TrimPrefix(j.URL, "https://"), "http://"), "/", "-")
15311545
}
1546+
1547+
func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]interface{}, error) {
1548+
cacheData := make([]map[string]interface{}, 0)
1549+
for _, val := range v {
1550+
content := val.(insightsConf.ContentCreatedEvent).Payload
1551+
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
1552+
c := insightsConf.Content{
1553+
ID: content.ID,
1554+
EndpointID: content.EndpointID,
1555+
Space: content.Space,
1556+
ServerURL: content.ServerURL,
1557+
ContentID: content.ContentID,
1558+
ContentURL: content.ContentURL,
1559+
Type: content.Type,
1560+
Title: content.Title,
1561+
Body: content.Body,
1562+
Contributors: content.Contributors,
1563+
Children: content.Children,
1564+
}
1565+
b, err := json.Marshal(c)
1566+
if err != nil {
1567+
return cacheData, err
1568+
}
1569+
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1570+
cacheData = append(cacheData, map[string]interface{}{
1571+
"id": id,
1572+
"data": map[string]interface{}{
1573+
contentHashField: contentHash,
1574+
},
1575+
})
1576+
}
1577+
return cacheData, nil
1578+
}
1579+
1580+
func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, []map[string]interface{}, error) {
1581+
updatedVals := make([]interface{}, 0, len(v))
1582+
cacheData := make([]map[string]interface{}, 0)
1583+
for _, val := range v {
1584+
content := val.(insightsConf.ContentUpdatedEvent).Payload
1585+
c := insightsConf.Content{
1586+
ID: content.ID,
1587+
EndpointID: content.EndpointID,
1588+
Space: content.Space,
1589+
ServerURL: content.ServerURL,
1590+
ContentID: content.ContentID,
1591+
ContentURL: content.ContentURL,
1592+
Type: content.Type,
1593+
Title: content.Title,
1594+
Body: content.Body,
1595+
Contributors: content.Contributors,
1596+
Children: content.Children,
1597+
}
1598+
b, err := json.Marshal(c)
1599+
if err != nil {
1600+
return updatedVals, cacheData, nil
1601+
}
1602+
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1603+
cacheID := fmt.Sprintf("content-%s", content.ID)
1604+
byt, err := j.cacheProvider.GetFileByKey(j.endpoint, cacheID)
1605+
if err != nil {
1606+
return updatedVals, cacheData, nil
1607+
}
1608+
cachedHash := make(map[string]interface{})
1609+
err = json.Unmarshal(byt, &cachedHash)
1610+
if contentHash != cachedHash["contentHash"] {
1611+
updatedVals = append(updatedVals, val)
1612+
cacheData = append(cacheData, map[string]interface{}{
1613+
"id": cacheID,
1614+
"data": map[string]interface{}{
1615+
contentHashField: contentHash,
1616+
},
1617+
})
1618+
}
1619+
}
1620+
return updatedVals, cacheData, nil
1621+
}

0 commit comments

Comments
 (0)