Github project : https://github.com/saagie/example-R-querying-impala
Connect with ODBC
Dependencies
R package
DBI: Standard database interface
odbc: Connect to ODBC databases using DBI
dplyr: Data manipulation library
dbplyr: Converts data manipulation written in R to SQL
implyr: Same as above but specific for Impala
getPass: Library to hide the password typed in an RStudio notebook
If you are using this on the Saagie platform, all dependencies and drivers should already be installed. If not, or if you want to connect from outside of the platform, read the installation guide at the end of this article.
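If you need to install the R packages yourself, a minimal sketch (the ODBC driver itself is a system package, covered in the installation details at the end of this article):

```r
# Install the R-side dependencies listed above
install.packages(c("DBI", "odbc", "dplyr", "dbplyr", "implyr", "getPass"),
                 repos = "https://cloud.r-project.org")
```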
Authentication without Kerberos
Parameters
- Driver: The name of the ODBC driver to use. On the platform, the default is "Cloudera ODBC Driver for Impala 64-bit". (The default name on Windows is usually "Cloudera ODBC Driver for Impala"; on Linux it is usually "Cloudera ODBC Driver for Impala 64-bit".)
- Host: IP or hostname of the Impala database. You can find it under Impala > External Connections > Host. It has the format dn1.pX.company.prod.saagie.io.
- Port: Default is 21050
- Schema: Schema in which to execute the queries
Additional parameters for authentication
- AuthMech: The authentication mechanism; use 3 for authentication with user / password
- UseSASL: Simple Authentication and Security Layer; use 1 for authentication with user / password
- UID: Your Saagie username on the platform
- PWD: Your Saagie password on the platform
Code example
library(DBI)
con <- dbConnect(odbc::odbc(),
                 Driver = "Cloudera ODBC Driver for Impala 64-bit",
                 Host = "dn1.pX.company.prod.saagie.io",
                 Port = 21050,
                 Schema = "default",
                 AuthMech = 3,
                 UseSASL = 1,
                 UID = "user",
                 PWD = "password")
Impala over SSL
If your Impala is secured with SSL, you have to add the following parameters to your connection:
- SSL=1 → Mandatory. The client will communicate with the server over SSL.
- AllowSelfSignedServerCert=1 → Optional. Allows authentication using self-signed certificates that have not been added to the list of trusted certificates. NB: the certificate chain file is not yet available from Saagie's containers.
- AllowHostNameCNMismatch=1 → Optional. Allows the common name of a CA-issued SSL certificate to not match the host name of the Impala server.
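Putting it together, a connection over SSL might look like the sketch below (host and credentials are placeholders to adapt to your platform):

```r
library(DBI)

# Hypothetical host and credentials; the three SSL parameters are appended
# to the user/password connection shown above
con <- dbConnect(odbc::odbc(),
                 Driver = "Cloudera ODBC Driver for Impala 64-bit",
                 Host = "dn1.pX.company.prod.saagie.io",
                 Port = 21050,
                 Schema = "default",
                 AuthMech = 3,
                 UseSASL = 1,
                 UID = "user",
                 PWD = "password",
                 SSL = 1,
                 AllowSelfSignedServerCert = 1,
                 AllowHostNameCNMismatch = 1)
```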
Authentication with Kerberos
Parameters
- Driver: The name of the ODBC driver to use. On the platform, the default is "Cloudera ODBC Driver for Impala 64-bit". (The default name on Windows is usually "Cloudera ODBC Driver for Impala"; on Linux it is usually "Cloudera ODBC Driver for Impala 64-bit".)
- Host: IP or hostname of the Impala database. Default is dn1.pX.company.prod.saagie.io
- Port: Default is 21050
- Schema: Schema in which to execute the queries
Additional parameters for authentication
- AuthMech: The authentication mechanism; use 1 for authentication with a Kerberos ticket
- User: Your Saagie username on the platform
- Password: Your Saagie password on the platform
Code example
library(DBI)
library(getPass)
# Method 1 (interactive): use in RStudio; an interactive pop-up asks for the password
system('kinit user', input = getPass('Enter your password: '))

# Method 2 (scripts): use outside of RStudio.
# The password is written on the command line or stored in an environment variable.
# Uncomment the next line to use it:
# system('echo password | kinit user')
con <- dbConnect(odbc::odbc(),
                 Driver = "Cloudera ODBC Driver for Impala 64-bit",
                 Host = "dn1.pX.company.prod.saagie.io",
                 Port = 21050,
                 Schema = "default",
                 AuthMech = 1)
Query examples
Show Tables
# If you only see the first letter of each table name, check the section below
# named "Fix display names of tables in Rstudio and dbListTables".
# Another workaround is to use plain SQL: "show schemas" and "show tables"
# List all tables from all schemas
dbListTables(con)
dbGetQuery(con, 'show schemas')
dbGetQuery(con, 'show tables')
Get all elements of a table
sample <- DBI::dbReadTable(con, "sample") # Fetch all data from a table
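When the table is large, fetching everything is rarely what you want; you can send plain SQL instead. A sketch (the table and column names are hypothetical):

```r
# Fetch a subset instead of the whole table
sample_2020 <- dbGetQuery(con, "SELECT * FROM sample WHERE year = 2020 LIMIT 1000")

# With user-supplied values, quote them explicitly to avoid malformed queries
year <- 2020
query <- paste0("SELECT * FROM sample WHERE year = ", dbQuoteLiteral(con, year))
sample_2020 <- dbGetQuery(con, query)
```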
Execute queries in dplyr syntax
library(dplyr)
library(dbplyr) # for in_schema()

# Create a lazy tbl from an Impala table
forecast_db <- tbl(con, in_schema('forecast', 'forecast_weekly'))
forecast_db # Print columns and column types
# The query is written in dplyr syntax, but executed on a remote sql database
query <- forecast_db %>%
  summarise(mean_forecast = forecast %>% as.numeric() %>% mean())
show_query(query) # Show the query that will be executed
query # Executes the lazy query when the result is printed
# Example of usable dplyr verbs
forecast_db %>%
  filter(prediction_date == max(prediction_date)) %>%
  group_by(reference) %>%
  summarise(forecast_mean = mean(forecast),
            forecast_max = max(forecast),
            nb_forecast_total = n())
sales_db <- tbl(con, in_schema('forecast', 'sales_weekly'))
sales_db
forecast_db <- forecast_db %>%
  mutate(reference = as.character(reference))
diff_db <- inner_join(forecast_db, sales_db, by = c('reference', 'year', 'week'))
diff_by_ref <- diff_db %>%
  group_by(reference, year, week) %>%
  summarise(diff_by_week = abs(forecast - quantity)) %>% # Difference between forecast and reality for each prediction
  group_by(reference) %>%
  summarise(diff_by_ref = sum(diff_by_week)) # Sum of all differences for each reference

diff_by_ref # Executes all the lazy queries above
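Printing a lazy tbl only runs the query remotely and shows the first rows. To bring the full result back into an ordinary R data frame, call collect():

```r
# Execute the query on Impala and retrieve the whole result locally
diff_local <- diff_by_ref %>% collect()
```

Only call collect() on results small enough to fit in R's memory; keep filters and aggregations on the Impala side.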
Writing table to Impala
The default function for writing from R to Impala is very slow and only works for small amounts of data. It uses an INSERT INTO ... VALUES ... query because that is the only way to insert data without knowing the underlying storage mechanism used by Impala. By default, Saagie platforms use HDFS, and we can use it to speed up data transfer from R to Impala.
Below is an example of this function. It works in 4 steps:
- Upload the data from R to HDFS in /tmp/
- Create a temporary external table on the uploaded CSV file
- Copy the contents of the temporary table into the final Impala table in Parquet format
- Remove the temporary table and the CSV file used
The parameters used are described in the code below. You also need to specify the WebHDFS URL specific to your platform inside the function. If any errors occur, verify the rights of your Impala user and change the HDFS user used.
#' Write data from R to Impala using HDFS
#'
#' @param con Impala connection
#' @param name Name of the table created in Impala
#' @param obj R data frame
#' @param database Impala database to use
#' @param overwrite Overwrite the current table if it already exists
#'
fastWriteTable <- function(con, name, obj, database = 'default', overwrite = FALSE) {
  library(DBI)
  library(stringi)
  library(httr)
  library(data.table)

  if (!overwrite) {
    showRequest <- paste0("show tables in `", database, "` like '", name, "'")
    exists <- dbGetQuery(con, showRequest)
    if (nrow(exists) > 0) {
      print(paste0('Table ', database, '.', name, ' already exists. Use overwrite = TRUE to overwrite it.'))
      return('Table already exists')
    }
  }

  # HDFS information
  hdfsUrl <- "https://nn1.pX.companyName.saagie.io:50470/webhdfs/v1"
  hdfsUser <- '&user.name=impala'

  # Information about the CSV table
  tableCsv <- paste0('tmp__', name, '__tmp') # Name of the temporary table in Impala
  location <- paste0('/tmp/', tableCsv, '/') # HDFS folder containing the CSV file
  separator <- ';'                           # Separator used in the CSV file
  nameCsv <- paste0(name, '.csv')

  # Select the database
  dbGetQuery(con, paste0('use ', database))

  # Column names and classes
  cl <- sapply(obj, class)
  n <- names(cl)

  # Matching R and Impala column types
  r_types <- c('numeric', 'integer', 'character', 'factor', 'c(\"POSIXct\", \"POSIXt\")')
  impala_types <- c('DOUBLE', 'INTEGER', 'STRING', 'STRING', 'TIMESTAMP')

  # Replace R types by Impala types
  cl <- stringi::stri_replace_all_fixed(cl, r_types, impala_types, vectorize_all = FALSE)

  # Declare each column with the correct type
  namesTypes <- paste0(dbQuoteIdentifier(con, make.names(n)), ' ', cl, collapse = ', ')
  dropRequest <- paste0("DROP TABLE IF EXISTS `", tableCsv, "`")
  dbGetQuery(con, dropRequest)

  # Write the CSV to disk, then upload it to HDFS through WebHDFS
  fwrite(obj, nameCsv, col.names = FALSE, sep = separator)
  set_config(config(ssl_verifypeer = 0L))
  uri <- paste0(hdfsUrl, location, nameCsv, '?op=CREATE&overwrite=true', hdfsUser)
  response <- PUT(uri)
  stop_for_status(response)
  uriWrite <- response$url
  response <- PUT(uriWrite, body = upload_file(nameCsv))
  stop_for_status(response)

  # Create an external table on the CSV file
  createRequest <- paste0("CREATE TABLE `", tableCsv, "` (", gsub('.', '_', namesTypes, fixed = TRUE),
                          ") row format delimited fields terminated by '", separator,
                          "' stored as textfile location '", location, "'")
  dbGetQuery(con, createRequest)
  dropRequest2 <- paste0("DROP TABLE IF EXISTS `", name, "`")
  dbGetQuery(con, dropRequest2)

  # Copy the CSV table into a Parquet table
  parquetRequest <- paste0("CREATE TABLE `", name, "` STORED AS PARQUET AS SELECT * FROM `", tableCsv, "`")
  dbGetQuery(con, parquetRequest)

  # Optimize the table
  computeRequest <- paste0("COMPUTE STATS `", name, "`")
  dbGetQuery(con, computeRequest)

  # Remove the temporary CSV table and the local CSV file
  dropTmpRequest <- paste0("DROP TABLE `", tableCsv, "`")
  dbGetQuery(con, dropTmpRequest)
  file.remove(nameCsv)

  return('Table correctly uploaded')
}
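A hypothetical call, assuming a small data frame df and that your user has write access to the forecast database:

```r
# Example data frame; column types map to STRING and INTEGER in Impala
df <- data.frame(reference = c('a', 'b'), quantity = c(1L, 2L))

fastWriteTable(con, name = 'my_table', obj = df,
               database = 'forecast', overwrite = TRUE)
```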
Installation details (only use if necessary)
Impala ODBC driver
Download the Impala ODBC driver from Cloudera https://www.cloudera.com/downloads/connectors/impala/odbc/2-5-39.html
On Windows, run the .msi installer.
On Linux, install the odbcinst tool and the driver package:
sudo apt-get install odbcinst
sudo dpkg -i clouderaimpalaodbc_*.deb
Then execute the following command to register the driver as installed:
sudo odbcinst -i -d -f /opt/cloudera/impalaodbc/Setup/odbcinst.ini
Alternatively, the whole setup can be scripted from R:
if (!length(find.package("odbc", quiet = TRUE))) {
# Install OS package for ODBC
system("sudo apt-get install -y unixodbc unixodbc-dev", intern = TRUE)
# Install R package for ODBC
install.packages("odbc", repos = "https://cloud.r-project.org")
}
# If Impala ODBC driver is missing:
if (!file.exists("/etc/odbcinst.ini") ||
!length(grep("Cloudera ODBC Driver for Impala 64-bit", readLines("/etc/odbcinst.ini"), ignore.case = TRUE))) {
# Retrieve Impala drivers for Debianoids from Cloudera
if (!dir.exists("driver-impala")) dir.create("driver-impala")
  if (!length(Sys.glob("driver-impala/clouderaimpalaodbc_*.deb")))
download.file(paste0("https://downloads.cloudera.com/connectors/impala_odbc_2.5.40.1025/",
"Debian/clouderaimpalaodbc_2.5.40.1025-2_amd64.deb"),
destfile = "driver-impala/clouderaimpalaodbc_2.5.40.1025-2_amd64.deb")
# Install driver
system("sudo dpkg -i driver-impala/clouderaimpalaodbc_*.deb", intern = TRUE)
# Setup driver and /etc/odbcinst.ini file
system("sudo apt-get install -y odbcinst", intern = TRUE)
system("sudo odbcinst -i -d -f /opt/cloudera/impalaodbc/Setup/odbcinst.ini", intern = TRUE)
}
Fix display names of tables in Rstudio and dbListTables
Execute the following command with an admin account:
sudo sed -i 's/DriverManagerEncoding=UTF-32/DriverManagerEncoding=UTF-16/g' /opt/cloudera/impalaodbc/lib/64/cloudera.impalaodbc.ini
Then disconnect from your Impala connection and restart your R session. When you connect to Impala again, you should see the full table names.
With JDBC
Dependencies
R package
RJDBC: Allows R to connect to any DBMS that has a JDBC driver (https://cran.r-project.org/web/packages/RJDBC/index.html)
Impala JDBC drivers
Download the Impala JDBC drivers
download.file('https://github.com/Mu-Sigma/RImpala/blob/master/impala-jdbc-cdh5.zip?raw=true', 'jdbc.zip',
              method = 'curl', extra = '-L')
unzip('jdbc.zip')
Parameters
- impalaConnectionUrl: URL used to communicate with Impala through a datanode.
If there is no authentication, the format is as follows:
jdbc:hive2://datanode1dns:port/;auth=noSasl
The Impala port is usually 21050.
Code explanation for querying Impala
Loading JDBC driver and connection
library(RJDBC)

drv <- JDBC(driverClass = "org.apache.hive.jdbc.HiveDriver",
            classPath = list.files("impala-jdbc-cdh5", pattern = "jar$", full.names = TRUE),
            identifier.quote = "`")
impalaConnectionUrl <- "jdbc:hive2://datanode1dns:port/"
conn <- dbConnect(drv, impalaConnectionUrl, user = "xxxx", password = "xxxx")
Query and Insert : Examples
Show Tables
# All databases
dbListTables(conn)
# The database "default"
dbGetQuery(conn,"show tables")
Get all elements of a table
# In the database "default"
d <- dbReadTable(conn, "table_name")
# OR
d <- dbGetQuery(conn, "select * from table_name")

# Other than the database "default"
d <- dbReadTable(conn, "nameBDD.table_name")
Create a table in parquet format
dbSendUpdate(conn, "CREATE TABLE table_name (attribute1 string, attribute2 int) STORED AS PARQUET")
Insert data into a table
dbSendUpdate(conn, "INSERT INTO table_name VALUES ('test', 1)")
Refresh Tables
dbGetQuery(conn, "INVALIDATE METADATA")