This document shows how to predict flight arrival delays using a ScaleR logistic regression model. The example uses flight delay and weather data, joined using SparkR.
Although both packages run on Apache Hadoop’s Spark execution engine, they cannot share data in memory, because each requires its own Spark session. Until this issue is addressed in an upcoming version of ML Server, the workaround is to maintain non-overlapping Spark sessions and to exchange data through intermediate files. As the instructions here show, these requirements are straightforward to meet.
This example was initially shared in a talk at Strata 2016 by Mario Inchiosa and Roni Burd. You can find this talk at Building a Scalable Data Science Platform with R.
The code was originally written for ML Server running on Spark in an HDInsight cluster on Azure. But the concept of mixing the use of SparkR and ScaleR in one script is also valid in the context of on-premises environments.
The steps in this document assume that you have an intermediate level of knowledge of R and of the ScaleR library of ML Server. You are introduced to SparkR while walking through this scenario.
The airline and weather datasets
The flight data is available from the U.S. government archives. It is also available as a zip from AirOnTimeCSV.zip.
The weather data can be downloaded as zip files in raw form, by month, from the National Oceanic and Atmospheric Administration repository. For this example, download the data for May 2007 through December 2012. Use the hourly data files and the YYYYMMstation.txt file within each of the zips.
Setting up the Spark environment
Use the following code to set up the Spark environment:
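A minimal sketch of that setup, assuming an HDInsight-style cluster; the HDFS path, the local staging directory, and the SPARK_HOME location are all placeholders:

```r
# All paths below are assumptions; adjust them to your cluster.

# HDFS location for the example data and intermediate files
bigDataDirRoot <- "/share"

# Local scratch directory for staging downloaded files
localDir <- "/tmp/flightData"
if (!dir.exists(localDir)) dir.create(localDir, recursive = TRUE)

# Point R at the cluster's Spark installation
Sys.setenv(SPARK_HOME = "/usr/hdp/current/spark-client")
```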
Next, add the SparkR library under SPARK_HOME to the search path for R packages. Adding it to the search path allows you to load SparkR and initialize a SparkR session:
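A sketch of loading SparkR from the Spark installation and starting a session:

```r
# Add the SparkR package shipped with Spark to the R library search path
.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))

library(SparkR)

# Start the SparkR session (Spark 2.x API; on Spark 1.6 you would use
# sparkR.init() and sparkRSQL.init() instead)
sparkR.session()
```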
Preparing the weather data
To prepare the weather data, subset it to the columns needed for modeling:
- 'Visibility'
- 'DryBulbCelsius'
- 'DewPointCelsius'
- 'RelativeHumidity'
- 'WindSpeed'
- 'Altimeter'
Then add an airport code associated with the weather station and convert the measurements from local time to UTC.
Begin by creating a file to map the weather station (WBAN) info to an airport code. The following code reads each of the hourly raw weather data files, subsets to the columns we need, merges the weather station mapping file, adjusts the date times of measurements to UTC, and then writes out a new version of the file:
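A sketch of that cleanup loop; the raw file layout, the column names, and the stationMap mapping file (WBAN, AirportID, and TimeZoneOffset in hours from UTC) are assumptions:

```r
# Hypothetical mapping file: one row per WBAN with AirportID and
# TimeZoneOffset (hours from UTC)
stationMap <- read.csv("/tmp/wbanToAirport.csv", stringsAsFactors = FALSE)

cleanWeatherFile <- function(inFile, outFile, stationMap) {
  wx <- read.csv(inFile, stringsAsFactors = FALSE)

  # Keep the station key, timestamp, and the measurements used for modeling
  keep <- c("WBAN", "Date", "Time", "Visibility", "DryBulbCelsius",
            "DewPointCelsius", "RelativeHumidity", "WindSpeed", "Altimeter")
  wx <- wx[, keep]

  # Attach the airport code and time zone offset for each weather station
  wx <- merge(wx, stationMap, by = "WBAN")

  # Convert the local measurement time to UTC using the station offset
  localTime <- as.POSIXct(paste(wx$Date, sprintf("%04d", wx$Time)),
                          format = "%Y%m%d %H%M", tz = "UTC")
  utcTime <- localTime - wx$TimeZoneOffset * 3600
  wx$AdjustedYear  <- as.integer(format(utcTime, "%Y"))
  wx$AdjustedMonth <- as.integer(format(utcTime, "%m"))
  wx$AdjustedDay   <- as.integer(format(utcTime, "%d"))
  wx$AdjustedHour  <- as.integer(format(utcTime, "%H"))

  write.csv(wx, outFile, row.names = FALSE)
}

# Apply the cleanup to every hourly file in the raw weather directory
rawFiles <- list.files("/tmp/weatherRaw", pattern = "hourly\\.txt$",
                       full.names = TRUE)
for (f in rawFiles) {
  cleanWeatherFile(f, file.path("/tmp/weatherClean", basename(f)), stationMap)
}
```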
Importing the airline and weather data to Spark DataFrames
Now we use the SparkR read.df() function to import the weather and airline data to Spark DataFrames. This function, like many other Spark methods, is executed lazily, meaning that it is queued for execution but not run until required.
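For example (the HDFS input paths are assumptions; on Spark 1.x you would pass source = "com.databricks.spark.csv" instead of "csv"):

```r
# Import the airline and weather CSVs into Spark DataFrames (lazy)
airDF <- read.df(path = "/share/AirOnTimeCSV",
                 source = "csv", header = "true", inferSchema = "true")

weatherDF <- read.df(path = "/share/weatherClean",
                     source = "csv", header = "true", inferSchema = "true")
```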
Data cleansing and transformation
Next we clean up the airline data we’ve imported: we rename columns, keep only the variables needed, and round the scheduled departure times down to the nearest hour so we can merge them with the latest weather data at departure:
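A sketch of that cleanup, assuming the AirOnTime column names:

```r
# Rename the raw AirOnTime columns to friendlier names
airDF <- rename(airDF,
                ArrDel15 = airDF$ARR_DEL15,
                Year = airDF$YEAR,
                Month = airDF$MONTH,
                DayOfMonth = airDF$DAY_OF_MONTH,
                DayOfWeek = airDF$DAY_OF_WEEK,
                Carrier = airDF$UNIQUE_CARRIER,
                OriginAirportID = airDF$ORIGIN_AIRPORT_ID,
                DestAirportID = airDF$DEST_AIRPORT_ID,
                CRSDepTime = airDF$CRS_DEP_TIME,
                CRSArrTime = airDF$CRS_ARR_TIME)

# Keep only the modeling variables
airDF <- select(airDF, "ArrDel15", "Year", "Month", "DayOfMonth",
                "DayOfWeek", "Carrier", "OriginAirportID",
                "DestAirportID", "CRSDepTime", "CRSArrTime")

# Round the scheduled departure time (hhmm) down to the hour
airDF <- withColumn(airDF, "CRSDepTime", floor(airDF$CRSDepTime / 100))
```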
Now we perform similar operations on the weather data:
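A sketch of the weather side: keep the modeling columns, then average multiple readings within each airport and hour down to a single row (column names follow the cleaned files above):

```r
# Keep only the join keys and the measurements used for modeling
weatherDF <- select(weatherDF, "AdjustedYear", "AdjustedMonth", "AdjustedDay",
                    "AdjustedHour", "AirportID", "Visibility",
                    "DryBulbCelsius", "DewPointCelsius", "RelativeHumidity",
                    "WindSpeed", "Altimeter")

# Average multiple hourly readings per airport and hour into one row
weatherSummary <- agg(
  groupBy(weatherDF, "AdjustedYear", "AdjustedMonth", "AdjustedDay",
          "AdjustedHour", "AirportID"),
  Visibility = avg(weatherDF$Visibility),
  DryBulbCelsius = avg(weatherDF$DryBulbCelsius),
  DewPointCelsius = avg(weatherDF$DewPointCelsius),
  RelativeHumidity = avg(weatherDF$RelativeHumidity),
  WindSpeed = avg(weatherDF$WindSpeed),
  Altimeter = avg(weatherDF$Altimeter)
)
```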
Joining the weather and airline data
We now use the SparkR join() function to do a left outer join of the airline and weather data by departure AirportID and datetime. The outer join allows us to retain all the airline data records even if there is no matching weather data. Following the join, we remove some redundant columns and rename the kept columns to remove the prefixes introduced by the join.
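A sketch of the departure-side join (variable names carry over from the sketches above):

```r
# Left outer join: keep every flight, with departure-airport weather
joinedDF <- join(
  airDF, weatherSummary,
  airDF$OriginAirportID == weatherSummary$AirportID &
    airDF$Year == weatherSummary$AdjustedYear &
    airDF$Month == weatherSummary$AdjustedMonth &
    airDF$DayOfMonth == weatherSummary$AdjustedDay &
    airDF$CRSDepTime == weatherSummary$AdjustedHour,
  joinType = "left_outer"
)

# Drop the redundant weather-side join keys
joinedDF <- drop(joinedDF, c("AdjustedYear", "AdjustedMonth", "AdjustedDay",
                             "AdjustedHour", "AirportID"))

# Tag the measurements as departure-airport weather so they don't
# collide with the arrival-side join below
joinedDF <- rename(joinedDF,
                   VisibilityOrigin = joinedDF$Visibility,
                   DryBulbCelsiusOrigin = joinedDF$DryBulbCelsius,
                   DewPointCelsiusOrigin = joinedDF$DewPointCelsius,
                   RelativeHumidityOrigin = joinedDF$RelativeHumidity,
                   WindSpeedOrigin = joinedDF$WindSpeed,
                   AltimeterOrigin = joinedDF$Altimeter)
```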
In a similar fashion, we join the weather and airline data based on arrival AirportID and datetime:
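Continuing the sketch for the arrival side:

```r
# Round the scheduled arrival time down to the hour as well
joinedDF <- withColumn(joinedDF, "CRSArrTime", floor(joinedDF$CRSArrTime / 100))

joinedDF5 <- join(
  joinedDF, weatherSummary,
  joinedDF$DestAirportID == weatherSummary$AirportID &
    joinedDF$Year == weatherSummary$AdjustedYear &
    joinedDF$Month == weatherSummary$AdjustedMonth &
    joinedDF$DayOfMonth == weatherSummary$AdjustedDay &
    joinedDF$CRSArrTime == weatherSummary$AdjustedHour,
  joinType = "left_outer"
)

joinedDF5 <- drop(joinedDF5, c("AdjustedYear", "AdjustedMonth", "AdjustedDay",
                               "AdjustedHour", "AirportID"))

joinedDF5 <- rename(joinedDF5,
                    VisibilityDest = joinedDF5$Visibility,
                    DryBulbCelsiusDest = joinedDF5$DryBulbCelsius,
                    DewPointCelsiusDest = joinedDF5$DewPointCelsius,
                    RelativeHumidityDest = joinedDF5$RelativeHumidity,
                    WindSpeedDest = joinedDF5$WindSpeed,
                    AltimeterDest = joinedDF5$Altimeter)
```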
Save results to CSV for exchange with ScaleR
That completes the joins we need to do with SparkR. We save the data from the final Spark DataFrame 'joinedDF5' to a CSV for input to ScaleR and then close out the SparkR session. We explicitly tell SparkR to save the resultant CSV in 80 separate partitions to enable sufficient parallelism in ScaleR processing:
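A sketch of that step (the output path is an assumption):

```r
# Write the joined data as 80 CSV partitions, then stop SparkR so
# ScaleR can start its own Spark session
joinedDF5 <- repartition(joinedDF5, 80)
write.df(joinedDF5, path = "/share/joined5Csv",
         source = "csv", mode = "overwrite", header = "true")

sparkR.session.stop()
```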
Import to XDF for use by ScaleR
We could use the CSV file of joined airline and weather data as-is for modeling via a ScaleR text data source. But we import it to XDF first, since it is more efficient when running multiple operations on the dataset:
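A sketch, assuming ML Server's RevoScaleR is available and using the CSV path written above:

```r
library(RevoScaleR)

# Start ScaleR's own Spark session (SparkR's session is now stopped)
rxSparkConnect(consoleOutput = TRUE)

hdfsFS <- RxHdfsFileSystem()

# Declare the categorical columns; the rest are inferred
colInfo <- list(
  DayOfWeek = list(type = "factor"),
  Carrier = list(type = "factor"),
  OriginAirportID = list(type = "factor"),
  DestAirportID = list(type = "factor")
)

joinedCsv <- RxTextData("/share/joined5Csv", colInfo = colInfo,
                        firstRowIsColNames = TRUE, fileSystem = hdfsFS)
joinedXdf <- RxXdfData("/share/joined5Xdf", fileSystem = hdfsFS)

rxImport(inData = joinedCsv, outFile = joinedXdf, overwrite = TRUE)
```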
Splitting data for training and test
We use rxDataStep to split out the 2012 data for testing and keep the rest for training:
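For example (the output paths are assumptions):

```r
trainXdf <- RxXdfData("/share/joinedTrain", fileSystem = hdfsFS)
testXdf  <- RxXdfData("/share/joinedTest",  fileSystem = hdfsFS)

# Hold out 2012 for testing; train on the earlier years
rxDataStep(inData = joinedXdf, outFile = trainXdf,
           rowSelection = (Year != 2012), overwrite = TRUE)
rxDataStep(inData = joinedXdf, outFile = testXdf,
           rowSelection = (Year == 2012), overwrite = TRUE)
```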
Train and test a logistic regression model
Now we are ready to build a model. To see the influence of weather data on delay in the arrival time, we use ScaleR’s logistic regression routine. We use it to model whether an arrival delay of greater than 15 minutes is influenced by the weather at the departure and arrival airports:
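A sketch of the model; the formula simply includes the calendar, carrier, airport, and weather variables assembled above:

```r
formula <- ArrDel15 ~ Month + DayOfMonth + DayOfWeek + Carrier +
  OriginAirportID + DestAirportID + CRSDepTime +
  VisibilityOrigin + DryBulbCelsiusOrigin + DewPointCelsiusOrigin +
  RelativeHumidityOrigin + WindSpeedOrigin + AltimeterOrigin +
  VisibilityDest + DryBulbCelsiusDest + DewPointCelsiusDest +
  RelativeHumidityDest + WindSpeedDest + AltimeterDest

# Fit the logistic regression on the training XDF
logitModel <- rxLogit(formula, data = trainXdf)
summary(logitModel)
```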
Now let’s see how it does on the test data by making some predictions and looking at ROC and AUC.
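A sketch of scoring and evaluation with ScaleR's prediction and ROC helpers (the prediction path is an assumption):

```r
predictXdf <- RxXdfData("/share/joinedPredict", fileSystem = hdfsFS)

# Score the 2012 test data
rxPredict(logitModel, data = testXdf, outData = predictXdf,
          predVarNames = "ArrDel15_Pred",
          writeModelVars = TRUE, overwrite = TRUE)

# Compute and plot the ROC curve, then the area under it
roc <- rxRoc(actualVarName = "ArrDel15",
             predVarNames = "ArrDel15_Pred", data = predictXdf)
plot(roc)
rxAuc(roc)
```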
Scoring elsewhere
We can also use the model to score data on another platform, by saving it to an RDS file and then transferring and importing that RDS file into a destination scoring environment such as Microsoft SQL Server R Services. It is important to ensure that the factor levels of the data to be scored match those on which the model was built. That match can be achieved by extracting and saving the column information associated with the modeling data via ScaleR’s rxCreateColInfo() function and then applying that column information to the input data source for prediction. In the following example we save a few rows of the test dataset and extract and use the column information from this sample in the prediction script:
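A sketch of that hand-off; the file names and the new input CSV are hypothetical:

```r
# --- In the modeling environment ---
saveRDS(logitModel, file = "logitModel.rds")

# Save a small sample of the test data to carry its schema and
# factor levels (1,000 rows is an arbitrary choice)
sampleDF <- rxDataStep(inData = testXdf, numRows = 1000)
saveRDS(sampleDF, file = "testSample.rds")

# --- In the destination scoring environment ---
logitModel <- readRDS("logitModel.rds")
sampleDF <- readRDS("testSample.rds")

# Extract column info (including factor levels) from the sample and
# apply it to the new input so its factor levels match the model
colInfo <- rxCreateColInfo(sampleDF)
inputData <- RxTextData("newFlightsToScore.csv", colInfo = colInfo,
                        firstRowIsColNames = TRUE)

newData <- rxImport(inputData)
predictions <- rxPredict(logitModel, data = newData,
                         predVarNames = "ArrDel15_Pred")
```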
Summary
In this article, we’ve shown how it’s possible to combine use of SparkR for data manipulation with ScaleR for model development in Hadoop Spark. This scenario requires that you maintain separate Spark sessions, only running one session at a time, and exchange data via CSV files. Although straightforward, this process should be even easier in an upcoming ML Services release, when SparkR and ScaleR can share a Spark session and so share Spark DataFrames.
Next steps and more information
For more information on use of ML Server on Apache Spark, see the Getting started guide.
For general information on ML Server, see the Get started with R article.
For information on ML Services on HDInsight, see Overview of ML Services on HDInsight and Get started with ML Services on Azure HDInsight.
For more information on use of SparkR, see:
The Apache SparkR documentation.
SparkR Overview from Databricks.