Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 86daf04

Browse files
committed
prevent update duplicated events
Signed-off-by: Ayman <[email protected]>
1 parent 459ac79 commit 86daf04

File tree

2 files changed

+96
-13
lines changed

2 files changed

+96
-13
lines changed

.circleci/config.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,7 +98,7 @@ workflows:
9898
- build
9999
filters:
100100
branches:
101-
only: main
101+
only: remove-duplicated-events #main
102102
tags:
103103
ignore: /.*/
104104
- approve_test:

cmd/confluence/confluence.go

Lines changed: 95 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
package main
22

33
import (
4+
"crypto/sha256"
45
"encoding/base64"
6+
"encoding/json"
57
"flag"
68
"fmt"
79
"os"
@@ -1287,7 +1289,7 @@ func (j *DSConfluence) GetModelData(ctx *shared.Ctx, docs []interface{}) (data m
12871289
cacheID := fmt.Sprintf("content-%s", confluenceContentID)
12881290
isCreated, err := j.cacheProvider.IsKeyCreated(j.endpoint, cacheID)
12891291
if err != nil {
1290-
j.log.WithFields(logrus.Fields{"operation": "GetModelDataPullRequest"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
1292+
j.log.WithFields(logrus.Fields{"operation": "GetModelData"}).Errorf("error getting cache for endpoint %s. error: %+v", j.endpoint, err)
12911293
return data, err
12921294
}
12931295
key := "updated"
@@ -1337,26 +1339,35 @@ func (j *DSConfluence) ConfluenceEnrichItems(ctx *shared.Ctx, thrN int, items []
13371339
case "created":
13381340
ev, _ := v[0].(insightsConf.ContentCreatedEvent)
13391341
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
1340-
for _, val := range v {
1341-
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
1342-
d = append(d, map[string]interface{}{
1343-
"id": id,
1344-
"data": "",
1345-
})
1342+
cacheData, err := j.cachedCreatedContent(v)
1343+
if err != nil {
1344+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("cachedCreatedContent error: %+v", err)
1345+
return
13461346
}
1347+
d = append(d, cacheData...)
13471348
case "updated":
1348-
ev, _ := v[0].(insightsConf.ContentUpdatedEvent)
1349-
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, v)
1349+
updates, cacheData, err := j.preventUpdateDuplication(v)
1350+
if err != nil {
1351+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("preventUpdateDuplication error: %+v", err)
1352+
return
1353+
}
1354+
d = append(d, cacheData...)
1355+
if len(updates) > 0 {
1356+
ev, _ := updates[0].(insightsConf.ContentUpdatedEvent)
1357+
err = j.Publisher.PushEvents(ev.Event(), insightsStr, ConfluenceDataSource, contentsStr, envStr, updates)
1358+
}
13501359
default:
13511360
err = fmt.Errorf("unknown confluence event type '%s'", k)
13521361
}
13531362
if err != nil {
13541363
break
13551364
}
13561365
}
1357-
err = j.cacheProvider.Create(j.endpoint, d)
1358-
if err != nil {
1359-
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
1366+
if len(d) > 0 {
1367+
err = j.cacheProvider.Create(j.endpoint, d)
1368+
if err != nil {
1369+
j.log.WithFields(logrus.Fields{"operation": "ConfluenceEnrichItems"}).Errorf("error creating cache for endpoint %s. Error: %+v", j.endpoint, err)
1370+
}
13601371
}
13611372
} else {
13621373
jsonBytes, err = jsoniter.Marshal(data)
@@ -1529,3 +1540,75 @@ func (j *DSConfluence) AddCacheProvider() {
15291540
j.cacheProvider = *cacheProvider
15301541
j.endpoint = strings.ReplaceAll(strings.TrimPrefix(strings.TrimPrefix(j.URL, "https://"), "http://"), "/", "-")
15311542
}
1543+
1544+
func (j *DSConfluence) cachedCreatedContent(v []interface{}) ([]map[string]interface{}, error) {
1545+
cacheData := make([]map[string]interface{}, 0)
1546+
for _, val := range v {
1547+
content := val.(insightsConf.ContentCreatedEvent).Payload
1548+
id := fmt.Sprintf("%s-%s", "content", val.(insightsConf.ContentCreatedEvent).Payload.ID)
1549+
c := insightsConf.Content{
1550+
ID: content.ID,
1551+
EndpointID: content.EndpointID,
1552+
Space: content.Space,
1553+
ServerURL: content.ServerURL,
1554+
ContentID: content.ContentID,
1555+
ContentURL: content.ContentURL,
1556+
Type: content.Type,
1557+
Title: content.Title,
1558+
Body: content.Body,
1559+
Contributors: content.Contributors,
1560+
Children: content.Children,
1561+
}
1562+
b, err := json.Marshal(c)
1563+
if err != nil {
1564+
return cacheData, err
1565+
}
1566+
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1567+
cacheData = append(cacheData, map[string]interface{}{
1568+
"id": id,
1569+
"data": contentHash,
1570+
})
1571+
}
1572+
return cacheData, nil
1573+
}
1574+
1575+
func (j *DSConfluence) preventUpdateDuplication(v []interface{}) ([]interface{}, []map[string]interface{}, error) {
1576+
updatedVals := make([]interface{}, 0, len(v))
1577+
cacheData := make([]map[string]interface{}, 0)
1578+
for _, val := range v {
1579+
content := val.(insightsConf.ContentUpdatedEvent).Payload
1580+
c := insightsConf.Content{
1581+
ID: content.ID,
1582+
EndpointID: content.EndpointID,
1583+
Space: content.Space,
1584+
ServerURL: content.ServerURL,
1585+
ContentID: content.ContentID,
1586+
ContentURL: content.ContentURL,
1587+
Type: content.Type,
1588+
Title: content.Title,
1589+
Body: content.Body,
1590+
Contributors: content.Contributors,
1591+
Children: content.Children,
1592+
}
1593+
b, err := json.Marshal(c)
1594+
if err != nil {
1595+
return updatedVals, cacheData, nil
1596+
}
1597+
contentHash := fmt.Sprintf("%x", sha256.Sum256(b))
1598+
cacheID := fmt.Sprintf("content-%s", content.ID)
1599+
byt, err := j.cacheProvider.GetFileByKey(j.endpoint, cacheID)
1600+
if err != nil {
1601+
return updatedVals, cacheData, nil
1602+
}
1603+
cachedHash := ""
1604+
err = json.Unmarshal(byt, &cachedHash)
1605+
if contentHash != cachedHash {
1606+
updatedVals = append(updatedVals, val)
1607+
cacheData = append(cacheData, map[string]interface{}{
1608+
"id": cacheID,
1609+
"data": contentHash,
1610+
})
1611+
}
1612+
}
1613+
return updatedVals, cacheData, nil
1614+
}

0 commit comments

Comments
 (0)