This repository was archived by the owner on Nov 30, 2021. It is now read-only.

Commit a2258ab

Class name for dbplyr::in_schema() changed
Version now 0.2.0; see changelog
1 parent ca1daad · commit a2258ab

30 files changed: +248 additions, -115 deletions
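
For context, this is the class change the commit reacts to: dbplyr changed what dbplyr::in_schema() returns (from the 2.0.0 release on, as far as we can tell), so the package now has to recognize the old and new classes alike. A minimal sketch of the difference; the schema and table names are made up:

library(dbplyr)

ref <- in_schema("analytics", "events")
class(ref)
# dbplyr < 2.0.0:  an "ident" subclass (schema and table collapsed into one quoted identifier)
# dbplyr >= 2.0.0: "dbplyr_schema"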

DESCRIPTION

Lines changed: 2 additions & 2 deletions
@@ -1,7 +1,7 @@
 Package: redshiftTools
 Type: Package
 Title: Redshift Tools
-Version: 0.1.2
+Version: 0.2.0
 Authors@R: c(person("Pablo", "Seibelt", email = "pabloseibelt@sicarul.com",
   role = c("aut", "cre")),person("Russell S.", "Pierce", role = c("ctb"), email = "russell.pierce@zapier.com"))
 Mantainers@R: person("Pablo", "Seibelt", email = "pabloseibelt@sicarul.com",
@@ -31,6 +31,6 @@ Imports:
 Description: Tools to upload data to an Amazon Redshift Database with good performance.
 License: MIT + file LICENSE
 LazyData: TRUE
-RoxygenNote: 7.1.0
+RoxygenNote: 7.1.1
 Suggests: testthat,
   lubridate

NAMESPACE

Lines changed: 2 additions & 0 deletions
@@ -20,6 +20,7 @@ export(rs_table_exists)
 export(rs_unload)
 export(rs_upsert_table)
 export(sanitize_column_names_for_redshift)
+export(schema_to_character)
 export(spectrum_add_partition)
 export(spectrum_drop_partition)
 export(spectrum_list_partitions)
@@ -48,6 +49,7 @@ importFrom(dplyr,left_join)
 importFrom(dplyr,mutate_)
 importFrom(dplyr,pull)
 importFrom(dplyr,recode)
+importFrom(dplyr,rename)
 importFrom(dplyr,select)
 importFrom(dplyr,select_)
 importFrom(dplyr,tbl)
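
The newly exported schema_to_character() is defined in one of the 30 changed files not shown in this excerpt. Judging from its use in R/redshift-upsert.R below, it flattens a schema reference into a dotted string for SQL assembled with sprintf(); a purely hypothetical illustration of that behavior, not the actual definition:

# Hypothetical illustration only - the real definition lives elsewhere in this commit
tn <- dbplyr::in_schema("analytics", "events")
schema_to_character(tn)
# expected: "analytics.events"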

R/internal.R

Lines changed: 42 additions & 33 deletions
@@ -1,29 +1,37 @@
+table_parts <- function(x) {
+  unlist(strsplit(as.character(x), split = ".", fixed = TRUE))
+}
+
 log_if_verbose <- function(...) {
   if (isTRUE(getOption("redshiftTools.verbose"))) {
     message("redshiftTools: ", ...)
   }
 }
 
+is_schema <- function(table_name) {
+  return(inherits(table_name, c("ident", "ident_q", "dbplyr_schema")))
+}
+
 stopifnoschema <- function(table_name) {
-  assertthat::assert_that("ident" %in% class(table_name), msg = "Table name must be result of dbplyr::in_schema()")
+  assertthat::assert_that(is_schema(table_name), msg = "Table name must be result of dbplyr::in_schema()")
 }
 
 warnifnoschema <- function(table_name) {
-  if (!"ident" %in% class(table_name)) {
+  if (!is_schema(table_name)) {
     warning(glue("No schema specified for {table_name}, will default to using public"))
     return(FALSE)
   } else {
     return(TRUE)
   }
 }
 
-#' Simple coalece
+#' Simple coalesce
 #'
-#' A simple non-type aware coalece
+#' A simple non-type aware coalesce
 #'
 #' @aliases coalesceifnull %||%
-#' @params x left position
-#' @params y right position
+#' @param x left position
+#' @param y right position
 coalesceifnull <- function(x, y) {
   return(x %||% y)
 }
@@ -36,7 +44,7 @@ boto <- reticulate::import("boto", delay_load = TRUE)
 read.text = function(pathname) {
   if (file.exists(pathname)) {
     return (paste(readLines(pathname), collapse="\n"))
-    }
+  }
 }
 
 check_aws_credentials <- function() {
@@ -45,13 +53,13 @@ check_aws_credentials <- function() {
   renviron_path = "/etc/R/Renviron.site"
   rprofile_path = ".Rprofile"
   aws_key_env_set = Sys.getenv("AWS_ACCESS_KEY_ID") != ""
-  aws_configure_exists = file.exists(aws_configure_path)
+  aws_configure_exists = file.exists(aws_configure_path)
   aws_credentials_exists = file.exists(aws_credentials_path)
   # Does not need checking as both would set env var, keeping it here for completness
   renviron_exists = file.exists(renviron_path)
   rprofile_exists = file.exists(rprofile_path)
   if (aws_key_env_set) {
-    log_if_verbose("AWS_ACCESS_KEY_ID is set, it will override other aws credentials")
+    log_if_verbose("AWS_ACCESS_KEY_ID is set, it will override other aws credentials")
   } else if (aws_credentials_exists && grepl("aws_access_key_id", read.text(aws_credentials_path))) {
     log_if_verbose("aws_access_key_id is set in ~/.aws/credentials, it will overrride other aws credentials")
   } else if (aws_configure_exists && grepl("aws_access_key_id", read.text(aws_configure_path))) {
@@ -168,20 +176,37 @@ queryDo <- function(dbcon, query) {
 #' @importFrom assertthat assert_that
 #' @importFrom DBI dbGetQuery
 #' @importFrom whisker whisker.render
+#' @importFrom dplyr rename
 get_table_schema <- function(dbcon, table) {
   assertthat::assert_that(length(table) <= 2)
-  assertthat::assert_that("character" %in% class(table))
   if (is.atomic(table)) {
     schema <- "public"
     target_table <- table
   } else {
-    schema <- table[1]
-    target_table <- table[2]
+    schema <- tolower(as.character(table[1]))
+    target_table <- tolower(as.character(table[2]))
   }
-  dbGetQuery(dbcon, whisker.render("SELECT *
-                                    FROM pg_table_def
-                                    WHERE tablename = '{{table_name}}'
-                                    AND schemaname = '{{schema}}'", list(table_name = target_table, schema = schema)))
+
+  ## pg_table_def only contains items that are in the search path
+
+  #original_search_path <- DBI::dbGetQuery(dbcon, "show search_path;")$search_path
+  ##dbGetQuery loses the single quotes around user
+  #original_search_path <- gsub("$user", "'$user'", original_search_path, fixed = TRUE)
+  #DBI::dbExecute(dbcon, whisker.render("set search_path to {{schema}}, '$user', public", list (schema = schema)))
+  ## really should be in a try block, otherwise we risk messing up the search path
+  #res <- dbGetQuery(dbcon, whisker.render("SELECT *
+  #FROM pg_table_def
+  #WHERE tablename = '{{table_name}}'
+  #AND schemaname = '{{schema}}'", list(table_name = target_table, schema = schema)))
+  #DBI::dbExecute(dbcon, whisker.render("set search_path to {{original}}", list (original = original_search_path)))
+  res <- dbGetQuery(dbcon,
+    glue::glue_sql(
+      .con = dbcon,
+      "select * from information_schema.columns where table_schema = {schema} and table_name = {target_table}"
+    )) %>%
+    rename(schemaname = table_schema, tablename = table_name, column = column_name, type = data_type) %>%
+    arrange(ordinal_position)
+  return(res)
 }
 
 #' Fix the order of columns in d to match the underlying Redshift table
@@ -238,7 +263,7 @@ fix_column_order <- function(d, dbcon, table_name, strict = TRUE) {
 #' and the result can't change quickly because Redshift resizes take quite some time.
 #'
 #' @importFrom memoise memoise timeout
-#' @params dbcon A database connection object
+#' @param dbcon A database connection object
 number_of_slices <- memoise::memoise(function(dbcon) {
   message("Getting number of slices from Redshift")
   slices <- queryDo(dbcon, "select count(1) from stv_slices")
@@ -283,22 +308,6 @@ make_dot <- function(table, schema = NULL) {
   paste0(schema, ".", table)
 }
 
-vec_to_dot <- function(x) {
-  if (length(x) == 1) {
-    schema <- "public"
-    table <- x
-  } else {
-    assertthat::assert_that(length(x) == 2)
-    schema <- x[1]
-    table <- x[2]
-  }
-  make_dot(table, schema)
-}
-
-dot_to_vec <- function(x) {
-  strsplit(x, split = ".", fixed = TRUE)[[1]]
-}
-
 
 #' Make Credentials
 #'
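
Two things worth noting about the new helpers. is_schema() accepts the old and new dbplyr classes alike, so either generation of in_schema() result passes the schema checks; a quick sketch with made-up names (both helpers are internal, shown unqualified):

tn <- dbplyr::in_schema("analytics", "events")
is_schema(tn)                    # TRUE for "ident", "ident_q", or "dbplyr_schema"
is_schema("analytics.events")    # FALSE - a bare string carries no schema class
table_parts("analytics.events")  # c("analytics", "events")

And moving get_table_schema() from pg_table_def to information_schema.columns sidesteps the search_path juggling the commented-out block wrestles with: pg_table_def only lists tables on the current search path, while information_schema.columns covers every schema, and glue_sql() quotes the interpolated values safely.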

R/redshift-replace.R

Lines changed: 6 additions & 6 deletions
@@ -5,7 +5,7 @@
 #'
 #' @param data a data frame
 #' @param dbcon an RPostgres connection to the redshift server
-#' @param tableName the name of the table to replace
+#' @param table_name the name of the table to replace
 #' @param split_files optional parameter to specify amount of files to split into. If not specified will look at amount of slices in Redshift to determine an optimal amount.
 #' @param bucket the name of the temporary bucket to load the data. Will look for AWS_BUCKET_NAME on environment if not specified.
 #' @param region the region of the bucket. Will look for AWS_DEFAULT_REGION on environment if not specified.
@@ -21,15 +21,15 @@
 #' host='my-redshift-url.amazon.com', port='5439',
 #' user='myuser', password='mypassword',sslmode='require')
 #'
-#' rs_replace_table(data=a, dbcon=con, tableName='testTable',
+#' rs_replace_table(data=a, dbcon=con, table_name='testTable',
 #' bucket="my-bucket", split_files=4)
 #'
 #' }
 #' @export
 #' @importFrom DBI dbExecute
 rs_replace_table <- function(data,
                              dbcon,
-                             tableName,
+                             table_name,
                              split_files,
                              bucket = Sys.getenv("AWS_BUCKET_NAME"),
                              region = Sys.getenv("AWS_DEFAULT_REGION"),
@@ -49,23 +49,23 @@ rs_replace_table <- function(data,
   # all the work. it's not a pure function!
   replace <- function(data, dbcon) {
     split_files <- min(split_files, nrow(data))
-    data <- fix_column_order(data, dbcon, table_name = tableName, strict = strict)
+    data <- fix_column_order(data, dbcon, table_name = table_name, strict = strict)
     prefix <- uploadToS3(data, bucket, split_files)
     raw_bucket <- paste0(bucket, if (Sys.getenv("ENVIRONMENT") == "production") "" else "-test")
     on.exit({
       message("Deleting temporary files from S3 bucket")
       deletePrefix(prefix, raw_bucket, split_files)
     })
     message("Truncating target table")
-    queryDo(dbcon, sprintf("truncate table %s", tableName))
+    queryDo(dbcon, sprintf("truncate table %s", table_name))
     if (remove_quotes) {
       query_string <- "copy %s from 's3://%s/%s.' region '%s' truncatecolumns acceptinvchars as '^' escape delimiter '|' removequotes gzip ignoreheader 1 emptyasnull STATUPDATE ON COMPUPDATE ON %s;"
     } else {
       query_string <- "copy %s from 's3://%s/%s.' region '%s' truncatecolumns acceptinvchars as '^' escape delimiter '|' gzip ignoreheader 1 emptyasnull STATUPDATE ON COMPUPDATE ON %s;"
     }
     DBI::dbExecute(dbcon, sprintf(
       query_string,
-      tableName,
+      table_name,
       raw_bucket,
       prefix,
       region,
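
Because tableName -> table_name is a breaking rename for callers that pass the argument by name, call sites need a one-word update. A before/after sketch reusing the names from the roxygen example (the connection and data objects are placeholders):

# 0.1.x
rs_replace_table(data = a, dbcon = con, tableName = "testTable", bucket = "my-bucket", split_files = 4)
# 0.2.0
rs_replace_table(data = a, dbcon = con, table_name = "testTable", bucket = "my-bucket", split_files = 4)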

R/redshift-spectrum.R

Lines changed: 2 additions & 5 deletions
@@ -1,8 +1,5 @@
-table_parts <- function(table_name) {
-  return(strsplit(table_name, ".", fixed = TRUE)[[1]])
-}
-
 globalVariables(c("V1", "values")) # suppresses check note due to NSE
+
 #' List Spectrum partitions
 #'
 #' Provides character vector of partition names for a given table
@@ -75,7 +72,7 @@ spectrum_drop_partition <- function(dbcon, table_name, part_name, part_value) {
 #' @importFrom glue glue
 #' @importFrom DBI dbExecute
 spectrum_add_partition <- function(dbcon, table_name, part_name, part_value, base_location, quote_partition_value = TRUE) {
-  stopifnot("ident" %in% class(table_name))
+  stopifnoschema(table_name)
   if (quote_partition_value) {
     q <- "'"
   } else {
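
With the check routed through stopifnoschema(), spectrum_add_partition() still rejects bare strings but now also accepts dbplyr 2.x schema objects, and a failure carries the friendlier assert message. A sketch; every argument value here is hypothetical:

spectrum_add_partition(
  dbcon,
  table_name = dbplyr::in_schema("spectrum", "events"),  # must come from in_schema()
  part_name = "ds",
  part_value = "2020-01-01",
  base_location = "s3://my-bucket/events"
)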

R/redshift-transaction.R

Lines changed: 9 additions & 5 deletions
@@ -14,8 +14,8 @@
 #' .dbcon = rs$con,
 #' .function_sequence = list(
 #' function(...) { rs_create_table(table_name = "mtcars", ...) },
-#' function(...) { rs_upsert_table(tableName = "mtcars", ...) },
-#' function(...) { rs_replace_table(tableName = "mtcars", ...) }
+#' function(...) { rs_upsert_table(table_name = "mtcars", ...) },
+#' function(...) { rs_replace_table(table_name = "mtcars", ...) }
 #' )
 #' )
 #' }
@@ -40,14 +40,18 @@ transaction <- function(.data, .dbcon, .function_sequence) {
       TRUE
     },
     error = function(e) {
+      # send message now in case we hit another error during ROLLBACK - possible if we lost db connection
      message(e$message)
      DBI::dbExecute(.dbcon, "ROLLBACK;")
      message("Rollback complete")
-      FALSE
+      stop(glue("A redshift error occured: {e$message}"))
     }
   )
-  if (is.null(result) || !isTRUE(result)) {
-    stop("A redshift error occured")
+  if (is.null(result)) {
+    stop("A redshift error occured, the result of the transaction was NULL - which is unexpected")
+  }
+  if (!isTRUE(result)) {
+    stop("A redshift error occured, the result of the transaction was FALSE - which is unexpected")
   }
   return(result)
 }
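
The reworked error path rolls back and then re-raises with the original message, so a failed step now surfaces as an error rather than a silent FALSE; the two follow-up checks are belt-and-braces guards that should be unreachable. Usage follows the roxygen example above (rs$con is assumed to be an open connection):

transaction(
  .data = mtcars,
  .dbcon = rs$con,
  .function_sequence = list(
    function(...) { rs_create_table(table_name = "mtcars", ...) },
    function(...) { rs_upsert_table(table_name = "mtcars", ...) }
  )
)
# on failure: ROLLBACK is issued first, then stop("A redshift error occured: <original message>")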

R/redshift-upsert.R

Lines changed: 13 additions & 9 deletions
@@ -6,7 +6,7 @@
 #'
 #' @param data a data frame
 #' @param dbcon an RPostgres connection to the redshift server
-#' @param tableName the name of the table to replace
+#' @param table_name the name of the table to replace
 #' @param split_files optional parameter to specify amount of files to split into. If not specified will look at amount of slices in Redshift to determine an optimal amount.
 #' @param keys athis optional vector contains the variables by which to upsert. If not defined, the upsert becomes an append.
 #' @param bucket the name of the temporary bucket to load the data. Will look for AWS_BUCKET_NAME on environment if not specified.
@@ -30,19 +30,19 @@
 #' host='my-redshift-url.amazon.com', port='5439',
 #' user='myuser', password='mypassword',sslmode='require')
 #'
-#' rs_upsert_table(data=nx, dbcon=con, tableName='testTable',
+#' rs_upsert_table(data=nx, dbcon=con, table_name='testTable',
 #' bucket="my-bucket", split_files=4, keys=c('a'))
 #'
 #' }
 #' @export
 rs_upsert_table <- function(
   data,
   dbcon,
-  tableName,
+  table_name,
   keys = NULL,
   split_files,
   bucket=Sys.getenv("AWS_BUCKET_NAME"),
-  region=Sys.getenv("AWS_DEFAULT_REGION"),
+  region=Sys.getenv("AWS_DEFAULT_REGION", unset = "us-east-1"),
   access_key=NULL,
   secret_key=NULL,
   strict = FALSE,
@@ -66,15 +66,19 @@ rs_upsert_table <- function(
   raw_bucket <- paste0(bucket, if (Sys.getenv("ENVIRONMENT") == "production") "" else "-test")
   split_files <- min(split_files, nrow(data))
 
-  data <- fix_column_order(data, dbcon, table_name = tableName, strict = strict)
+  data <- fix_column_order(data, dbcon, table_name = table_name, strict = strict)
   prefix <- uploadToS3(data, bucket, split_files)
   on.exit({
     message("Deleting temporary files from S3 bucket")
     deletePrefix(prefix, raw_bucket, split_files)
   })
   stageTable <- paste0(sample(letters, 32, replace = TRUE), collapse = "")
 
-  DBI::dbExecute(dbcon, sprintf("create temp table %s (like %s)", stageTable, tableName))
+  if (is_schema(table_name)) {
+    table_name <- schema_to_character(table_name)
+  }
+
+  DBI::dbExecute(dbcon, sprintf("create temp table %s (like %s)", stageTable, table_name))
 
   message("Copying data from S3 into Redshift")
   DBI::dbExecute(dbcon, sprintf(
@@ -88,18 +92,18 @@
 
   if (!is.null(keys)) {
     message("Deleting rows with same keys")
-    keysCond <- paste(stageTable, ".", keys, "=", tableName, ".", keys, sep = "")
+    keysCond <- paste(stageTable, ".", keys, "=", table_name, ".", keys, sep = "")
     keysWhere <- sub(" and $", "", paste0(keysCond, collapse = "", sep = " and "))
     DBI::dbExecute(dbcon, sprintf(
       "delete from %s using %s where %s;",
-      tableName,
+      table_name,
       stageTable,
       keysWhere
     ))
   }
 
   message("Insert new rows")
-  DBI::dbExecute(dbcon, sprintf("insert into %s (select * from %s);", tableName, stageTable))
+  DBI::dbExecute(dbcon, sprintf("insert into %s (select * from %s);", table_name, stageTable))
   DBI::dbExecute(dbcon, sprintf("drop table %s;", stageTable))
 },
 error = function(e) {
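
With the new is_schema()/schema_to_character() guard, rs_upsert_table() accepts either a plain table name or an in_schema() reference, converting the latter to a dotted string before any sprintf()-built SQL. A sketch with illustrative table, key, and bucket names (and assuming schema_to_character() behaves as sketched under NAMESPACE above):

rs_upsert_table(
  data = nx,
  dbcon = con,
  table_name = dbplyr::in_schema("analytics", "testTable"),
  keys = c("a"),
  bucket = "my-bucket",
  split_files = 4
)
# internally table_name becomes "analytics.testTable" before the
# "create temp table ... (like ...)" and the delete/insert statements are built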
