Skip to content

Commit 2f14e02

Browse files
Add MySQL source connector
1 parent 39fb263 commit 2f14e02

File tree

5 files changed

+182
-0
lines changed

5 files changed

+182
-0
lines changed

integrations/lib/multiwoven/integrations.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,7 @@
8080
require_relative "integrations/source/clickhouse/client"
8181
require_relative "integrations/source/amazon_s3/client"
8282
require_relative "integrations/source/maria_db/client"
83+
require_relative "integrations/source/mysql_db/client"
8384
require_relative "integrations/source/oracle_db/client"
8485
require_relative "integrations/source/databrics_model/client"
8586
require_relative "integrations/source/aws_sagemaker_model/client"

integrations/lib/multiwoven/integrations/rollout.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ module Integrations
3434
Odoo
3535
GoogleDrive
3636
Http
37+
MysqlDb
3738
].freeze
3839

3940
ENABLED_DESTINATIONS = %w[
@@ -63,6 +64,7 @@ module Integrations
6364
Qdrant
6465
PineconeDB
6566
Odoo
67+
MysqlDb
6668
].freeze
6769
end
6870
end
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
# frozen_string_literal: true
2+
3+
require "sequel"
4+
require "multiwoven/integrations/core/source_connector"
5+
require "multiwoven/integrations/protocol/protocol"
6+
require "active_support/core_ext/hash/indifferent_access"
7+
8+
module Multiwoven
9+
module Integrations
10+
module Source
11+
module MysqlDb
12+
class Client < Multiwoven::Integrations::Core::SourceConnector
13+
14+
15+
def check_connection(connection_config)
16+
create_connection(connection_config.with_indifferent_access)
17+
ConnectionStatus.new(status: ConnectionStatusType["succeeded"]).to_multiwoven_message
18+
rescue => e
19+
ConnectionStatus.new(status: ConnectionStatusType["failed"], message: e.message).to_multiwoven_message
20+
end
21+
22+
def discover(connection_config)
23+
cfg = connection_config.with_indifferent_access
24+
db = create_connection(cfg)
25+
26+
query = <<-SQL
27+
SELECT table_name, column_name, data_type, is_nullable
28+
FROM information_schema.columns
29+
WHERE table_schema = '#{cfg[:database]}'
30+
ORDER BY table_name, ordinal_position;
31+
SQL
32+
33+
results = db.fetch(query).all
34+
catalog = Catalog.new(streams: build_streams(results))
35+
catalog.to_multiwoven_message
36+
37+
rescue => e
38+
handle_exception(e, context: "MYSQL:DISCOVER:EXCEPTION", type: "error")
39+
end
40+
41+
def read(sync_config)
42+
cfg = sync_config.source.connection_specification.with_indifferent_access
43+
db = create_connection(cfg)
44+
45+
sql = sync_config.model.query
46+
sql = batched_query(sql, sync_config.limit, sync_config.offset) if sync_config.limit || sync_config.offset
47+
48+
rows = db.fetch(sql)
49+
rows.map do |row|
50+
RecordMessage.new(data: row, emitted_at: Time.now.to_i).to_multiwoven_message
51+
end
52+
53+
rescue => e
54+
handle_exception(e, {
55+
context: "MYSQL:READ:EXCEPTION",
56+
type: "error",
57+
sync_id: sync_config.sync_id,
58+
sync_run_id: sync_config.sync_run_id
59+
})
60+
end
61+
62+
private
63+
64+
def create_connection(cfg)
65+
Sequel.connect(
66+
adapter: "mysql2",
67+
host: cfg[:host],
68+
port: cfg[:port] || 3306,
69+
user: cfg[:username],
70+
password: cfg[:password],
71+
database: cfg[:database]
72+
)
73+
end
74+
75+
def build_streams(records)
76+
records.group_by { |r| r[:table_name] || r[:TABLE_NAME] }.map do |table_name, cols|
77+
schema = convert_schema(cols)
78+
Multiwoven::Integrations::Protocol::Stream.new(
79+
name: table_name.to_s,
80+
action: StreamAction["fetch"],
81+
json_schema: schema
82+
)
83+
end
84+
end
85+
86+
def convert_schema(columns)
87+
properties = {}
88+
89+
columns.each do |col|
90+
col_name = col[:column_name] || col[:COLUMN_NAME]
91+
data_type = col[:data_type] || col[:DATA_TYPE]
92+
93+
properties[col_name] = { type: map_type(data_type) }
94+
end
95+
96+
{ type: "object", properties: properties }
97+
end
98+
99+
def map_type(data_type)
100+
case data_type
101+
when /int/ then "integer"
102+
when /char|text|varchar/ then "string"
103+
when /date|time|timestamp/ then "string"
104+
when /decimal|float|double/ then "number"
105+
when /bool/ then "boolean"
106+
else "string"
107+
end
108+
end
109+
end
110+
end
111+
end
112+
end
113+
end
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
{
2+
"data": {
3+
"name": "mysql_db",
4+
"title": "MySQL",
5+
"connector_type": "source",
6+
"version": "0.1.0",
7+
"category": "Data Warehouse",
8+
"sub_category": "Relational Database",
9+
"documentation_url": "https://dev.mysql.com/doc/",
10+
"github_issue_label": "source-mysql-db",
11+
"icon": "icon.svg",
12+
"license": "MIT",
13+
"release_stage": "alpha",
14+
"support_level": "community",
15+
"tags": ["language:ruby", "multiwoven", "database", "sql", "mysql"]
16+
}
17+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
{
2+
"documentation_url": "https://dev.mysql.com/doc/",
3+
"stream_type": "dynamic",
4+
"connector_query_type": "raw_sql",
5+
"connection_specification": {
6+
"$schema": "http://json-schema.org/draft-07/schema#",
7+
"title": "MySQL Database",
8+
"type": "object",
9+
"required": ["host", "port", "username", "password", "database"],
10+
"properties": {
11+
"host": {
12+
"description": "The hostname or IP address of the server where the MySQL database is hosted.",
13+
"examples": ["127.0.0.1", "mysql.yourdomain.com","localhost"],
14+
"type": "string",
15+
"title": "Host",
16+
"order": 0
17+
},
18+
"port": {
19+
"description": "The port number on which the MySQL server is listening for connections.",
20+
"examples": [3306],
21+
"type": "integer",
22+
"title": "Port",
23+
"default": 3306,
24+
"order": 1
25+
},
26+
"username": {
27+
"description": "The username used to authenticate and connect to the MySQL database.",
28+
"examples": ["root"],
29+
"type": "string",
30+
"title": "Username",
31+
"order": 2
32+
},
33+
"password": {
34+
"description": "The password corresponding to the username used for authentication.",
35+
"type": "string",
36+
"multiwoven_secret": true,
37+
"title": "Password",
38+
"order": 3
39+
},
40+
"database": {
41+
"description": "The name of the specific database within the MySQL server to connect to.",
42+
"examples": ["test", "analytics", "production_db","mydatabase"],
43+
"type": "string",
44+
"title": "Database",
45+
"order": 4
46+
}
47+
}
48+
}
49+
}

0 commit comments

Comments
 (0)