-
Notifications
You must be signed in to change notification settings - Fork 48
fix: Filter replication key items for use_fake_since_parameter
#379
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 2 commits
5f5c18d
1bee702
3966cca
8a8b497
04e447f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -57,6 +57,9 @@ def url_base(self) -> str: | |
| replication_key: str | None = None | ||
| tolerated_http_errors: ClassVar[list[int]] = [] | ||
|
|
||
| # Save the context from the request so that it is available to the parse_response method | ||
| context: dict | None = None | ||
|
|
||
| @property | ||
| def http_headers(self) -> dict[str, str]: | ||
| """Return the http headers needed.""" | ||
|
|
@@ -142,6 +145,9 @@ def get_url_params( | |
| context: dict | None, | ||
| next_page_token: Any | None, # noqa: ANN401 | ||
| ) -> dict[str, Any]: | ||
| # Save the context from the request so that it is available to the parse_response method | ||
| self.context = context | ||
|
||
|
|
||
cbrammer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| """Return a dictionary of values to be used in URL parameterization.""" | ||
| params: dict = {"per_page": self.MAX_PER_PAGE} | ||
| if next_page_token: | ||
|
|
@@ -250,7 +256,7 @@ def validate_response(self, response: requests.Response) -> None: | |
|
|
||
| def parse_response(self, response: requests.Response) -> Iterable[dict]: | ||
| """Parse the response and return an iterator of result rows.""" | ||
| # TODO - Split into handle_reponse and parse_response. | ||
| # TODO - Split into handle_response and parse_response. | ||
| if response.status_code in ( | ||
| [*self.tolerated_http_errors, EMPTY_REPO_ERROR_STATUS] | ||
| ): | ||
|
|
@@ -259,16 +265,30 @@ def parse_response(self, response: requests.Response) -> Iterable[dict]: | |
| # Update token rate limit info and loop through tokens if needed. | ||
| self.authenticator.update_rate_limit(response.headers) | ||
|
|
||
| # Get all items from the response | ||
| resp_json = response.json() | ||
|
|
||
| if isinstance(resp_json, list): | ||
| results = resp_json | ||
| elif resp_json.get("items") is not None: | ||
| results = resp_json.get("items") | ||
| else: | ||
| results = [resp_json] | ||
|
|
||
| yield from results | ||
| if not results: | ||
| return | ||
|
|
||
| # Filter items based on replication key's date if needed | ||
| since = self.get_starting_timestamp(self.context) | ||
| filtered_results = [] | ||
| if self.replication_key and self.use_fake_since_parameter and since: | ||
| for item in results: | ||
| item_date = parse(item[self.replication_key]) | ||
| if item_date >= since: | ||
| filtered_results.append(item) | ||
| else: | ||
| filtered_results = results | ||
|
|
||
| yield from filtered_results | ||
|
|
||
| def post_process(self, row: dict, context: dict[str, str] | None = None) -> dict: | ||
| """Add `repo_id` by default to all streams.""" | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.