Skip to content

Commit cf751ca

Browse files
committed
[receiver/github] add concurrency limits and pull request time filtering
1 parent 320ef1c commit cf751ca

File tree

12 files changed

+431
-40
lines changed

12 files changed

+431
-40
lines changed

.chloggen/gh-scrape-limits.yaml

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
# Use this changelog template to create an entry for release notes.
2+
3+
# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
4+
change_type: enhancement
5+
6+
# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
7+
component: receiver/github
8+
9+
# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
10+
note: Add concurrency limit and pull request filtering to reduce rate limiting
11+
12+
# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
13+
issues: [43388]
14+
15+
# (Optional) One or more lines of additional information to render under the primary note.
16+
# These lines will be padded with 2 spaces and then inserted directly into the document.
17+
# Use pipe (|) for multiline entries.
18+
subtext:
19+
20+
# If your change doesn't affect end users or the exported elements of any package,
21+
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
22+
# Optional: The change log or logs in which this entry should be included.
23+
# e.g. '[user]' or '[user, api]'
24+
# Include 'user' if the change is relevant to end users.
25+
# Include 'api' if there is a change to a library API.
26+
# Default: '[user]'
27+
change_logs: []

receiver/githubreceiver/README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,8 @@ receivers:
7878
github_org: <myfancyorg>
7979
search_query: "org:<myfancyorg> topic:<o11yalltheway>" # Recommended optional query override, defaults to "{org,user}:<github_org>"
8080
endpoint: "https://selfmanagedenterpriseserver.com" # Optional
81+
concurrency_limit: 50 # Limit to 50 concurrent repos
82+
merged_pr_lookback_days: 30 # Only fetch PRs merged in last 30 days
8183
auth:
8284
authenticator: bearertokenauth/github
8385
service:
@@ -97,6 +99,10 @@ service:
9799

98100
`search_query` (optional): A filter to narrow down repositories. Defaults to `org:<github_org>` (or `user:<username>`). For example, use `repo:<org>/<repo>` to target a specific repository. Any valid GitHub search syntax is allowed.
99101

102+
`concurrency_limit` (optional): Maximum number of repositories to process concurrently. Defaults to 50. Set to 0 for unlimited (not recommended). GitHub enforces a secondary rate limit of 100 concurrent requests across REST and GraphQL APIs, so values above 100 may cause rate limiting.
103+
104+
`merged_pr_lookback_days` (optional): Limits how far back to look for merged pull requests in days. Defaults to 30 days. Set to 0 to fetch all merged PRs (no filtering). This helps reduce API quota usage by focusing on recent data. Open PRs are always fetched regardless of this setting.
105+
100106
`metrics` (optional): Enable or disable metrics scraping. See the [metrics documentation](./documentation.md) for details.
101107

102108
### Scraping

receiver/githubreceiver/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go 1.24.0
44

55
require (
66
github.com/Khan/genqlient v0.8.1
7+
github.com/cenkalti/backoff/v5 v5.0.3
78
github.com/google/go-cmp v0.7.0
89
github.com/google/go-github/v79 v79.0.0
910
github.com/gorilla/mux v1.8.1
@@ -35,7 +36,6 @@ require (
3536

3637
require (
3738
github.com/beorn7/perks v1.0.1 // indirect
38-
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
3939
github.com/cespare/xxhash/v2 v2.3.0 // indirect
4040
github.com/davecgh/go-spew v1.1.1 // indirect
4141
github.com/ebitengine/purego v0.9.0 // indirect

receiver/githubreceiver/internal/scraper/githubscraper/README.md

Lines changed: 4 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -42,21 +42,16 @@ to prevent abuse and maintain API availability. The following secondary limit is
4242
particularly relevant:
4343

4444
- **Concurrent Requests Limit**: The API allows no more than 100 concurrent
45-
requests. This limit is shared across the REST and GraphQL APIs. Since the
46-
scraper creates a goroutine per repository, having more than 100 repositories
47-
returned by the `search_query` will result in exceeding this limit.
48-
It is recommended to use the `search_query` config option to limit the number of
49-
repositories that are scraped. We recommend one instance of the receiver per
50-
team (note: `team` is not a valid quantifier when searching repositories `topic`
51-
is). Reminder that each instance of the receiver should have its own
52-
corresponding token for authentication as this is what rate limits are tied to.
45+
requests. This limit is shared across the REST and GraphQL APIs. The receiver
46+
includes a configurable `concurrency_limit` (default 50) to stay under this
47+
limit.
5348

5449
In summary, we recommend the following:
5550

5651
- One instance of the receiver per team
5752
- Each instance of the receiver should have its own token
5853
- Leverage `search_query` config option to limit repositories returned to 100 or
59-
less per instance
54+
fewer per instance (or configure `concurrency_limit` appropriately)
6055
- `collection_interval` should be long enough to avoid rate limiting (see above
6156
formula). A sensible default is `300s`.
6257

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
// Copyright The OpenTelemetry Authors
2+
// SPDX-License-Identifier: Apache-2.0
3+
4+
package githubscraper
5+
6+
import (
7+
"context"
8+
"strings"
9+
"time"
10+
11+
"github.com/cenkalti/backoff/v5"
12+
"go.opentelemetry.io/collector/component"
13+
"go.uber.org/zap"
14+
semconv "go.opentelemetry.io/otel/semconv/v1.37.0"
15+
)
16+
17+
// graphqlCallWithRetry wraps a GraphQL API call with exponential backoff
18+
// retrying on rate limit errors (403, 429) and transient failures
19+
func graphqlCallWithRetry[T any](
20+
ctx context.Context,
21+
logger *zap.Logger,
22+
telemetry component.TelemetrySettings,
23+
scrapeInterval time.Duration,
24+
apiCall func() (*T, error),
25+
) (*T, error) {
26+
expBackoff := &backoff.ExponentialBackOff{
27+
InitialInterval: 1 * time.Second,
28+
RandomizationFactor: backoff.DefaultRandomizationFactor,
29+
Multiplier: backoff.DefaultMultiplier,
30+
MaxInterval: 30 * time.Second,
31+
}
32+
33+
var attempts int64
34+
operation := func() (*T, error) {
35+
attempts++
36+
resp, err := apiCall()
37+
38+
if err == nil {
39+
return resp, nil
40+
}
41+
42+
if !isRetriableError(err) {
43+
return nil, backoff.Permanent(err)
44+
}
45+
46+
// Log retry attempt with warning level
47+
logger.Warn("GitHub API rate limited or temporary failure, retrying",
48+
zap.Int64("attempt", attempts),
49+
zap.Error(err),
50+
)
51+
52+
// Record retry metric via internal telemetry
53+
counter, meterErr := telemetry.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver").
54+
Int64Counter("http.request.resend_count")
55+
if meterErr == nil {
56+
counter.Add(ctx, 1)
57+
}
58+
59+
return nil, err
60+
}
61+
62+
// Execute with backoff
63+
result, err := backoff.Retry(ctx, operation,
64+
backoff.WithBackOff(expBackoff),
65+
backoff.WithMaxElapsedTime(scrapeInterval))
66+
if err != nil {
67+
// Max elapsed time exceeded - log error
68+
logger.Error("GitHub API call failed after retries",
69+
zap.Int64("total_attempts", attempts),
70+
zap.Error(err),
71+
)
72+
73+
// Record failure metric
74+
counter, meterErr := telemetry.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver").
75+
Int64Counter(string(semconv.HTTPRequestResendCountKey))
76+
if meterErr == nil {
77+
counter.Add(ctx, 1)
78+
}
79+
}
80+
81+
return result, err
82+
}
83+
84+
// isRetriableError reports whether err should trigger another attempt.
//
// The decision is made purely on the error text: err is retriable when its
// message contains any of the fragments listed below. Matching is
// case-sensitive. A nil error is never retriable.
func isRetriableError(err error) bool {
	if err == nil {
		return false
	}

	retriableFragments := [...]string{
		// GraphQL rate limit errors
		"API rate limit exceeded",
		// HTTP status code errors
		"returned error 403",
		"returned error 429",
		"returned error 502",
		"returned error 503",
		"returned error 504",
		// Secondary rate limit
		"secondary rate limit",
	}

	msg := err.Error()
	for _, fragment := range retriableFragments {
		if strings.Contains(msg, fragment) {
			return true
		}
	}
	return false
}

receiver/githubreceiver/internal/scraper/githubscraper/config.go

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@
44
package githubscraper // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver/internal/scraper/githubscraper"
55

66
import (
7+
"errors"
8+
79
"go.opentelemetry.io/collector/config/confighttp"
810

911
"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/githubreceiver/internal"
@@ -19,4 +21,18 @@ type Config struct {
1921
GitHubOrg string `mapstructure:"github_org"`
2022
// SearchQuery is the query to use when defining a custom search for repository data
2123
SearchQuery string `mapstructure:"search_query"`
24+
// ConcurrencyLimit limits the number of concurrent repository processing goroutines
25+
// Default is 50 to stay well under GitHub's 100 concurrent request limit
26+
ConcurrencyLimit int `mapstructure:"concurrency_limit"`
27+
// MergedPRLookbackDays limits how far back to look for merged pull requests
28+
// Default is 30 days. Set to 0 to fetch all merged PRs (backwards compatible)
29+
MergedPRLookbackDays int `mapstructure:"merged_pr_lookback_days"`
30+
}
31+
32+
// Validate validates the configuration
33+
func (cfg *Config) Validate() error {
34+
if cfg.ConcurrencyLimit < 0 {
35+
return errors.New("concurrency_limit must be non-negative")
36+
}
37+
return nil
2238
}

receiver/githubreceiver/internal/scraper/githubscraper/config_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@ func TestConfig(t *testing.T) {
2525
expectedConfig := &Config{
2626
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
2727
ClientConfig: clientConfig,
28+
ConcurrencyLimit: 50,
29+
MergedPRLookbackDays: 30,
2830
}
2931

3032
assert.Equal(t, expectedConfig, defaultConfig)

receiver/githubreceiver/internal/scraper/githubscraper/factory.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,8 @@ func (*Factory) CreateDefaultConfig() internal.Config {
3030
return &Config{
3131
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
3232
ClientConfig: clientConfig,
33+
ConcurrencyLimit: 50, // Default to 50 concurrent calls
34+
MergedPRLookbackDays: 30, // Default to 30 days
3335
}
3436
}
3537

@@ -41,6 +43,11 @@ func (*Factory) CreateMetricsScraper(
4143
conf := cfg.(*Config)
4244
s := newGitHubScraper(params, conf)
4345

46+
// // Set scrape interval for backoff max elapsed time
47+
// // Default to 30 seconds if not configured
48+
// scrapeInterval := 30 * time.Second
49+
// s.scrapeInterval = scrapeInterval
50+
4451
return scraper.NewMetrics(
4552
s.scrape,
4653
scraper.WithStart(s.start),

receiver/githubreceiver/internal/scraper/githubscraper/github_scraper.go

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,13 @@ import (
2424
var errClientNotInitErr = errors.New("http client not initialized")
2525

2626
type githubScraper struct {
27-
client *http.Client
28-
cfg *Config
29-
settings component.TelemetrySettings
30-
logger *zap.Logger
31-
mb *metadata.MetricsBuilder
32-
rb *metadata.ResourceBuilder
27+
client *http.Client
28+
cfg *Config
29+
settings component.TelemetrySettings
30+
logger *zap.Logger
31+
mb *metadata.MetricsBuilder
32+
rb *metadata.ResourceBuilder
33+
scrapeInterval time.Duration
3334
}
3435

3536
func (ghs *githubScraper) start(ctx context.Context, host component.Host) (err error) {
@@ -95,6 +96,15 @@ func (ghs *githubScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
9596

9697
ghs.mb.RecordVcsRepositoryCountDataPoint(now, int64(count))
9798

99+
// Create semaphore for concurrency limiting
100+
var sem chan struct{}
101+
if ghs.cfg.ConcurrencyLimit > 0 {
102+
sem = make(chan struct{}, ghs.cfg.ConcurrencyLimit)
103+
ghs.logger.Debug("Using concurrency limit for repository processing",
104+
zap.Int("limit", ghs.cfg.ConcurrencyLimit),
105+
zap.Int("total_repos", len(repos)))
106+
}
107+
98108
// Get the ref (branch) count (future branch data) for each repo and record
99109
// the given metrics
100110
var wg sync.WaitGroup
@@ -107,8 +117,24 @@ func (ghs *githubScraper) scrape(ctx context.Context) (pmetric.Metrics, error) {
107117
trunk := repo.DefaultBranchRef.Name
108118
now := now
109119

120+
// Acquire semaphore slot before launching goroutine
121+
if sem != nil {
122+
select {
123+
case sem <- struct{}{}:
124+
// Acquired slot, continue
125+
case <-ctx.Done():
126+
// Context cancelled, skip remaining repos
127+
wg.Done()
128+
continue
129+
}
130+
}
131+
110132
go func() {
111133
defer wg.Done()
134+
// Release semaphore slot when done
135+
if sem != nil {
136+
defer func() { <-sem }()
137+
}
112138

113139
branches, count, err := ghs.getBranches(ctx, genClient, name, trunk)
114140
if err != nil {

receiver/githubreceiver/internal/scraper/githubscraper/github_scraper_test.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,7 @@ func TestScrape(t *testing.T) {
9191
},
9292
prResponse: prResponse{
9393
prs: []getPullRequestDataRepositoryPullRequestsPullRequestConnection{
94+
// First call for OPEN PRs
9495
{
9596
PageInfo: getPullRequestDataRepositoryPullRequestsPullRequestConnectionPageInfo{
9697
HasNextPage: false,
@@ -99,6 +100,14 @@ func TestScrape(t *testing.T) {
99100
{
100101
Merged: false,
101102
},
103+
},
104+
},
105+
// Second call for MERGED PRs
106+
{
107+
PageInfo: getPullRequestDataRepositoryPullRequestsPullRequestConnectionPageInfo{
108+
HasNextPage: false,
109+
},
110+
Nodes: []PullRequestNode{
102111
{
103112
Merged: true,
104113
},
@@ -162,7 +171,10 @@ func TestScrape(t *testing.T) {
162171
server := httptest.NewServer(tc.server)
163172
defer server.Close()
164173

165-
cfg := &Config{MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig()}
174+
cfg := &Config{
175+
MetricsBuilderConfig: metadata.DefaultMetricsBuilderConfig(),
176+
MergedPRLookbackDays: 0, // Fetch all PRs for test
177+
}
166178

167179
ghs := newGitHubScraper(receivertest.NewNopSettings(metadata.Type), cfg)
168180
ghs.cfg.GitHubOrg = "open-telemetry"

0 commit comments

Comments
 (0)