Skip to content
This repository was archived by the owner on May 24, 2024. It is now read-only.

Commit 3399f44

Browse files
Handle missing merged events (#70)
* handle missing merged events Signed-off-by: Ayman <[email protected]> * clean up Signed-off-by: Ayman <[email protected]> * use schema latest version Signed-off-by: Ayman <[email protected]> --------- Signed-off-by: Ayman <[email protected]> Co-authored-by: Ayman <[email protected]>
1 parent d9aeaae commit 3399f44

File tree

3 files changed

+34
-47
lines changed

3 files changed

+34
-47
lines changed

cmd/github/github.go

Lines changed: 31 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ var (
181181

182182
// Publisher - for streaming data to Kinesis
183183
type Publisher interface {
184-
PushEvents(action, source, eventType, subEventType, env string, data []interface{}) (string, error)
184+
PushEvents(action, source, eventType, subEventType, env string, data []interface{}, endpoint string) (string, error)
185185
}
186186

187187
// DSGitHub - DS implementation for GitHub
@@ -5773,6 +5773,7 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
57735773
jsonBytes []byte
57745774
err error
57755775
)
5776+
endpoint := fmt.Sprintf("%s-%s", j.Org, j.Repo)
57765777
switch j.CurrentCategory {
57775778
case "repository":
57785779
repos, err = j.GetModelDataRepository(ctx, *docs)
@@ -5783,7 +5784,7 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
57835784
formattedData = append(formattedData, d)
57845785
}
57855786
if len(repos) > 0 {
5786-
_, err = j.Publisher.PushEvents(repos[0].Event(), "insights", GitHubDataSource, "repository", os.Getenv("STAGE"), formattedData)
5787+
_, err = j.Publisher.PushEvents(repos[0].Event(), "insights", GitHubDataSource, "repository", os.Getenv("STAGE"), formattedData, endpoint)
57875788
}
57885789
} else {
57895790
jsonBytes, err = jsoniter.Marshal(repos)
@@ -5800,7 +5801,7 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
58005801
switch k {
58015802
case "created":
58025803
ev, _ := v[0].(igh.IssueCreatedEvent)
5803-
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5804+
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58045805
if err != nil {
58055806
j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("cacheCreatedIssues error: %+v", err)
58065807
return
@@ -5819,7 +5820,7 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
58195820
path := ""
58205821
if len(updates) > 0 {
58215822
ev, _ := updates[0].(igh.IssueUpdatedEvent)
5822-
path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, updates)
5823+
path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, updates, endpoint)
58235824
}
58245825
if len(cacheData) > 0 {
58255826
for _, c := range cacheData {
@@ -5835,35 +5836,35 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
58355836
}
58365837
if len(updates) > 0 {
58375838
ev, _ := v[0].(igh.IssueClosedEvent)
5838-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5839+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58395840
}
58405841
case "assignee_added":
58415842
ev, _ := v[0].(igh.IssueAssigneeAddedEvent)
5842-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5843+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58435844
case "assignee_removed":
58445845
ev, _ := v[0].(igh.IssueAssigneeRemovedEvent)
5845-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5846+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58465847
case "comment_added":
58475848
ev, _ := v[0].(igh.IssueCommentAddedEvent)
5848-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5849+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58495850
case "comment_edited":
58505851
ev, _ := v[0].(igh.IssueCommentEditedEvent)
5851-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5852+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58525853
case "comment_deleted":
58535854
ev, _ := v[0].(igh.IssueCommentDeletedEvent)
5854-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5855+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58555856
case "reaction_added":
58565857
ev, _ := v[0].(igh.IssueReactionAddedEvent)
5857-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5858+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58585859
case "reaction_removed":
58595860
ev, _ := v[0].(igh.IssueReactionRemovedEvent)
5860-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5861+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58615862
case "comment_reaction_added":
58625863
ev, _ := v[0].(igh.IssueCommentReactionAddedEvent)
5863-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5864+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58645865
case "comment_reaction_removed":
58655866
ev, _ := v[0].(igh.IssueCommentReactionRemovedEvent)
5866-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v)
5867+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, issuesStr, envStr, v, endpoint)
58675868
default:
58685869
err = fmt.Errorf("unknown issue event type '%s'", k)
58695870
}
@@ -5920,7 +5921,7 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
59205921
switch k {
59215922
case "created":
59225923
ev, _ := v[0].(igh.PullRequestCreatedEvent)
5923-
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5924+
path, err := j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59245925
err = j.cacheCreatedPullrequest(v, path)
59255926
if err != nil {
59265927
j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("cacheCreatedPullrequest error: %+v", err)
@@ -5935,7 +5936,7 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
59355936
path := ""
59365937
if len(updates) > 0 {
59375938
ev, _ := updates[0].(igh.PullRequestUpdatedEvent)
5938-
path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, updates)
5939+
path, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, updates, endpoint)
59395940
}
59405941
if len(cacheData) > 0 {
59415942
for _, c := range cacheData {
@@ -5945,46 +5946,32 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
59455946
}
59465947

59475948
case "closed":
5948-
updates, _, err := j.preventUpdatePullrequestDuplication(v, "closed")
5949-
if err != nil {
5950-
j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("preventUpdatePullrequestDuplication error: %+v", err)
5951-
return
5952-
}
5953-
if len(updates) > 0 {
5954-
ev, _ := updates[0].(igh.PullRequestClosedEvent)
5955-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, updates)
5956-
}
5949+
ev, _ := v[0].(igh.PullRequestClosedEvent)
5950+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59575951
case "merged":
5958-
updates, _, err := j.preventUpdatePullrequestDuplication(v, "merged")
5959-
if err != nil {
5960-
j.log.WithFields(logrus.Fields{"operation": "OutputDocs"}).Errorf("preventUpdatePullrequestDuplication error: %+v", err)
5961-
return
5962-
}
5963-
if len(updates) > 0 {
5964-
ev, _ := updates[0].(igh.PullRequestMergedEvent)
5965-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, updates)
5966-
}
5952+
ev, _ := v[0].(igh.PullRequestMergedEvent)
5953+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59675954
case "assignee_added":
59685955
ev, _ := v[0].(igh.PullRequestAssigneeAddedEvent)
5969-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5956+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59705957
case "assignee_removed":
59715958
ev, _ := v[0].(igh.PullRequestAssigneeRemovedEvent)
5972-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5959+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59735960
case "comment_added":
59745961
ev, _ := v[0].(igh.PullRequestCommentAddedEvent)
5975-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5962+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59765963
case "comment_edited":
59775964
ev, _ := v[0].(igh.PullRequestCommentEditedEvent)
5978-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5965+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59795966
case "comment_deleted":
59805967
ev, _ := v[0].(igh.PullRequestCommentDeletedEvent)
5981-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5968+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59825969
case "comment_reaction_added":
59835970
ev, _ := v[0].(igh.PullRequestCommentReactionAddedEvent)
5984-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5971+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59855972
case "comment_reaction_removed":
59865973
ev, _ := v[0].(igh.PullRequestCommentReactionRemovedEvent)
5987-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5974+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59885975
/* there are no such events
59895976
case "reaction_added":
59905977
ev, _ := v[0].(igh.PullRequestReactionAddedEvent)
@@ -5995,13 +5982,13 @@ func (j *DSGitHub) OutputDocs(ctx *shared.Ctx, items []interface{}, docs *[]inte
59955982
*/
59965983
case "review_added":
59975984
ev, _ := v[0].(igh.PullRequestReviewAddedEvent)
5998-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5985+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
59995986
case "reviewer_added":
60005987
ev, _ := v[0].(igh.PullRequestReviewerAddedEvent)
6001-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5988+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
60025989
case "reviewer_removed":
60035990
ev, _ := v[0].(igh.PullRequestReviewerRemovedEvent)
6004-
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v)
5991+
_, err = j.Publisher.PushEvents(ev.Event(), insightsStr, GitHubDataSource, pullsStr, envStr, v, endpoint)
60055992
default:
60065993
err = fmt.Errorf("unknown pull request event type '%s'", k)
60075994
}

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ go 1.17
55
require (
66
github.com/LF-Engineering/dev-analytics-libraries v1.1.28
77
github.com/LF-Engineering/insights-datasource-shared v1.5.18-0.20221031165121-cb720cbf98a9
8-
github.com/LF-Engineering/lfx-event-schema v0.1.36
8+
github.com/LF-Engineering/lfx-event-schema v0.1.37
99
github.com/aws/aws-lambda-go v1.28.0
1010
github.com/aws/aws-sdk-go v1.43.22
1111
github.com/google/go-github/v43 v43.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,8 @@ github.com/LF-Engineering/dev-analytics-libraries v1.1.28/go.mod h1:O+9mOX1nf6qG
4646
github.com/LF-Engineering/insights-datasource-shared v1.5.18-0.20221031165121-cb720cbf98a9 h1:FBZZ1+Znl6t5JZoO9UMUQhF9VsGBI4YIs67U8DfLDEU=
4747
github.com/LF-Engineering/insights-datasource-shared v1.5.18-0.20221031165121-cb720cbf98a9/go.mod h1:9DmFQbC8nnm1C7k+/tDo3Rmqzzx7AzmhPBlFouXaBZ8=
4848
github.com/LF-Engineering/lfx-event-schema v0.1.14/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
49-
github.com/LF-Engineering/lfx-event-schema v0.1.36 h1:JshYAFzOVEiDnreGbo63ifAN8aDFtTnbyDCVRYack+s=
50-
github.com/LF-Engineering/lfx-event-schema v0.1.36/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
49+
github.com/LF-Engineering/lfx-event-schema v0.1.37 h1:ny46D2NdCXokvJZ01GJcw2RfQM64ousJjaYsrRj5zzg=
50+
github.com/LF-Engineering/lfx-event-schema v0.1.37/go.mod h1:CfFIZ4mwzo88umf5+KxDQEzqlVkPG7Vx8eLK2oDfWIs=
5151
github.com/Microsoft/go-winio v0.5.0/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=
5252
github.com/Microsoft/go-winio v0.5.1 h1:aPJp2QD7OOrhO5tQXqQoGSJc+DjDtWTGLOmNyAm6FgY=
5353
github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpzfwIujj1a84=

0 commit comments

Comments
 (0)