from pyspark.sql import SparkSession
from pyspark.sql import functions as F

from pyspark.sql.window import Window
# note: importing min/max like this shadows the Python built-ins of the same name
from pyspark.sql.functions import col, min, max, lead, lag, lit, date_format

import pandas as pd

# create a Spark session for DataFrame work
spark = SparkSession.builder.appName('Practice').getOrCreate()

# create spark context for RDD
sc = spark.sparkContext

RDD (Resilient Distributed Datasets):

a. Transformations vs Actions

1. Transformations:

  • Transformations are PySpark functions that produce a new RDD, DataFrame, or Dataset as their result.
  • Transformations are lazy operations: none of them execute until you call an action on the RDD or DataFrame.

    1.1 Narrow Transformations:

    • These transformations convert each input partition to exactly one output partition.
    • Each partition of the parent RDD is used by at most one partition of the child RDD; equivalently, each child partition depends on a single parent partition.
    • These transformations are generally fast.
    • They require no data shuffling over the cluster network, i.e. no data movement between partitions.
    • in RDD:

      1. map()
      2. filter()
      3. flatMap()
      4. sample()
      5. union()
    • in DataFrame:

      1. select()
      2. filter() or where()
      3. withColumn()
      4. drop()
      5. alias()
      6. sample()
    1.2 Wide Transformations:

    • These transformations compute each output partition from data in multiple input partitions.
    • Records with the same key must be brought together, so data is shuffled across the cluster network, which makes them more expensive than narrow transformations (see the sketch after this list).

    • in RDD:

      1. groupByKey()
      2. aggregateByKey()
      3. reduceByKey()
      4. join()
      5. repartition()
    • in DataFrame:

      1. groupBy()
      2. agg() (after groupBy())
      3. rollup() (followed by aggregation)
      4. orderBy() (across partitions)
      5. join() with different join types
      6. repartition()
      7. coalesce() (strictly speaking narrow, since it merges partitions without a full shuffle)
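
A minimal sketch contrasting narrow and wide transformations (the data and keys are illustrative); note that nothing executes until the final actions:

nums = sc.parallelize([1, 2, 3, 4, 5, 6], 3)

# narrow: each output partition depends on exactly one input partition, no shuffle
doubled = nums.map(lambda x: x * 2)
evens = doubled.filter(lambda x: x % 4 == 0)

# wide: records with the same key must be brought together, so data is shuffled
pairs = nums.map(lambda x: (x % 2, x))
sums = pairs.reduceByKey(lambda a, b: a + b)

# both lineages are still lazy; collect() is the action that triggers execution
print(evens.collect())
print(sums.collect())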

2. Actions:

  • PySpark actions trigger execution of the lazy transformations and return a computed value to the Spark driver program (see the sketch after this list).
  • RDD Actions

    1. collect()
    2. count()
    3. first()
    4. top()
    5. min()
    6. max()
    7. take()
  • DataFrame Actions

    1. show(n)
    2. count()
    3. first() and head()
    4. collect()
    5. take(n)
    6. foreach(func)
    7. write.format(...).save(path)
    8. agg(...) (lazy by itself; it only runs when followed by an action such as show() or collect())
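
A small hedged sketch of a few common actions (the DataFrame contents are illustrative):

action_df = spark.createDataFrame([(1, 'a'), (2, 'b'), (3, 'c')], ['id', 'letter'])

action_df.show(2)            # print the first 2 rows to the console
print(action_df.count())     # 3 -- total number of rows
print(action_df.first())     # Row(id=1, letter='a')
print(action_df.take(2))     # first 2 rows as a list of Row objects
rows = action_df.collect()   # bring *all* rows back to the driver; use with care on large data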

b. Repartition vs Coalesce

1. Repartition

  • The Spark RDD repartition() method is used to increase or decrease the number of partitions; it always performs a full shuffle of the data across the cluster.

2. Coalesce

  • Spark RDD coalesce() is used only to reduce the number of partitions.
  • It is an optimized version of repartition(): existing partitions are merged instead of fully shuffled, so less data moves across the cluster.

c. Cache and Persist

  • Caching and persistence are optimization techniques for (iterative and interactive) Spark computations.
  • They save interim results so that they can be reused in subsequent stages and actions.
  • These interim results are kept as RDDs in memory (the default) or on more durable storage such as disk, and can optionally be replicated.
  • RDDs can be cached using the cache operation.
  • They can also be persisted using the persist operation.
  • With cache(), you get only the default storage level:
    • MEMORY_ONLY for RDD
    • MEMORY_AND_DISK for Dataset
  • With persist(), you can specify which storage level you want, for both RDD and Dataset (see the sketch below).
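
A minimal sketch of cache() vs persist() with an explicit storage level (the RDD is illustrative):

from pyspark import StorageLevel

nums_rdd = sc.parallelize(range(1000000)).map(lambda x: x * 2)

nums_rdd.cache()                                      # default storage level: MEMORY_ONLY for RDDs
print(nums_rdd.count())                               # first action materialises the cache
print(nums_rdd.filter(lambda x: x % 3 == 0).count())  # reuses the cached partitions

nums_rdd.unpersist()
nums_rdd.persist(StorageLevel.MEMORY_AND_DISK)        # explicitly allow spilling to disk
print(nums_rdd.count())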

d. Broadcasting

  • Broadcast variables let developers cache a read-only copy of a variable on each machine/node, rather than shipping a copy of it with every task.

e. Spark Workflow

  1. Driver program:
    • The driver program is the main program that is executed on the driver node.
    • It is responsible for creating the SparkContext object, which is the entry point to the Spark API.
  2. DAG creation:
    • The driver program creates a DAG of tasks to be executed.
    • The DAG is a directed acyclic graph, which means that there are no circular dependencies between tasks.
  3. Task scheduling:
    • Once the DAG is created, the driver program schedules the tasks to be executed on the worker nodes.
    • The driver program takes into account the resources available on each worker node when scheduling the tasks.
  4. Task execution:
    • The tasks are then executed on the worker nodes.
    • The worker nodes are responsible for actually processing the data and performing the computations.
  5. Result aggregation:
    • Once the tasks are completed, the driver program aggregates the results from the worker nodes.
    • The driver program then returns the results to the user.

f. Stages & Task Creation

  • Each Spark action results in one or more stages; stages are split based on whether the transformations involved have narrow dependencies or wide dependencies (shuffles).
  • e.g. the map() transformation has a narrow dependency, meaning each output partition depends on only one input partition, so a job made only of such transformations has a single stage.
  • Each stage is divided into tasks. A task represents the computation applied to a single partition of the RDD.
  • The number of tasks equals the number of partitions in the RDD (see the sketch below).
  • By default, when an RDD is created using parallelize() without specifying a number of partitions, Spark uses a default value, typically the number of available cores.
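
A small sketch showing how partitions map to tasks and how a shuffle starts a new stage (the numbers are illustrative):

stage_rdd = sc.parallelize(range(100), 4)          # explicitly ask for 4 partitions
stage_pairs = stage_rdd.map(lambda x: (x % 10, x)) # narrow dependency -> same stage, 4 tasks
print(stage_pairs.getNumPartitions())              # 4

stage_sums = stage_pairs.reduceByKey(lambda a, b: a + b)  # wide dependency -> shuffle -> new stage
print(stage_sums.getNumPartitions())
print(stage_sums.count())                          # the action triggers both stages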

g. Executor

  • Executors are responsible for running tasks on the cluster nodes.
  • An executor can run multiple tasks concurrently.
  • If an executor is allocated 5 cores, it can run up to 5 tasks at the same time.
  • The driver program communicates with the executors to schedule tasks on them & collect results.

h. Executor memory and Storage

  • e.g. 5 node cluster
  • Available : cores per node = 16 cores
            Memory per node - 64 GB
  • Reseverd: 1 core per node for OS and daemons
          1 GB per node
  • Executor: common practice is to have 5 cores per executor i.e. 3 executors per node total executors = 3 * 5 = 15 executors
  • Reserved 1 executor for app management i.e. number of executors 14

  • executor memory per node = 63 GB/5 executors = 21 GB per executor leave some heap memory i.e. 20 GB

    i.e final configuration --num-executors 14 -- executor-memory 20G --executor-cores 5
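
The same sizing expressed through SparkConf, a sketch of the configuration equivalents of the spark-submit flags above (the application name is a placeholder):

from pyspark import SparkConf
from pyspark.sql import SparkSession

conf = (SparkConf()
        .set("spark.executor.instances", "14")   # --num-executors 14
        .set("spark.executor.cores", "5")        # --executor-cores 5
        .set("spark.executor.memory", "20g"))    # --executor-memory 20G

sized_spark = SparkSession.builder.config(conf=conf).appName("SizedApp").getOrCreate()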

Spark Files

  • You can add files to your Spark job using the SparkContext.addFile() method.
  • This method uploads the specified file to the Spark master, and it is then transferred to all the worker nodes before the job starts.
  • This is particularly useful when running Spark on a cluster, as it ensures that the file is available on all nodes where the tasks will run.
  • Once a file has been added using addFile(), it can be accessed from worker nodes using the SparkFiles.get() method.
  • This method returns the file's local path on each node, enabling your Spark tasks to read it as though it were a local file.
  • By using SparkFiles, you can ensure that your external files are available to your Spark application regardless of where in the cluster your tasks are executed.
  • Common use cases for SparkFiles include distributing lookup tables, machine learning models, or any other files that your Spark jobs need to access (see the sketch below).
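
A minimal sketch of distributing a file with addFile() and reading it back inside tasks (the file name "lookup.csv" is a placeholder and is assumed to exist locally):

from pyspark import SparkFiles

sc.addFile("lookup.csv")                   # ship the file to every worker node

def read_lookup(_):
    path = SparkFiles.get("lookup.csv")    # local path of the shipped file on this node
    with open(path) as f:
        return [line.strip() for line in f]

print(sc.parallelize([0], 1).flatMap(read_lookup).collect())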

Serializers

  1. PickleSerializer (default):
    • For general use cases where custom serialization is not needed.
  2. MarshalSerializer:
    • When working with simpler data types and needing slightly better performance than pickle.
  3. KryoSerializer:
    • For large datasets and when performance is critical.
    • Kryo is often used in Spark applications dealing with large-scale data processing.
  4. Custom serializers:
    • In more advanced scenarios, you might need to implement custom serializers for specific use cases or to optimize serialization of custom objects.
    • However, this is usually only necessary in very specific or high-performance scenarios.
# Kryo runs on the JVM side; in PySpark it is enabled through the Spark configuration
from pyspark import SparkConf, SparkContext

conf = SparkConf().set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
sc = SparkContext(conf=conf)

or

from pyspark import SparkContext
from pyspark.serializers import MarshalSerializer

sc = SparkContext("local", "Marshal Serialization", serializer=MarshalSerializer())

PySpark RDD

  • PySpark RDDs have the following advantages:

  1. In-Memory Processing:
    • PySpark's RDDs help in loading data from disk into memory.
    • RDDs can even be persisted in memory for reusing the computations.
  2. Immutability:
    • RDDs are immutable, which means that once created they cannot be modified.
    • Applying any transformation on an RDD creates a new RDD.
  3. Fault Tolerance:
    • RDDs are fault-tolerant.
    • Whenever an operation fails, the lost data is automatically recomputed from the RDD lineage using the remaining available partitions.
    • This results in seamless execution of PySpark applications.
  4. Lazy Evaluation:
    • PySpark transformation operations are not performed as soon as they are encountered.
    • The operations are stored in the DAG and are evaluated once the first RDD action is found.
  5. Partitioning:
    • Whenever an RDD is created from data, its elements are by default partitioned across the available cores.

Shared Variables (Broadcast, Accumulators)

  • Normally, when you run a Spark operation, each node in the cluster works on its own copy of the variables involved.
  • However, for certain tasks you might want to share some state or data across tasks or nodes.
  • PySpark provides two types of shared variables: broadcast variables and accumulators.

  1. Broadcast Variables
    • Broadcast variables are used to save a copy of a large value (like a lookup table) in every node's memory, instead of shipping a copy of it with tasks.
    • They are useful when tasks across multiple stages need the same data.
    • They should be used when the same data is needed by all or many of the tasks.
    • They help in reducing network I/O and memory usage.
  2. Accumulators
    • Accumulators are used for aggregating information across tasks.
    • They are write-only variables for the executors (the nodes processing the data) and readable only by the driver program.
    • They are typically used for counters and sums.
    • Useful for aggregating data from worker nodes back to the driver.

  • Broadcast Variable

      from pyspark import SparkContext
    
      sc = SparkContext("local", "Broadcast Example")
      large_lookup_table = {"key1": "value1", "key2": "value2"}  # Large dataset
      broadcast_var = sc.broadcast(large_lookup_table)
    
      # Accessing the broadcast variable in an operation
      rdd = sc.parallelize([1, 2])
      result = rdd.map(lambda x: (x, broadcast_var.value.get(f"key{x}", None))).collect()
      print(result)  # Output will use values from the broadcasted variable
  • Accumulator Variable

      from pyspark import SparkContext
    
      sc = SparkContext("local", "Accumulator Example")
      accum = sc.accumulator(0)
    
      rdd = sc.parallelize([1, 2, 3, 4, 5])
      rdd.foreach(lambda x: accum.add(x))
    
      # The value of accum is available in the driver
      print(accum.value)  # Output will be the sum of the RDD elements

Create an RDD

rdd = sc.parallelize([1,2,3,4])
rdd.collect()
[1, 2, 3, 4]

Create an RDD from text file

rdd = sc.textFile(r"C:\Users\uif52518\Desktop\interview\data\sample1.txt")

Word Count Program

rdd = rdd.flatMap(lambda line: line.split(" "))
print('flatmap: ', rdd.collect())

rdd = rdd.map(lambda word: (word, 1))
print('\nmap: ', rdd.collect())

rdd = rdd.reduceByKey(lambda a, b: a+b)
print('\nreduceByKey: ', rdd.collect())
flatmap:  ['Hello', 'world', 'PySpark', 'is', 'awesome', 'Apache', 'Spark', 'is', 'fast', 'Spark', 'with', 'Python', 'is', 'great', 'Learning', 'Spark', 'is', 'fun']

map:  [('Hello', 1), ('world', 1), ('PySpark', 1), ('is', 1), ('awesome', 1), ('Apache', 1), ('Spark', 1), ('is', 1), ('fast', 1), ('Spark', 1), ('with', 1), ('Python', 1), ('is', 1), ('great', 1), ('Learning', 1), ('Spark', 1), ('is', 1), ('fun', 1)]

reduceByKey:  [('Hello', 1), ('world', 1), ('PySpark', 1), ('is', 4), ('awesome', 1), ('Apache', 1), ('Spark', 3), ('Python', 1), ('fast', 1), ('with', 1), ('great', 1), ('Learning', 1), ('fun', 1)]

Repartition and Coalesce

print(rdd.getNumPartitions())
print(rdd.glom().map(len).collect())
2
[8, 5]
rdd = rdd.repartition(4)
print(rdd.getNumPartitions())
print(rdd.glom().map(len).collect())
4
[0, 5, 0, 8]
rdd = rdd.coalesce(3)
print(rdd.getNumPartitions())
print(rdd.glom().map(len).collect())
3
[5, 8, 0]
 

Create DataFrame

person_data = [[1,'Wang','Allen'],
              [2,'Alice','Bob']]
person_cols = ['personId', 'lastName', 'firstName']

add_data = [[1,2,'New York City', 'New York'],
           [2,3,'Leetcode', 'California']]
add_cols = ['addressId', 'personId', 'city', 'state']
person_df = spark.createDataFrame(person_data, person_cols)
add_df = spark.createDataFrame(add_data, add_cols)
person_df.show()
+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+

add_df.show()
+---------+--------+-------------+----------+
|addressId|personId|         city|     state|
+---------+--------+-------------+----------+
|        1|       2|New York City|  New York|
|        2|       3|     Leetcode|California|
+---------+--------+-------------+----------+

person_df.printSchema()

Combine DataFrames

person_df.union(person_df).show()
+--------+--------+---------+
|personId|lastName|firstName|
+--------+--------+---------+
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
|       1|    Wang|    Allen|
|       2|   Alice|      Bob|
+--------+--------+---------+

Join DataFrames

res_sdf = person_df.join(add_df, how='left', on='personId')
res_sdf[['firstName', 'lastName', 'city', 'state']].show()
+---------+--------+-------------+--------+
|firstName|lastName|         city|   state|
+---------+--------+-------------+--------+
|    Allen|    Wang|         null|    null|
|      Bob|   Alice|New York City|New York|
+---------+--------+-------------+--------+

Group By

# groupBy() alone returns a GroupedData object; follow it with an aggregation
# such as count() or agg(...) to get a DataFrame back (see the example below)
person_df.groupBy('firstName')
<pyspark.sql.group.GroupedData at 0x1bdcab623a0>
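
A hedged sketch of completing the aggregation on the same person_df (the chosen aggregates are illustrative):

from pyspark.sql import functions as F

person_df.groupBy('firstName') \
         .agg(F.count('*').alias('cnt'),
              F.max('personId').alias('max_id')) \
         .show()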

User Defined Functions

from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def test(x):
    return x + 1

# wrap the plain Python function as a UDF with an explicit return type
add_one = udf(test, IntegerType())
person_df.select('personId').withColumn('a', add_one(person_df.personId)).show()
+--------+---+
|personId|  a|
+--------+---+
|       1|  2|
|       2|  3|
+--------+---+

Window Function

from pyspark.sql.window import Window
from pyspark.sql.functions import rank

# generic pattern: assumes a DataFrame df with 'deptId' and 'name' columns
w = Window.partitionBy('deptId').orderBy('name')
df.withColumn('new', rank().over(w))

DateTime

from pyspark.sql.functions import col, to_date, to_timestamp, date_format,\
                                    dayofweek, year, month, hour, minute, second
data = [("2021-01-01 10:15:30",), ("2021-06-24 12:01:19",)]
df = spark.createDataFrame(data, ["datetime_str"])
df.show()
df = df.withColumn("date", to_date(col("datetime_str"), "yyyy-MM-dd HH:mm:ss"))
df = df.withColumn("timestamp", to_timestamp(col("datetime_str"), "yyyy-MM-dd HH:mm:ss"))
df.show()
df = df.withColumn("year", year(col("timestamp")))
df = df.withColumn("month", month(col("timestamp")))
df = df.withColumn("day", dayofweek(col("timestamp")))
df.show()
df = df.withColumn("formatted_date", date_format(col("timestamp"), "yyyy/MM/dd HH:mm"))
df.show()
PySpark provides a wide range of functions to work with dates and times, 
such as add_months, date_add, date_sub, datediff, current_date, current_timestamp,
and many others to perform various date and time-related operations.
from pyspark.sql.functions import add_months, date_add, date_sub, datediff, \
                                    current_date, current_timestamp

# the examples below need 'start_date' and 'end_date' date columns; the sample dates are illustrative
df = spark.createDataFrame([("2022-01-01", "2022-03-15")], ["start_date", "end_date"])
df = df.withColumn("start_date", to_date(col("start_date")))
df = df.withColumn("end_date", to_date(col("end_date")))

# Add 3 months to the start_date
df = df.withColumn("start_plus_3_months", add_months(col("start_date"), 3))

# Subtract 1 month from the end_date
df = df.withColumn("end_minus_1_month", add_months(col("end_date"), -1))
df.show()

# Add 10 days to start_date
df = df.withColumn("start_plus_10_days", date_add(col("start_date"), 10))

# Subtract 5 days from end_date
df = df.withColumn("end_minus_5_days", date_sub(col("end_date"), 5))
df.show()

# Difference in days between start_date and end_date
df = df.withColumn("days_diff", datediff(col("end_date"), col("start_date")))
df.show()

df = df.withColumn("current_date", current_date())
df = df.withColumn("current_timestamp", current_timestamp())
df.show()

Datetime sequence

from pyspark.sql import SparkSession
from pyspark.sql.functions import sequence, to_date, col, explode, expr
from pyspark.sql.types import DateType

spark = SparkSession.builder.appName("sequence_example").getOrCreate()
df = spark.createDataFrame([("2022-01-01", "2022-01-05")], ["start_date", "end_date"])
df
DataFrame[start_date: string, end_date: string]
# Convert to date type

df = df.withColumn("start_date", to_date(col("start_date"), "yyyy-M-D"))
df = df.withColumn("end_date", to_date(col("end_date"), "yyyy-M-D"))

# df = df.withColumn("start_date", to_date(col("start_date")).cast(DateType()))
# df = df.withColumn("end_date", to_date(col("end_date")).cast(DateType()))
# Generate sequence of dates
df = df.withColumn("date_sequence", sequence(col("start_date"), col("end_date"), expr("interval 1 day")))
df.show()
+----------+----------+--------------------+
|start_date|  end_date|       date_sequence|
+----------+----------+--------------------+
|2022-01-01|2022-01-05|[2022-01-01, 2022...|
+----------+----------+--------------------+

# Explode the sequence to have one date per row
df = df.withColumn("date", explode(col("date_sequence")))
df.select("date").show()
+----------+
|      date|
+----------+
|2022-01-01|
|2022-01-02|
|2022-01-03|
|2022-01-04|
|2022-01-05|
+----------+

176. Second Highest Salary

'''
+----+--------+
| Id | Salary |
+----+--------+
| 1  | 100    |
| 2  | 200    |
| 3  | 300    |
+----+--------+
'''
emp_data = [[1,100],
           [2,200],
           [3,300]]
emp_cols = ['Id','Salary']
emp_sdf = spark.createDataFrame(emp_data, emp_cols)
w = Window.orderBy(emp_sdf.Salary.desc())
ds = emp_sdf.withColumn('rank', F.rank().over(w))
ds.filter(ds.rank==2)[['Salary']].withColumnRenamed('Salary', 'secondMaxSalary').show()
+---------------+
|secondMaxSalary|
+---------------+
|            200|
+---------------+

177. Nth Highest Salary

emp_data = [[1,100],
           [2,200],
           [3,300]]
emp_cols = ['Id','Salary']
emp_sdf = spark.createDataFrame(emp_data, emp_cols)
def nth_salary(n, emp_sdf):
    w = Window.orderBy(emp_sdf.Salary.desc())
    ds = emp_sdf.withColumn('dense_rank', F.dense_rank().over(w))
    ds.filter(ds.dense_rank==n)[['Salary']].withColumnRenamed('Salary', 'MaxSalary('+str(n)+')').show()
nth_salary(3, emp_sdf)
+------------+
|MaxSalary(3)|
+------------+
|         100|
+------------+

178. Rank Scores

+----+-------+
| Id | Score |
+----+-------+
| 1  | 3.50  |
| 2  | 3.65  |
| 3  | 4.00  |
| 4  | 3.85  |
| 5  | 4.00  |
| 6  | 3.65  |
+----+-------+
score_data = [[1, 3.50],
             [2, 3.65],
             [3, 4.00],
             [4, 3.85],
             [5, 4.00],
             [6, 3.65]]
score_cols = ['Id','Score']
df = spark.createDataFrame(score_data, score_cols)
df
DataFrame[Id: bigint, Score: double]
w = Window.orderBy(df.Score.desc())

df = df.withColumn('rank', F.dense_rank().over(w))
df[['Score', 'rank']].show()
+-----+----+
|Score|rank|
+-----+----+
|  4.0|   1|
|  4.0|   1|
| 3.85|   2|
| 3.65|   3|
| 3.65|   3|
|  3.5|   4|
+-----+----+

180. Consecutive Numbers

# Find all numbers that appear at least three times consecutively
+----+-----+
| Id | Num |
+----+-----+
| 1  | 1   |
| 2  | 1   |
| 3  | 1   |
| 4  | 2   |
| 5  | 1   |
| 6  | 2   |
| 7  | 2   |
+----+-----+
nums_data = [[1,1],
            [2,1],
            [3,1],
            [4,2],
            [5,1],
            [6,2],
            [7,2]]
nums_col = ['Id','Num']
sdf = spark.createDataFrame(nums_data, nums_col)
w = Window.orderBy(sdf.Id)
df = sdf.withColumn('next', F.lead(sdf.Num).over(w))
df = df.withColumn('prev', F.lag(sdf.Num).over(w))
df.show()
+---+---+----+----+
| Id|Num|next|prev|
+---+---+----+----+
|  1|  1|   1|null|
|  2|  1|   1|   1|
|  3|  1|   2|   1|
|  4|  2|   1|   1|
|  5|  1|   2|   2|
|  6|  2|   2|   1|
|  7|  2|null|   2|
+---+---+----+----+

df = df[(df['Num']==df['next']) & (df['Num']==df['prev'])][['Num']]
df.withColumnRenamed('Num','ConsecutiveNums').show()
+---------------+
|ConsecutiveNums|
+---------------+
|              1|
+---------------+

181. Employees Earning More Than Their Managers

emp_data = [[1, 'Joe', 70000, 3 ],
           [2, 'Henry', 80000, 4],
           [3, 'Sam', 60000, None],
           [4, 'Max', 90000, None]]
emp_cols = ['Id', 'Name', 'Salary', 'ManagerId']
df = spark.createDataFrame(emp_data, emp_cols)
# self-join: e1 is the employee, e2 is that employee's manager
joined_sdf = df.alias('e1').join(df.alias('e2'), col('e1.ManagerId')==col('e2.Id'), how='inner')
joined_sdf.where('e1.Salary > e2.Salary').select('e1.Name').show()
+----+
|Name|
+----+
| Joe|
+----+

182. Duplicate Emails

email_data = [[1, 'a@b.com'],
             [2, 'c@d.com'],
             [3, 'a@b.com']]
email_cols = ['Id', 'Email']
df = spark.createDataFrame(email_data, email_cols)
df.groupBy('Email').count().alias('df').where('df.count>1').select('df.email').show()
+-------+
|  email|
+-------+
|a@b.com|
+-------+

from pyspark.ml.regression import RandomForestRegressor