Feature/Data Wrangler API Changes.#353

Open

srinivasan-acn wants to merge 9 commits into GoogleCloudDataproc:main from Shubha-accenture:feature/table-preview

Conversation

@srinivasan-acn (Contributor)

feature: implemented BigQuery Jobs API for Data Wrangler.

  • Refactored PreviewController to initiate BigQuery Jobs

@gemini-code-assist (Contributor)

Summary of Changes

Hello, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly upgrades the Data Wrangler's BigQuery data preview feature. By transitioning from direct API calls to the BigQuery Jobs API, it introduces advanced data manipulation options such as filtering, grouping, aggregation, and sorting. This change empowers users with greater control over how they view and analyze their BigQuery data directly within the Data Wrangler interface, enhancing its utility for data exploration.

Highlights

  • BigQuery Jobs API Integration: The Data Wrangler's BigQuery preview functionality has been refactored to utilize the BigQuery Jobs API, enabling more robust data manipulation capabilities.
  • Enhanced Data Preview Parameters: The bigquery_preview_data endpoint now supports additional parameters for filtering, grouping, aggregation, and sorting, allowing users to customize their data views.
  • Dynamic SQL Query Construction: The backend service now dynamically constructs SQL queries based on user-provided parameters, including WHERE, GROUP BY, ORDER BY, and aggregation functions.
  • Pagination Support: Pagination has been implemented for BigQuery data previews, including a count query to determine the total number of rows, improving performance and user experience for large datasets.
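
The LIMIT/OFFSET pagination described above can be sketched as follows. This is a minimal illustration, not code from the PR; the helper name paginated_sql is hypothetical:

```python
def paginated_sql(base_sql: str, page_size: int, page: int) -> str:
    """Append LIMIT/OFFSET for page-at-a-time previews (page is zero-based)."""
    if page_size <= 0 or page < 0:
        raise ValueError("page_size must be positive and page non-negative")
    return f"{base_sql} LIMIT {page_size} OFFSET {page * page_size}"

print(paginated_sql("SELECT * FROM `proj.ds.table`", 50, 2))
# SELECT * FROM `proj.ds.table` LIMIT 50 OFFSET 100
```

A separate COUNT(*) query, as the PR does, then gives the total row count from which the number of pages can be computed.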

Changelog
  • dataproc_jupyter_plugin/controllers/bigquery.py
    • Modified the bigquery_preview_data method to retrieve and pass new query parameters (filter, group by, aggregation, sort) to the BigQuery service.
  • dataproc_jupyter_plugin/services/bigquery.py
    • Imported google.cloud.bigquery, json, and datetime modules.
    • Refactored the bigquery_preview_data method to use the google.cloud.bigquery client for executing queries.
    • Implemented logic to dynamically construct SQL queries based on provided filter, group by, aggregation, and sort parameters.
    • Added pagination support using LIMIT and OFFSET clauses and a separate count query to determine total rows.
    • Included transformation of datetime objects in query results to ISO format for consistent frontend display.
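
The datetime-to-ISO transformation mentioned in the changelog can be sketched like this (a minimal, hypothetical helper; the PR's actual implementation may differ):

```python
from datetime import date, datetime

def serialize_row(row: dict) -> dict:
    """Convert datetime/date values to ISO-8601 strings so rows are JSON-serializable."""
    return {
        key: value.isoformat() if isinstance(value, (datetime, date)) else value
        for key, value in row.items()
    }

print(serialize_row({"id": 1, "created": datetime(2024, 5, 1, 12, 30)}))
# {'id': 1, 'created': '2024-05-01T12:30:00'}
```
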
Activity
  • No specific activity (comments, reviews, etc.) was provided in the context for this pull request.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

  • Code Review (/gemini review): Performs a code review for the current pull request in its current state.
  • Pull Request Summary (/gemini summary): Provides a summary of the current pull request in its current state.
  • Comment (@gemini-code-assist): Responds in comments when explicitly tagged, both in pull request comments and review comments.
  • Help (/gemini help): Displays a list of available commands.

Customization

To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving feedback about your experience with Gemini Code Assist for GitHub and other Google products, sign up here.

You can also get AI-powered code generation, chat, and code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist (Contributor) left a comment

Code Review

The pull request refactors the bigquery_preview_data method to dynamically construct and execute BigQuery SQL queries using the google.cloud.bigquery client, replacing a direct HTTP API call, and adds support for filtering, grouping, aggregation, and sorting. The review identifies several critical issues:

  • SQL injection vulnerabilities due to direct parameter interpolation.
  • Bugs in SQL query construction for aggregations and count queries.
  • A runtime AttributeError because self.bqclient is never initialized.
  • A performance issue from using a synchronous BigQuery client in an async method.

It also flags potential bugs from mismatched input list lengths for filters and aggregations, suggests extracting the repeated grouping condition to improve readability, and recommends stripping whitespace from group_by fields for robustness.

Comment on lines +160 to +244
table_ref = f"`{project_id}.{dataset_id}.{table_id}`"

# Construct the SQL query with optional filtering, grouping, and aggregation
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    aggregations_query = ""
    for i, field in enumerate(aggregation_fields):
        op = (
            aggregation_operators[i]
            if i < len(aggregation_operators)
            else "none"
        )
        if op != "none":
            aggregations_query += (
                f"{op.upper()}(`{field}`) AS `{field}_{op}`"
            )
    sql_query = f"SELECT {group_by},{aggregations_query} FROM {table_ref}"
else:
    sql_query = f"SELECT * FROM {table_ref}"

conditions = []
query_params = []

# Adding filtering conditions if provided
if filter_fields and len(filter_fields) > 0:
    for i, field in enumerate(filter_fields):
        op = filter_operators[i] if i < len(filter_operators) else "equals"
        val = filter_vals[i] if i < len(filter_vals) else None

        if val is not None and val != "":
            param_name = f"filterVal{i}"

            if op == "equals":
                conditions.append(f"`{field}` = @{param_name}")
                query_params.append(
                    bigquery.ScalarQueryParameter(param_name, "STRING", val)
                )
            elif op == "contains":
                conditions.append(
                    f"CONTAINS_SUBSTR(CAST(`{field}` AS STRING), @{param_name})"
                )
                query_params.append(
                    bigquery.ScalarQueryParameter(param_name, "STRING", val)
                )

if conditions:
    sql_query += f" WHERE {' AND '.join(conditions)}"

# Adding grouping and aggregation conditions if provided
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    group_by_fields = [f"`{field}`" for field in group_by.split(",")]
    sql_query += f" GROUP BY {', '.join(group_by_fields)}"

# Constructing count query to retrieve total rows for pagination
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    sql_count_query = (
        f"SELECT COUNT(*) as total_count FROM ({sql_query}) AS subquery"
    )
else:
    sql_count_query = sql_query.replace(
        f"SELECT * FROM {table_ref}",
        f"SELECT COUNT(*) as total_count FROM {table_ref}",
    )

# Adding sorting if provided
if sort_field and sort_dir:
    direction = "DESC" if sort_dir.lower() == "desc" else "ASC"
    sql_query = sql_query + f"ORDER BY {sort_field} {direction}"
Severity: critical (security)

The bigquery_preview_data method constructs SQL queries by directly interpolating user-supplied parameters into the query string, leading to a critical SQL injection vulnerability. Parameters like group_by, aggregation_fields, filter_fields, and sort_field are used without proper validation or escaping. Attackers could close identifiers and inject arbitrary SQL commands. To remediate this, use parameterized queries for all values, validate identifiers against an allow-list or strict regex, and properly escape user-supplied identifiers. Additionally, the construction of aggregation queries has bugs that will lead to invalid SQL: multiple aggregations are concatenated without a comma separator, and an empty aggregations_query can result in a trailing comma in the SELECT statement, causing query execution to fail.
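
One way to address both the injection risk and the comma bugs is to validate identifiers against a strict pattern and build the SELECT list with a join. This is a sketch under the review's assumptions, not the PR's code (quote_identifier and build_select are hypothetical names); values should still go through bigquery.ScalarQueryParameter as the filter code already does:

```python
import re

# Only plain column names pass; anything that could escape backticks is rejected.
_IDENTIFIER_RE = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def quote_identifier(name: str) -> str:
    """Validate a user-supplied column name before it reaches the SQL string."""
    name = name.strip()
    if not _IDENTIFIER_RE.match(name):
        raise ValueError(f"invalid identifier: {name!r}")
    return f"`{name}`"

def build_select(group_by_fields, aggregations):
    """aggregations is a list of (field, op) pairs; join() avoids stray commas."""
    parts = [quote_identifier(f) for f in group_by_fields]
    parts += [
        f"{op.upper()}({quote_identifier(f)}) AS {quote_identifier(f + '_' + op)}"
        for f, op in aggregations
        if op != "none"
    ]
    return "SELECT " + ", ".join(parts)

print(build_select(["state"], [("pop", "sum"), ("age", "avg")]))
# SELECT `state`, SUM(`pop`) AS `pop_sum`, AVG(`age`) AS `age_avg`
```
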

Comment on lines +236 to +239
sql_count_query = sql_query.replace(
    f"SELECT * FROM {table_ref}",
    f"SELECT COUNT(*) as total_count FROM {table_ref}",
)
Severity: critical

The logic for constructing the sql_count_query in the else block is buggy. It uses sql_query.replace(...), assuming sql_query is exactly SELECT * FROM {table_ref}. However, a WHERE clause might have been added to sql_query earlier, causing the replace to fail. This results in sql_count_query being a SELECT * query instead of a SELECT COUNT(*) query, leading to incorrect total row counts when filters are applied without grouping.

            else:
                from_and_where_clauses = sql_query[sql_query.find(' FROM '):]
                sql_count_query = f"SELECT COUNT(*) as total_count{from_and_where_clauses}"

Comment on lines +257 to +263
query_job = self.bqclient.query(sql_query_with_limit, job_config=job_config)
results = query_job.result()
# Execute the count query to get total rows
count_query_job = self.bqclient.query(
    sql_count_query, job_config=job_config
)
count_results = count_query_job.result()
Severity: critical

There are two critical issues here:

  1. AttributeError: The code calls self.bqclient.query(...), but self.bqclient is not initialized anywhere in the Client class. This will cause the program to crash at runtime.
  2. Blocking I/O: The bigquery_preview_data method is async, but it's using the synchronous google.cloud.bigquery client. This will block the asyncio event loop, severely impacting performance.

You should initialize an async BigQuery client (from google.cloud.bigquery.aio) and use it to perform the queries with await.
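
If an awaitable client is not an option, a stdlib alternative is to offload the blocking call to a worker thread so the event loop stays responsive. A sketch (run_query is a hypothetical helper; client stands in for the synchronous bigquery.Client the PR uses):

```python
import asyncio

async def run_query(client, sql, job_config=None):
    """Run the blocking query() + result() in a thread so the event loop is not blocked."""
    def blocking():
        return client.query(sql, job_config=job_config).result()
    return await asyncio.to_thread(blocking)
```

asyncio.to_thread is available from Python 3.9; on older versions, loop.run_in_executor(None, blocking) achieves the same effect.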

# Adding sorting if provided
if sort_field and sort_dir:
    direction = "DESC" if sort_dir.lower() == "desc" else "ASC"
    sql_query = sql_query + f"ORDER BY {sort_field} {direction}"
Severity: high

The sort_field parameter is used directly in the ORDER BY clause, which is a potential SQL injection vulnerability. Although the BigQuery client might prevent some forms of attacks, it's best practice to not concatenate user-provided strings into SQL queries directly. You should quote the sort_field to ensure it's treated as a column identifier.

                sql_query = sql_query + f"ORDER BY `{sort_field}` {direction}"

Comment on lines +190 to +192
for i, field in enumerate(filter_fields):
    op = filter_operators[i] if i < len(filter_operators) else "equals"
    val = filter_vals[i] if i < len(filter_vals) else None
Severity: medium

The code assumes that filter_fields, filter_operators, and filter_vals lists have the same length. If filter_fields is longer, it falls back to default values for the operator and value. This can hide bugs from the client-side and lead to unexpected behavior. It would be more robust to check that the lists have the same length and raise an error or log a warning if they don't. This same issue exists for aggregation_fields and aggregation_operators.
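
A fail-fast check along the lines the review suggests could look like this (check_parallel_lists is a hypothetical helper name):

```python
def check_parallel_lists(fields, operators, values):
    """Raise instead of silently defaulting when the client sends mismatched filter lists."""
    if not (len(fields) == len(operators) == len(values)):
        raise ValueError(
            f"filter lists must have equal lengths, got "
            f"{len(fields)} fields, {len(operators)} operators, {len(values)} values"
        )
```
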

Comment on lines +214 to +220
if (
    group_by
    and aggregation_fields
    and aggregation_operators
    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
Severity: medium

The condition group_by and aggregation_fields and aggregation_operators and len(aggregation_fields) > 0 and len(aggregation_operators) > 0 is repeated multiple times in this method (here, and for constructing the SELECT and COUNT queries). This makes the code harder to read and maintain. Consider extracting this condition into a boolean variable, for example is_grouped_query, and reusing it.
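
The repeated condition can be computed once, as the review suggests (a sketch; the name is_grouped_query is the review's own suggestion):

```python
def is_grouped_query(group_by, aggregation_fields, aggregation_operators):
    """True only when grouping and at least one aggregation are both requested."""
    # bool() on the lists already covers the len(...) > 0 checks in the original.
    return bool(group_by and aggregation_fields and aggregation_operators)

assert is_grouped_query("state", ["pop"], ["sum"])
assert not is_grouped_query("", ["pop"], ["sum"])
assert not is_grouped_query("state", [], [])
```
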

    and len(aggregation_fields) > 0
    and len(aggregation_operators) > 0
):
    group_by_fields = [f"`{field}`" for field in group_by.split(",")]
Severity: medium

Using group_by.split(',') to get the fields for the GROUP BY clause can be brittle. If the field names have spaces around the comma (e.g., "field1, field2"), the split will result in ['field1', ' field2'], which can lead to issues. It's safer to strip whitespace from each field.

Suggested change
group_by_fields = [f"`{field}`" for field in group_by.split(",")]
group_by_fields = [f"`{field.strip()}`" for field in group_by.split(",")]
