feat(CE) : Add MySQL source connector #890
base: main
Walkthrough
Adds a MySQL source integration: loads the MySQL client in the integrations entrypoint, enables "MysqlDb" in rollout, implements
Changes
Sequence Diagram(s)
sequenceDiagram
participant App as Multiwoven
participant Client as MysqlDb::Client
participant DB as Sequel (mysql2)
participant IS as information_schema
rect rgba(200,220,255,0.12)
Note over Client,DB: create_connection
Client->>DB: connect(cfg)
DB-->>Client: connection
end
rect rgba(220,200,255,0.12)
Note over Client,IS: discover()
Client->>IS: SELECT from information_schema.columns
IS-->>Client: column rows
Client->>Client: convert_schema → build_streams
Client-->>App: return discovery catalog (streams)
end
rect rgba(255,230,200,0.12)
Note over Client,DB: read(sync_config)
App->>Client: read(sync_config)
Client->>DB: execute SQL (constructed from sync_config)
DB-->>Client: rowset
Client-->>App: emit RecordMessage(s) / state updates
end
Estimated code review effort
🎯 3 (Moderate) | ⏱️ ~25 minutes
Pre-merge checks and finishing touches
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 1
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- integrations/lib/multiwoven/integrations.rb (1 hunks)
- integrations/lib/multiwoven/integrations/rollout.rb (2 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/config/meta.json (1 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/config/spec.json (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (2)
integrations/lib/multiwoven/integrations/core/utils.rb (1)
handle_exception(74-80)
integrations/lib/multiwoven/integrations/core/source_connector.rb (1)
batched_query(28-41)
🔇 Additional comments (10)
integrations/lib/multiwoven/integrations/source/mysql_db/config/spec.json (1)
1-49: LGTM! The connection specification is well-structured with appropriate field types, defaults, and security markings. The password field is correctly marked as multiwoven_secret, and the port default of 3306 is accurate for MySQL.
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (6)
15-20: LGTM! The connection check correctly validates MySQL connectivity using indifferent access for configuration and provides appropriate success/failure responses.
41-60: LGTM! The read method correctly handles batched queries, maps database rows to record messages, and includes comprehensive error context with sync identifiers.
64-73: LGTM! The connection setup correctly uses Sequel with the mysql2 adapter and provides sensible defaults (port 3306).
75-84: LGTM! The stream building logic correctly handles case-insensitive column names and groups metadata by table, which is essential for cross-platform compatibility.
86-97: LGTM! The schema conversion appropriately handles case variations in column metadata and generates valid JSON schema structures.
99-108: LGTM! The type mapping covers common MySQL data types and safely defaults to "string" for unrecognized types.
integrations/lib/multiwoven/integrations.rb (1)
83-83: LGTM! The MySQL client is correctly loaded alongside other source connectors following the established pattern.
integrations/lib/multiwoven/integrations/rollout.rb (1)
37-37: LGTM! The MySQL connector is correctly registered in both ENABLED_SOURCES and ENABLED_DESTINATIONS arrays, following the same pattern as MariaDB.
Also applies to: 67-67
integrations/lib/multiwoven/integrations/source/mysql_db/config/meta.json (1)
7-8: Categorization is consistent with similar database connectors; no changes needed.
Verification confirms that MySQL's "Data Warehouse" category aligns with PostgreSQL and MariaDB, which use the same classification. All traditional OLTP relational databases in the codebase consistently use this category designation, suggesting it represents the system's classification for enterprise-grade databases rather than a misleading categorization specific to MySQL.
2f14e02 to ae84bf3
Actionable comments posted: 3
♻️ Duplicate comments (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1)
26-31: Critical: SQL injection vulnerability in database name interpolation.
Line 29 directly interpolates cfg[:database] into the SQL query string, creating a SQL injection vulnerability. A malicious database name could inject arbitrary SQL.
Apply this diff to use Sequel's dataset filtering instead:
-query = <<-SQL
-  SELECT table_name, column_name, data_type, is_nullable
-  FROM information_schema.columns
-  WHERE table_schema = '#{cfg[:database]}'
-  ORDER BY table_name, ordinal_position;
-SQL
-
-results = db.fetch(query).all
+results = db[:information_schema__columns]
+  .where(table_schema: cfg[:database])
+  .order(:table_name, :ordinal_position)
+  .select(:table_name, :column_name, :data_type, :is_nullable)
+  .all
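To make the risk concrete, here is a minimal pure-Ruby sketch that builds the query text both ways without touching a database. The helper names unsafe_query and bound_query and the malicious_name value are hypothetical and not part of the PR; the placeholder form assumes the SQL and its values are later handed to Sequel's Database#fetch, which accepts bind arguments after the SQL string.

```ruby
# Hypothetical helpers for illustration only; not the connector's code.

# Builds the query the way the flagged code did: by string interpolation.
def unsafe_query(database)
  "SELECT table_name FROM information_schema.columns " \
    "WHERE table_schema = '#{database}'"
end

# Keeps the SQL text constant and returns the value separately, so it can be
# passed as a bind argument, e.g. db.fetch(sql, *binds) with Sequel.
def bound_query(database)
  sql = "SELECT table_name FROM information_schema.columns " \
        "WHERE table_schema = ?"
  [sql, [database]]
end

malicious_name = "x' OR '1'='1"

# The interpolated text now contains an extra OR clause supplied by the input.
puts unsafe_query(malicious_name)

# The bound form never mixes the input into the SQL text.
sql, binds = bound_query(malicious_name)
puts sql
puts binds.inspect
```

With interpolation the input changes the shape of the WHERE clause; with a placeholder it stays a plain value that the driver escapes.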
🧹 Nitpick comments (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1)
99-108: Consider adding explicit JSON type mapping.
The type mapping covers common MySQL types well, but MySQL's native JSON type defaults to "string". While this works, explicitly mapping it could improve schema accuracy.
Apply this diff to add JSON type support:
 def map_type(data_type)
   case data_type
   when /int/ then "integer"
   when /char|text|varchar/ then "string"
   when /date|time|timestamp/ then "string"
   when /decimal|float|double/ then "number"
   when /bool/ then "boolean"
+  when /json/ then "object"
   else "string"
   end
 end
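The mapping with the suggested JSON branch can be exercised on its own. The sketch below copies the method from the diff and assumes data_type arrives as the lowercase string reported by information_schema.columns; the sample type names are only illustrative.

```ruby
# Copy of the suggested mapping, runnable in isolation.
# Assumes data_type is a lowercase MySQL type name such as "bigint".
def map_type(data_type)
  case data_type
  when /int/ then "integer"
  when /char|text|varchar/ then "string"
  when /date|time|timestamp/ then "string"
  when /decimal|float|double/ then "number"
  when /bool/ then "boolean"
  when /json/ then "object"
  else "string" # unknown types fall back to string
  end
end

%w[bigint varchar datetime decimal json geometry].each do |type|
  puts "#{type} -> #{map_type(type)}"
end
```

Two caveats apply to this sketch. A MySQL JSON column can also hold arrays or scalars, so "object" is an approximation. And because the patterns are unanchored, a spatial type such as point matches /int/ and would be reported as "integer"; anchoring the patterns or matching exact names would avoid that.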
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (5)
- integrations/lib/multiwoven/integrations.rb (1 hunks)
- integrations/lib/multiwoven/integrations/rollout.rb (2 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/config/meta.json (1 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/config/spec.json (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (4)
- integrations/lib/multiwoven/integrations.rb
- integrations/lib/multiwoven/integrations/source/mysql_db/config/meta.json
- integrations/lib/multiwoven/integrations/source/mysql_db/config/spec.json
- integrations/lib/multiwoven/integrations/rollout.rb
🧰 Additional context used
🧬 Code graph analysis (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (2)
integrations/lib/multiwoven/integrations/core/utils.rb (1)
handle_exception(74-80)
integrations/lib/multiwoven/integrations/core/source_connector.rb (1)
batched_query(28-41)
🔇 Additional comments (3)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (3)
1-13: LGTM! Module structure and dependencies are properly set up.
64-73: LGTM! Connection configuration is properly set up with appropriate defaults.
75-84: LGTM! Proper handling of table grouping and stream creation, with defensive key access for case variations.
demo.mp4
Actionable comments posted: 1
🧹 Nitpick comments (2)
integrations/lib/multiwoven/integrations/source/mysql_db/config/meta.json (1)
7-7: Consider category alignment.
MySQL is typically categorized as a relational database rather than a data warehouse. Consider whether "Database" or "Relational Database System" might be more accurate for the category field, while keeping "Relational Database" as the sub_category.
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1)
104-109: Unused helper method.
The query helper method is not invoked anywhere in the current implementation. The read method contains similar inline logic. Consider removing this if it's not needed, or document its intended use case if it's meant for future extensions.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (3)
- integrations/lib/multiwoven/integrations/rollout.rb (1 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1 hunks)
- integrations/lib/multiwoven/integrations/source/mysql_db/config/meta.json (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (2)
integrations/lib/multiwoven/integrations/core/utils.rb (1)
handle_exception(74-80)
integrations/lib/multiwoven/integrations/core/source_connector.rb (1)
batched_query(28-41)
🔇 Additional comments (7)
integrations/lib/multiwoven/integrations/rollout.rb (1)
37-37: LGTM! MySQL source connector enabled.
The addition of MysqlDb to ENABLED_SOURCES correctly enables the new MySQL source connector in the platform.
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (6)
14-21: Excellent! Resource leak fixed.
The addition of the ensure block with db&.disconnect properly addresses the previously flagged resource leak issue. The connection is now guaranteed to be closed whether the health check succeeds or fails.
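The cleanup pattern praised here can be sketched without a database. FakeConnection and the lambda-based factory are hypothetical stand-ins for the Sequel handle and create_connection; only test_connection and disconnect mirror real Sequel::Database method names, and the :succeeded/:failed symbols are placeholders for the connector's status messages.

```ruby
# FakeConnection is a hypothetical stand-in for a Sequel database handle.
class FakeConnection
  attr_reader :disconnected

  def initialize(healthy)
    @healthy = healthy
    @disconnected = false
  end

  def test_connection
    raise "connection refused" unless @healthy

    true
  end

  def disconnect
    @disconnected = true
  end
end

# Returns :succeeded or :failed; the ensure block runs on both paths.
# db&.disconnect is a no-op when the factory itself raised and db is nil.
def check_connection(conn_factory)
  db = conn_factory.call
  db.test_connection
  :succeeded
rescue StandardError
  :failed
ensure
  db&.disconnect
end

ok = FakeConnection.new(true)
bad = FakeConnection.new(false)
puts check_connection(-> { ok })
puts check_connection(-> { bad })
puts ok.disconnected, bad.disconnected
```

The safe-navigation call matters for the case where opening the connection fails: db is still nil when ensure runs, and a plain db.disconnect would raise a second error that hides the first.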
24-41: Excellent! SQL injection vulnerability and resource leak both fixed.
The implementation now uses Sequel's dataset API (lines 28-32) with proper parameterization instead of string interpolation, eliminating the SQL injection vulnerability. The ensure block (lines 39-40) guarantees the connection is properly closed.
69-78: LGTM! Standard Sequel connection setup.
The connection configuration is properly parameterized and uses the correct default port (3306) for MySQL.
80-89: LGTM! Schema discovery properly implemented.
The method correctly groups columns by table and handles case variations in the result keys (:table_name vs :TABLE_NAME).
91-102: LGTM! Schema conversion is correct.
The method properly builds a JSON schema from database columns, handling key case variations consistently.
111-120: Type mapping is adequate for initial release.
The basic type mapping covers common MySQL data types appropriately. For future enhancements, consider adding mappings for additional MySQL types such as BINARY, JSON, ENUM, SET, and spatial types if needed by users.
def read(sync_config)
  cfg = sync_config.source.connection_specification.with_indifferent_access
  db = create_connection(cfg)

  sql = sync_config.model.query
  sql = batched_query(sql, sync_config.limit, sync_config.offset) if sync_config.limit || sync_config.offset

  rows = db.fetch(sql)
  rows.map do |row|
    RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
  end
rescue => e
  handle_exception(e, {
    context: "MYSQL:READ:EXCEPTION",
    type: "error",
    sync_id: sync_config.sync_id,
    sync_run_id: sync_config.sync_run_id
  })
ensure
  db&.disconnect
end
🧩 Analysis chain
Resource leak fixed; verify query validation.
The ensure block properly addresses the previously flagged resource leak. The connection is guaranteed to be cleaned up after the read operation.
However, the SQL query comes directly from sync_config.model.query (line 48). Ensure that user-provided queries are validated or sanitized elsewhere in the system to prevent SQL injection vulnerabilities.
Run the following script to check if query validation exists:
🏁 Script executed:
#!/bin/bash
# Description: Search for query validation or sanitization logic
# Look for validation of sync_config.model.query or similar patterns
rg -n -C3 --type=ruby "model\.query|validate.*query|sanitize.*query"
# Look for SQL injection prevention patterns
rg -n -C3 --type=ruby "sql.*injection|query.*validation|prepared.*statement"
Length of output: 69960
Resource leak fixed; SQL injection validation disabled.
The ensure block properly addresses the previously flagged resource leak—the connection is guaranteed to be cleaned up after the read operation.
However, the SQL injection concern is confirmed: query validation infrastructure exists but is currently disabled with a TODO noting "Enable this once we have query validation implemented for all the connectors." The query comes directly from sync_config.model.query (line 48) and is executed without parameterization. This pattern affects all source connectors—see server/app/controllers/api/v1/models_controller.rb:14-15 and server/app/controllers/api/v1/connectors_controller.rb:11-12 for the commented-out validation hooks. Until query validation is enforced, user-provided queries execute without sanitization, creating SQL injection risk across all database sources.
🤖 Prompt for AI Agents
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb lines 44-65:
the code executes raw SQL from sync_config.model.query without validation or
parameterization (SQL injection risk); fix by re-enabling and invoking the
existing query validation flow before executing (hook into the validation
helpers used by server/app/controllers/... to reject or sanitize unsafe
queries), require/verify that queries are only read-only (SELECT) and do not
contain banned tokens, and where user inputs are expected use
parameterized/prepared statements or binding instead of string interpolation; if
validation fails, raise a clear error and do not call db.fetch; keep the ensure
db&.disconnect to prevent leaks.
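A read-only gate of the kind this prompt describes might be sketched as follows. Everything here is hypothetical: validate_read_only!, UnsafeQueryError and the banned-token list are illustrative names and values, not the project's existing validation helpers, whose interface is not shown in this thread.

```ruby
# Hypothetical validator; the banned-token list is an illustrative assumption.
BANNED_TOKENS = /\b(insert|update|delete|drop|alter|truncate|create|grant)\b/i

class UnsafeQueryError < StandardError; end

# Raises UnsafeQueryError unless the query is a single SELECT/WITH statement
# containing none of the banned keywords. Returns the query without its
# trailing semicolon when it passes.
def validate_read_only!(sql)
  statement = sql.strip.sub(/;\s*\z/, "")
  raise UnsafeQueryError, "multiple statements are not allowed" if statement.include?(";")
  raise UnsafeQueryError, "only SELECT or WITH queries are allowed" unless statement.match?(/\A(select|with)\b/i)
  raise UnsafeQueryError, "query contains a banned keyword" if statement.match?(BANNED_TOKENS)

  statement
end

puts validate_read_only!("SELECT id, updated_at FROM users;")

begin
  validate_read_only!("SELECT 1; DROP TABLE users")
rescue UnsafeQueryError => e
  puts "rejected: #{e.message}"
end
```

Keyword screening like this is coarse: it rejects legitimate queries that mention a banned word or a semicolon inside a string literal, and it is not a substitute for connecting with a database user that only has SELECT privileges.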
Actionable comments posted: 0
🧹 Nitpick comments (4)
integrations/spec/multiwoven/integrations/source/mysql_db/client_spec.rb (4)
5-47: Consider simplifying the sync_config fixture.
The fixture includes destination and stream configuration that aren't used by most source connector tests. While this doesn't break functionality, a leaner fixture focused on source-specific fields would improve readability and reduce test maintenance.
80-93: Consider verifying the actual data content.
The test confirms that records are returned as an Array of MultiwovenMessage instances but doesn't verify the actual data content matches the mocked response. Adding assertions on the data field would strengthen the test.
Example:
expect(records.first.record.data).to include(col1: 1, col2: "Row1", col3: "Extra1")
148-153: Consider using a more robust name extraction.
The array indexing [-2] assumes a specific class naming convention. If the module structure changes, this test will break.
Consider a more explicit approach:
# Assuming the pattern is always Source::<ConnectorName>::Client
meta_name = client.class.to_s.split("::").reverse.find { |part| part != "Client" }
Or document the assumption clearly if the convention is stable.
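The two extraction styles can be compared on a stand-in class tree. The Source::ExampleDb::Client hierarchy and the connector_name helper are hypothetical and exist only to make the snippet runnable outside the project.

```ruby
# Stand-in module tree mirroring the Source::<ConnectorName>::Client pattern;
# ExampleDb is a hypothetical connector name.
module Source
  module ExampleDb
    class Client; end
  end
end

# Walks the namespace from the innermost constant outwards and returns the
# first part that is not the literal "Client".
def connector_name(client)
  client.class.to_s.split("::").reverse.find { |part| part != "Client" }
end

client = Source::ExampleDb::Client.new
puts connector_name(client)
puts client.class.to_s.split("::")[-2] # positional variant used by the spec
```

Both lines print the same name for this layout; the find-based variant states the "skip Client" intent explicitly, although it still depends on the connector module sitting directly above Client.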
155-159: Testing private method existence is overly prescriptive.
This test couples the spec to implementation details. If the query method is refactored or renamed while maintaining correct behavior, this test will fail. Consider removing this test or replacing it with behavioral tests that verify what the method accomplishes rather than its existence.
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- integrations/lib/multiwoven/integrations/source/mysql_db/icon.svg is excluded by !**/*.svg
📒 Files selected for processing (1)
- integrations/spec/multiwoven/integrations/source/mysql_db/client_spec.rb (1 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
integrations/spec/multiwoven/integrations/source/mysql_db/client_spec.rb (2)
integrations/lib/multiwoven/integrations/protocol/protocol.rb (1)
from_json(35-38)
server/app/interactors/connectors/discover_connector.rb (1)
streams(34-37)
🔇 Additional comments (2)
integrations/spec/multiwoven/integrations/source/mysql_db/client_spec.rb (2)
51-78: LGTM! The check_connection tests provide good coverage of both success and failure scenarios with appropriate mocking and assertions.
114-146: LGTM! The discover tests properly mock the Sequel dataset chain and verify both success and failure scenarios with appropriate error handling expectations.
Actionable comments posted: 1
🧹 Nitpick comments (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1)
64-73: Consider adding timeout and SSL/TLS configuration.
The basic connection configuration is correct, but consider adding:
- Connection and read timeout values to prevent operations from hanging indefinitely
- SSL/TLS configuration options for secure connections to remote databases
Example enhancement:
def create_connection(cfg)
  Sequel.connect(
    adapter: "mysql2",
    host: cfg[:host],
    port: cfg[:port] || 3306,
    user: cfg[:username],
    password: cfg[:password],
    database: cfg[:database],
    connect_timeout: cfg[:connect_timeout] || 30,
    read_timeout: cfg[:read_timeout] || 30,
    write_timeout: cfg[:write_timeout] || 30,
    ssl_mode: cfg[:ssl_mode]
  )
end
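One way to keep these defaults checkable without a live database is to build the option hash in a separate helper and pass its result to Sequel.connect. The sketch below assumes that split; connection_options and DEFAULT_TIMEOUT are hypothetical names, and the 30-second value is simply the one used in the example above.

```ruby
# Hypothetical helper: builds the option hash separately from Sequel.connect
# so the defaults can be checked without a database.
DEFAULT_TIMEOUT = 30 # seconds; assumed default taken from the example above

def connection_options(cfg)
  {
    adapter: "mysql2",
    host: cfg[:host],
    port: cfg[:port] || 3306,
    user: cfg[:username],
    password: cfg[:password],
    database: cfg[:database],
    connect_timeout: cfg[:connect_timeout] || DEFAULT_TIMEOUT,
    read_timeout: cfg[:read_timeout] || DEFAULT_TIMEOUT,
    write_timeout: cfg[:write_timeout] || DEFAULT_TIMEOUT,
    ssl_mode: cfg[:ssl_mode]
  }.compact # drop unset keys such as ssl_mode so the driver defaults apply
end

opts = connection_options(host: "db.internal", username: "reader", database: "app")
puts opts
```

In the connector this hash would be passed as Sequel.connect(connection_options(cfg)). Dropping nil entries avoids sending an explicit ssl_mode: nil to the driver when the user has not configured one.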
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (1 hunks)
- integrations/spec/multiwoven/integrations/source/mysql_db/client_spec.rb (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- integrations/spec/multiwoven/integrations/source/mysql_db/client_spec.rb
🧰 Additional context used
🧬 Code graph analysis (1)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (2)
integrations/lib/multiwoven/integrations/core/utils.rb (1)
handle_exception(74-80)
integrations/lib/multiwoven/integrations/core/source_connector.rb (1)
batched_query(28-41)
🔇 Additional comments (5)
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb (5)
13-20: LGTM: Connection handling is correct.
The connection is properly cleaned up in the ensure block, and error handling appropriately returns success or failure status.
22-38: LGTM: Schema discovery implementation is correct.
The Sequel dataset query prevents SQL injection by using bound parameters, and the connection is properly cleaned up in the ensure block.
40-60: LGTM: Read implementation is correct with proper resource cleanup.
The connection is properly cleaned up in the ensure block, and error handling includes appropriate sync context. The SQL injection concern for sync_config.model.query has been previously flagged as a system-wide issue affecting all source connectors.
75-97: LGTM: Schema building logic is correct.
The case-insensitive handling of column names (e.g., table_name || TABLE_NAME) is defensive coding that accounts for potential variations in how MySQL returns column identifiers across different configurations.
106-115: LGTM: Type mapping is appropriate.
The regex-based type mapping covers common MySQL data types and safely defaults to "string" for unknown types.
def query(db, query)
  rows = db.fetch(query)
  rows.map do |row|
    RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
  end
end
🛠️ Refactor suggestion | 🟠 Major
Remove unused dead code.
The query method is never called and duplicates logic from the read method. Dead code increases maintenance burden and can cause confusion.
Apply this diff to remove the unused method:
- def query(db, query)
- rows = db.fetch(query)
- rows.map do |row|
- RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
- end
- end
-
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def query(db, query)
  rows = db.fetch(query)
  rows.map do |row|
    RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
  end
end
🤖 Prompt for AI Agents
integrations/lib/multiwoven/integrations/source/mysql_db/client.rb around lines
99 to 104 contains an unused duplicate method `query`; remove the entire `def
query(db, query) ... end` block (lines 99–104) from the file, then run a quick
project-wide search to confirm there are no references to `query` and run the
test suite/lint to ensure no breakages.
Description
This PR adds a new MySQL Source Connector to Multiwoven Integrations.
It enables users to connect to a MySQL database, discover schemas, and read data during sync operations.
This connector follows the existing MariaDB connector structure but is specifically adapted for MySQL.
Related Issue
Mysql connector in source #10
Type of Change
How Has This Been Tested?
bundle exec bin/console:
Summary by CodeRabbit
New Features
Documentation
Tests