
Setup Pyspark on Windows

1. Install Anaconda

You should begin by installing Anaconda, which can be found here (select OS from the top):

https://www.anaconda.com/distribution/#download-section

For this how-to, Anaconda 2019.03 (Python 3.7 version) has been installed. Please note that you should use the same Python version (environment) as the one used on the cluster, in order to avoid version conflicts.

Installation may take between 30 minutes and one hour.

Once the installation is finished, you have Anaconda, including Python 3.7 and Jupyter, on your laptop.

To start Jupyter, simply open an Anaconda prompt and type in "jupyter notebook", as follows:

Search in Windows for Anaconda and choose the Anaconda prompt:

Type in "jupyter notebook":

Your browser should open and Jupyter is accessible:

If you want to use the newer JupyterLab frontend, just replace the # in the URL with lab: http://localhost:8888/lab

Once Anaconda with Jupyter and Python 3.7 is available, we can continue with the next step: installing Spark.
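Optionally, you can verify the installation in the Anaconda prompt before continuing (the reported versions depend on your Anaconda release; this check is not strictly required):

python --version
jupyter --version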

2. Install Spark

To install Spark on your laptop, the following three steps need to be executed.

REMARK: This handbook covers installing Spark 2.2.0. In theory you can follow the same installation steps with newer versions of Spark, and in practice this would probably also work against the cluster, but it has not been tested, so for the moment just use Spark 2.2.0.

2.1 Download Spark

You can download spark from here: http://spark.apache.org/downloads.html

Choose version 2.2.0 and download.

The *.tgz file can be unpacked e.g. with 7-Zip, which can be downloaded from https://www.7-zip.org/.

Before you unpack the *.tgz, create a new folder called spark in C:

C:/spark

Unpack the *.tgz into C:/spark

After unpacking, the target folder should be something like: C:/spark/spark-2.2.0-bin-hadoop2.7
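If you prefer the command line over the 7-Zip GUI, the extraction happens in two stages (gunzip, then untar). This is only a hedged sketch, assuming 7-Zip is installed in its default location and the archive sits in your Downloads folder:

"C:\Program Files\7-Zip\7z.exe" x %USERPROFILE%\Downloads\spark-2.2.0-bin-hadoop2.7.tgz -oC:\spark
"C:\Program Files\7-Zip\7z.exe" x C:\spark\spark-2.2.0-bin-hadoop2.7.tar -oC:\spark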

2.2 Download WinUtils.exe

Spark jobs in yarn mode need access to HDFS, since temporary data will be uploaded to HDFS into hdfs:///user/<userid>. Consequently, your laptop needs a Hadoop client (winutils.exe), which can be downloaded here:

https://github.com/steveloughran/winutils/tree/master/hadoop-2.7.1/bin

Create a new folder in C:/spark called Hadoop and a subfolder called bin

C:/spark/Hadoop/bin

and copy the downloaded winutils.exe into the bin folder
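As an optional check (not part of the original instructions), you can run winutils.exe from a command prompt; called without arguments it prints its usage text, and a simple ls on a local folder should succeed:

C:\spark\Hadoop\bin\winutils.exe
C:\spark\Hadoop\bin\winutils.exe ls C:\spark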

2.3 Download cloudera client config

2.3.1 Download

In order to access the cluster, the Cloudera client config needs to be downloaded. To do this, you will need to follow a link of this format:

http:// YOUR_CLOUDERA_MANAGER_IP /cmf/services/10/client-config

So, for example, if your Cloudera Manager address is myClouderaIP.com:1234, then your link will be:

http://myClouderaIP.com:1234/cmf/services/10/client-config

Download the zip and extract it into a new subfolder of C:/spark called cloudera:

C:/spark/cloudera/

Important: The files (*.xml and others) should be placed directly under the cloudera folder; there should be no subfolder like hive-clientconfig in between.
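An optional quick check from Python (not part of the original instructions) that the files really sit directly under the cloudera folder:

import os
# the *.xml client-config files (e.g. hive-site.xml) should appear here directly, not inside a subfolder
print(os.listdir("C:/spark/cloudera"))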

2.3.2 Copy the hive-site.xml

The last step is to copy the hive-site.xml from

C:/spark/cloudera/hive-site.xml

to

C:/spark/spark-2.2.0-bin-hadoop2.7/conf/hive-site.xml

There is already a hive-site.xml in that conf folder, but it should be replaced with the new one.
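A minimal sketch of this copy step in Python, using the paths from above (you can of course also copy the file by hand in the Explorer):

import shutil

# overwrite the default hive-site.xml shipped with Spark with the one from the cluster client config
shutil.copyfile("C:/spark/cloudera/hive-site.xml",
                "C:/spark/spark-2.2.0-bin-hadoop2.7/conf/hive-site.xml")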

3. Setup environment variables in Windows

Since all needed sources/binaries and configs have now been downloaded and saved in C:/spark, a couple of environment variables need to be set in order to let Jupyter know where to find PySpark and how to access the cluster.

For Spark the following environment variables need to be set:

In [2]:

!setx SP_ROOT_HOME "C:\spark" 
!setx SPARK_HOME "C:\spark\spark-2.2.0-bin-hadoop2.7"
!setx PYSPARK_PYTHON "/opt/anaconda3/bin/python3"
!setx PYTHONPATH "C:\spark\spark-2.2.0-bin-hadoop2.7\python;C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib;C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip;"
!setx PYSPARK_LIBS "C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip;"
!setx HADOOP_CONF_DIR "C:\spark\cloudera;"
!setx YARN_CONF_DIR "C:\spark\cloudera;"
SUCCESS: Specified value was saved. 
SUCCESS: Specified value was saved.
SUCCESS: Specified value was saved.
SUCCESS: Specified value was saved.
SUCCESS: Specified value was saved.
SUCCESS: Specified value was saved.
SUCCESS: Specified value was saved.

For winutils.exe the following environment variables need to be set:

In [3]:

!setx HADOOP_HOME "C:\spark\Hadoop" 
!setx PATH "%PATH%;%SPARK_HOME%\bin;%HADOOP_HOME%\bin"
SUCCESS: Specified value was saved. 
SUCCESS: Specified value was saved.

VERY IMPORTANT

Close the browser tab with Jupyter and the command prompt running in the background, and restart Jupyter via the Anaconda prompt. The environment variables only become active in Jupyter after the restart.

4. Open Ports

Make sure the ports are open on your laptop if you plan to use yarn-client mode, as in this case. Whenever the YARN logs show the error message that no connection to the Spark driver can be established, you most probably have a port issue. Assuming you need just one SparkSession on the laptop, the following ports need to be opened.

spark.driver.port <- 51810
spark.fileserver.port <- 51811
spark.broadcast.port <- 51812
spark.replClassServer.port <- 51813
spark.blockManager.port <- 51814
spark.executor.port <- 51815
SparkWebUi <- 4040

If for any reason you need more than one SparkSession running in parallel, then simply extend this by opening more ports according to the pattern above, e.g.

spark.driver.port <- 51816
spark.fileserver.port <- 51817
spark.broadcast.port <- 51818
spark.replClassServer.port <- 51819
spark.blockManager.port <- 51820
spark.executor.port <- 51821
SparkWebUi <- 4041

Any new SparkContext will just run the Spark Web UI on the next free port, starting from 4040, then 4041, then 4042 and so on.

4.1 Step 1

To open ports you need admin rights. Use 'Run as administrator', or request admin rights from IT if necessary.

4.2 Step 2

Once you have the admin rights (you get them immediately), search in Windows for the firewall and open it:

Windows Firewall

Click on Advanced Settings

Windows Firewall – Advanced Settings

4.3 Step 3

Create a new Inbound Rule.

To do this:

In Advanced Settings, scroll down and click 'Inbound Rules'. Then on the top left click 'New Rule'.

Choose the 'Port' option and click 'Next'. Enter all the ports mentioned in chapter 4 (Open Ports): 51810, 51811, 51812, 51813, 51814, 51815, 4040.

Configure Ports

Choose 'Allow the Connection' and click 'Next'.

Select only 'Domain' and click 'Next'.

Name the Inbound Rule 'Spark Driver yarn-client' and click 'Finish'.
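If you prefer the command line, the same inbound rule can also be created with netsh from a command prompt started as administrator. This is only a hedged sketch (not part of the original click-path), assuming the single-session port list from above:

netsh advfirewall firewall add rule name="Spark Driver yarn-client" dir=in action=allow protocol=TCP localport=51810-51815,4040 profile=domain

If you open additional ports for parallel SparkSessions, extend the localport list accordingly.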

5. Check Environment

5.1 Check if all environment variables are set correctly

The JAVA_HOME environment variable has not been set via setx above; it can instead be set from Python as shown below. If you don't want to set this variable every time, just add another !setx command as seen above (a sketch follows after the cell below).

In [4]:

import os 
os.environ["JAVA_HOME"] = "C:\\PROGRA~1\\Java\\jdk1.8.0_151"

Check if all variables are set correctly. There is no need to check this every time; it is just for the first run.

In [5]:

print("JAVA_HOME: "+ os.getenv("JAVA_HOME")) 
print("SPARK_HOME: "+ os.getenv("SPARK_HOME"))
print("HADOOP_HOME: "+ os.getenv("HADOOP_HOME"))
print("HADOOP_CONF_DIR: " + os.getenv("HADOOP_CONF_DIR")) print("YARN_CONF_DIR: " + os.getenv("YARN_CONF_DIR")) print("PYSPARK_PYTHON: " + os.getenv("PYSPARK_PYTHON"))
print("PYTHONPATH: " + os.getenv("PYTHONPATH"))
print("PYSPARK_LIBS: " + os.getenv("PYSPARK_LIBS"))
print("PATH: " + os.getenv("PATH")) #print("HADOOP_USER_NAME: " + os.getenv("HADOOP_USER_NAME"))
JAVA_HOME: C:\PROGRA~1\Java\jdk1.8.0_151 
SPARK_HOME: C:\spark\spark-2.2.0-bin-hadoop2.7
HADOOP_HOME: C:\spark\Hadoop
HADOOP_CONF_DIR: C:\spark\cloudera;
YARN_CONF_DIR: C:\spark\cloudera;
PYSPARK_PYTHON: /opt/anaconda3/bin/python3
PYTHONPATH:C:\Inicio\1.1.1.2\nocplusplus\lib\python;C:\Inicio\1.1.1.2\lib\python;C:\Inicio\1.1.1.2\lib\python\pyverify;C:\spark\spark-2.2.0-bin-hadoop2.7\python;C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib;C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\py4j-0.10.4-src.zip;
PYSPARK_LIBS: C:\spark\spark-2.2.0-bin-hadoop2.7\python\lib\pyspark.zip;
PATH: C:\Inicio\1.1.1.2/bin;C:\Inicio/tools/64\iniciomsys-17.11.6.2/usr/bin;C:\Inicio\tools\64\Anaconda3-5.0.1.2;C:\Inicio\tools/64\Anaconda3-5.0.1.2/Scripts;C:\Inicio\tools/64\Anaconda3-5.0.1.2/DLLs;C:\WINDOWS\system32;C:\WINDOWS;C:\WINDOWS\System32\Wbem;C:\WINDOWS\System32\WindowsPowerShell\v1.0\;C:\Program Files\Common Files\ThinPrint\;C:\Program Files\PuTTY\;C:\Program Files (x86)\IVI Foundation\VISA\WinNT\Bin\;C:\Program Files\IVI Foundation\VISA\Win64\Bin\;C:\Program Files (x86)\IVI Foundation\VISA\WinNT\Bin;C:\Program Files (x86)\scala\bin;C:\Inicio\tools\64\Anaconda3-5.0.1.2\Library\bin;C:\Users\grippo\AppData\Local\Microsoft\WindowsApps;C:\spark\spark-2.2.0-bin-hadoop2.7\bin;C:\spark\Hadoop\bin;C:\spark\spark-2.2.0-bin-hadoop2.7\bin;C:\spark\Hadoop\bin;C:\spark\spark-2.2.0-bin-hadoop2.7\bin;C:\spark\Hadoop\bin

6. Samples of using Spark

Steps 1 to 5 need to be performed only once to configure the laptop workspace. Once done, you can start Jupyter via Anaconda, create SparkSessions, and start working with Hive, HDFS, and Spark.

6.1 Sparksession in local mode

In local mode you can also access Hive and HDFS on the cluster. If you use PySpark, it will run the Spark executors on your laptop.

In [3]:

import os 
os.environ["JAVA_HOME"] = "C:\\PROGRA~1\\Java\\jdk1.8.0_151"

from pyspark.sql import SparkSession

spark = SparkSession.builder \
.master('local') \
.appName("jupyter") \
.config("spark.driver.port",51810 ) \
.config("spark.fileserver.port" , 51811) \ .config("spark.broadcast.port" , 51812) \ .config("spark.replClassServer.port" , 51813) \ .config("spark.blockManager.port" , 51814) \ .config("spark.executor.port" , 51815) \ .config("spark.dynamicAllocation.enabled" , 'false') \ .config("spark.executor.instances" , 3) \ .config("spark.executor.cores" , 1) \
.config("spark.executor.memory", '1g') \
.enableHiveSupport() \
.getOrCreate()

print("ready")

The Spark Web UI can be accessed after creation of the SparkSession: http://localhost:4040

6.2 Sparksession in yarn client mode

In yarn-client mode you can also access Hive and HDFS on the cluster. If you use PySpark, it will run the Spark executors on the cluster.


import os 
os.environ["JAVA_HOME"] = "C:\\PROGRA~1\\Java\\jdk1.8.0_151" #Sos.environ["HADOOP_USER_NAME"] = "hdfs"

from pyspark.sql import SparkSession

spark = SparkSession.builder \
.master('yarn') \
.appName("jupyter") \
.config("spark.submit.deployMode","client") \ .config("spark.driver.port",51810 ) \
.config("spark.fileserver.port" , 51811) \ .config("spark.broadcast.port" , 51812) \ .config("spark.replClassServer.port" , 51813) \ .config("spark.blockManager.port" , 51814) \ .config("spark.executor.port" , 51815) \ .config("spark.dynamicAllocation.enabled" , 'false') \ .config("spark.executor.instances" , 3) \ .config("spark.executor.cores" , 1) \
.config("spark.executor.memory", '1g') \
.enableHiveSupport() \
.getOrCreate()

print("ready")
ready

The Spark Web UI can be accessed after creation of the SparkSession: http://localhost:4040

6.3 Hive

After you have created the SparkSession in local or yarn-client mode, .enableHiveSupport() automatically gives you access to Hive on the cluster.

You can now use spark.catalog.listDatabases() to see information about the databases (a short code sketch follows at the end of this section). The output will have the format:

Database(name='example1', description=None, locationUri='hdfs://example_directory1/example1.db'),

Database(name='example2', description=None, locationUri='hdfs://example_directory1/example2.db'),

You can also use spark.catalog.listTables() to see information about the tables. The output will have the format:

Table(name='example1', database='default', description=None, tableType='MANAGED', isTemporary=False),

Table(name='example2', database='default', description=None, tableType='EXTERNAL', isTemporary=False),
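A minimal sketch of these two calls in a notebook cell (the database and table names on your cluster will of course differ):

# list all databases known to the Hive metastore
for db in spark.catalog.listDatabases():
    print(db.name, db.locationUri)

# list the tables of the default database
for tbl in spark.catalog.listTables("default"):
    print(tbl.name, tbl.tableType, tbl.isTemporary)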

6.4 SparkSQL

You can use Spark SQL and execute SQL commands.

In [11]:

#Execute SQL on Hive 
df = spark.sql("select * from ct_fac")
df.count()

Output:

1479106

#Print the Schema of a data frame
df.printSchema()
root  
|-- lot: string (nullable = true)
|-- from_basic_type: string (nullable = true)
|-- from_facility: string (nullable = true)
|-- cumCT: double (nullable = true)
|-- Last_TS: string (nullable = true)

spark.sql("describe ct_fac").show()
+---------------+---------+-------+ 
| col_name|data_type|comment|
+---------------+---------+-------+
| lot| string| null|
|from_basic_type| string| null|
| from_facility| string| null|
| cumCT| double| null|
| Last_TS| string| null|
+---------------+---------+-------+
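If you also want to look at a few rows of the result (output not shown here), the usual DataFrame API works on top of the SQL result:

# show the first 5 rows of the data frame
df.show(5)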

6.5 Pyspark

Execute Python and Spark code on the cluster.

In [14]:

largeRange = spark.sparkContext.parallelize(range(100000)) 
reduceTest = largeRange.reduce(lambda a, b: a + b)
filterReduceTest = largeRange.filter(lambda x: x % 7 == 0).sum()

print(reduceTest)
print(filterReduceTest)
4999950000 
714264285

6.6 Don't forget to stop Spark when it runs on the cluster in yarn-client mode

You need to stop the Spark application manually, otherwise the CPU and memory won't be freed for other users.

In [16]:

spark.stop()