Feature/Data Wrangler API Changes #353
srinivasan-acn wants to merge 9 commits into GoogleCloudDataproc:main
Conversation
…-accenture/dataproc-jupyter-plugin-fork" This reverts commit bcea304, reversing changes made to da15ffc.

- Update PreviewController to extract filter, group_by, and sort arguments
- Pass extended parameters to bq_client.bigquery_preview_data
- Support multi-value arguments for filters and aggregations
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the Data Wrangler's BigQuery data preview feature. By transitioning from direct API calls to the BigQuery Jobs API, it introduces advanced data manipulation options such as filtering, grouping, aggregation, and sorting. This change gives users greater control over how they view and analyze their BigQuery data directly within the Data Wrangler interface, enhancing its utility for data exploration.
Code Review
The pull request refactors the bigquery_preview_data method to dynamically construct and execute BigQuery SQL queries using the google.cloud.bigquery client, replacing a direct HTTP API call, and adds support for filtering, grouping, aggregation, and sorting. However, the review identifies critical issues including SQL injection vulnerabilities due to direct parameter interpolation, bugs in SQL query construction for aggregations and count queries, a runtime AttributeError because self.bqclient is uninitialized, and a performance issue from using a synchronous BigQuery client in an async method. Additionally, the review points out potential bugs related to mismatched input list lengths for filters and aggregations, suggests improving code readability by extracting repeated conditions, and recommends stripping whitespace from group_by fields for robustness.
```python
table_ref = f"`{project_id}.{dataset_id}.{table_id}`"

# Construct the SQL query with optional filtering, grouping, and aggregation
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    aggregations_query = ""
    for i, field in enumerate(aggregation_fields):
        op = (
            aggregation_operators[i]
            if i < len(aggregation_operators)
            else "none"
        )
        if op != "none":
            aggregations_query += (
                f"{op.upper()}(`{field}`) AS `{field}_{op}`"
            )
    sql_query = f"SELECT {group_by},{aggregations_query} FROM {table_ref}"
else:
    sql_query = f"SELECT * FROM {table_ref}"

conditions = []
query_params = []

# Adding filtering conditions if provided
if filter_fields and len(filter_fields) > 0:
    for i, field in enumerate(filter_fields):
        op = filter_operators[i] if i < len(filter_operators) else "equals"
        val = filter_vals[i] if i < len(filter_vals) else None

        if val is not None and val != "":
            param_name = f"filterVal{i}"

            if op == "equals":
                conditions.append(f"`{field}` = @{param_name}")
                query_params.append(
                    bigquery.ScalarQueryParameter(param_name, "STRING", val)
                )
            elif op == "contains":
                conditions.append(
                    f"CONTAINS_SUBSTR(CAST(`{field}` AS STRING), @{param_name})"
                )
                query_params.append(
                    bigquery.ScalarQueryParameter(param_name, "STRING", val)
                )

if conditions:
    sql_query += f" WHERE {' AND '.join(conditions)}"

# Adding grouping and aggregation conditions if provided
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    group_by_fields = [f"`{field}`" for field in group_by.split(",")]
    sql_query += f" GROUP BY {', '.join(group_by_fields)}"

# Constructing count query to retrieve total rows for pagination
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    sql_count_query = (
        f"SELECT COUNT(*) as total_count FROM ({sql_query}) AS subquery"
    )
else:
    sql_count_query = sql_query.replace(
        f"SELECT * FROM {table_ref}",
        f"SELECT COUNT(*) as total_count FROM {table_ref}",
    )

# Adding sorting if provided
if sort_field and sort_dir:
    direction = "DESC" if sort_dir.lower() == "desc" else "ASC"
    sql_query = sql_query + f"ORDER BY {sort_field} {direction}"
```
The bigquery_preview_data method constructs SQL queries by directly interpolating user-supplied parameters into the query string, leading to a critical SQL injection vulnerability. Parameters like group_by, aggregation_fields, filter_fields, and sort_field are used without proper validation or escaping. Attackers could close identifiers and inject arbitrary SQL commands. To remediate this, use parameterized queries for all values, validate identifiers against an allow-list or strict regex, and properly escape user-supplied identifiers.

Additionally, the construction of aggregation queries has bugs that will lead to invalid SQL: multiple aggregations are concatenated without a comma separator, and an empty aggregations_query can result in a trailing comma in the SELECT statement, causing query execution to fail.
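One way to harden the identifier handling and fix the comma bug is sketched below. The helper names (`validate_identifier`, `build_aggregations`) and the allow-lists are illustrative, not part of the PR; filter *values* would still be bound via `bigquery.ScalarQueryParameter` as the diff already does.

```python
import re

# Allow-list pattern for plain BigQuery column names (letters, digits, underscores).
_IDENT_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")
# Only aggregation operators the UI is expected to send (assumed set).
_ALLOWED_OPS = {"sum", "avg", "min", "max", "count", "none"}


def validate_identifier(name: str) -> str:
    """Reject anything that is not a plain column name, then back-quote it."""
    if not _IDENT_RE.match(name):
        raise ValueError(f"Invalid identifier: {name!r}")
    return f"`{name}`"


def build_aggregations(fields, operators):
    """Build a comma-separated aggregation list, skipping 'none' operators."""
    parts = []
    for field, op in zip(fields, operators):
        if op not in _ALLOWED_OPS:
            raise ValueError(f"Unsupported aggregation: {op!r}")
        if op != "none":
            # Field is validated before interpolation; alias reuses the same name.
            parts.append(f"{op.upper()}({validate_identifier(field)}) AS `{field}_{op}`")
    return ", ".join(parts)  # comma-separated, never a trailing comma
```

Because `", ".join(...)` produces an empty string for an all-`none` input, the caller can also skip the aggregation clause entirely when the result is empty, avoiding the trailing-comma failure mode.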
```python
sql_count_query = sql_query.replace(
    f"SELECT * FROM {table_ref}",
    f"SELECT COUNT(*) as total_count FROM {table_ref}",
)
```
The logic for constructing the sql_count_query in the else block is buggy. It uses sql_query.replace(...), assuming sql_query is exactly SELECT * FROM {table_ref}. However, a WHERE clause might have been added to sql_query earlier, causing the replace to fail. This results in sql_count_query being a SELECT * query instead of a SELECT COUNT(*) query, leading to incorrect total row counts when filters are applied without grouping.
Suggested fix:

```python
else:
    from_and_where_clauses = sql_query[sql_query.find(' FROM '):]
    sql_count_query = f"SELECT COUNT(*) as total_count{from_and_where_clauses}"
```

```python
query_job = self.bqclient.query(sql_query_with_limit, job_config=job_config)
results = query_job.result()
# Execute the count query to get total rows
count_query_job = self.bqclient.query(
    sql_count_query, job_config=job_config
)
count_results = count_query_job.result()
```
There are two critical issues here:

1. AttributeError: The code calls `self.bqclient.query(...)`, but `self.bqclient` is not initialized anywhere in the `Client` class. This will cause the program to crash at runtime.
2. Blocking I/O: The `bigquery_preview_data` method is `async`, but it uses the synchronous `google.cloud.bigquery` client. This will block the asyncio event loop, severely impacting performance.

You should initialize the client before use and avoid blocking the event loop, for example by offloading the synchronous query calls to a worker thread and awaiting the result.
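One way to keep the event loop responsive without switching client libraries is to offload the blocking call via `asyncio.to_thread`. This is a sketch: the helper names are illustrative, and `client` stands in for an initialized `google.cloud.bigquery.Client`.

```python
import asyncio


def run_query_sync(client, sql, job_config=None):
    """Blocking call: submit the query and wait for all result rows."""
    return list(client.query(sql, job_config=job_config).result())


async def preview_query(client, sql, job_config=None):
    # Run the synchronous BigQuery call in a worker thread so the
    # asyncio event loop is not blocked while the query executes.
    return await asyncio.to_thread(run_query_sync, client, sql, job_config)
```

The same pattern applies to the count query; both can even be awaited concurrently with `asyncio.gather` since each runs in its own thread.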
```python
# Adding sorting if provided
if sort_field and sort_dir:
    direction = "DESC" if sort_dir.lower() == "desc" else "ASC"
    sql_query = sql_query + f"ORDER BY {sort_field} {direction}"
```
The sort_field parameter is used directly in the ORDER BY clause, which is a potential SQL injection vulnerability. Although the BigQuery client might prevent some forms of attacks, it's best practice to not concatenate user-provided strings into SQL queries directly. You should quote the sort_field to ensure it's treated as a column identifier.
Suggested change:

```python
sql_query = sql_query + f"ORDER BY `{sort_field}` {direction}"
```

```python
for i, field in enumerate(filter_fields):
    op = filter_operators[i] if i < len(filter_operators) else "equals"
    val = filter_vals[i] if i < len(filter_vals) else None
```
The code assumes that filter_fields, filter_operators, and filter_vals lists have the same length. If filter_fields is longer, it falls back to default values for the operator and value. This can hide bugs from the client-side and lead to unexpected behavior. It would be more robust to check that the lists have the same length and raise an error or log a warning if they don't. This same issue exists for aggregation_fields and aggregation_operators.
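A fail-fast check along these lines would surface client-side bugs immediately instead of silently substituting defaults (the function name is hypothetical):

```python
def validate_parallel_lists(fields, operators, values, kind="filter"):
    """Raise if the parallel argument lists are inconsistent in length."""
    if not (len(fields) == len(operators) == len(values)):
        raise ValueError(
            f"Mismatched {kind} arguments: {len(fields)} fields, "
            f"{len(operators)} operators, {len(values)} values"
        )
```

The same check, with `values` omitted or duplicated, covers the `aggregation_fields`/`aggregation_operators` pair.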
```python
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
```
The condition group_by and aggregation_fields and aggregation_operators and len(aggregation_fields) > 0 and len(aggregation_operators) > 0 is repeated multiple times in this method (here, and for constructing the SELECT and COUNT queries). This makes the code harder to read and maintain. Consider extracting this condition into a boolean variable, for example is_grouped_query, and reusing it.
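The extracted guard might look like this (name and signature are illustrative). Note that empty lists are already falsy in Python, so the explicit `len(...) > 0` checks in the original condition are redundant:

```python
def is_grouped_query(group_by, aggregation_fields, aggregation_operators):
    """True when a GROUP BY + aggregation query should be built."""
    return bool(group_by and aggregation_fields and aggregation_operators)
```

Computing this once near the top of the method lets the SELECT, GROUP BY, and COUNT branches all test the same variable.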
```python
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    group_by_fields = [f"`{field}`" for field in group_by.split(",")]
```
Using group_by.split(',') to get the fields for the GROUP BY clause can be brittle. If the field names have spaces around the comma (e.g., "field1, field2"), the split will result in ['field1', ' field2'], which can lead to issues. It's safer to strip whitespace from each field.
```python
# before
group_by_fields = [f"`{field}`" for field in group_by.split(",")]
# after
group_by_fields = [f"`{field.strip()}`" for field in group_by.split(",")]
```
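A small helper combining the split and strip, with empty segments dropped as well (the function name is illustrative):

```python
def parse_group_by(group_by: str) -> list:
    """Split a comma-separated column list and back-quote each trimmed name."""
    return [f"`{field.strip()}`" for field in group_by.split(",") if field.strip()]
```

This also tolerates a trailing comma in the input, which the plain `split(",")` would turn into an empty back-quoted identifier.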
feature: implemented BigQuery Jobs API for Data Wrangler.