Introduction
In this course, you’ll cover all the foundational knowledge needed to work with Apache Spark. This includes understanding the architecture, the roles of the driver and executors, and how everything fits together. You’ll also learn why Spark is so powerful, and when to choose Spark over other technologies. These concepts will be covered in detail, followed by a hands-on lab where we’ll dive into partitioning, sorting, and how to work with Spark for reading and writing data.
Spark is a distributed computing framework designed to process large volumes of data efficiently. It’s often considered the successor to earlier big data technologies: first came Hadoop and Java MapReduce in the mid-to-late 2000s, then Hive, and then Spark, which has emerged as the dominant technology in the field.
Spark is likely to remain relevant for a long time due to its robust capabilities, and we’ll delve deeper into its details throughout the course.
Why is Spark so good?
- Efficient Use of RAM: Spark leverages RAM more effectively than previous technologies like Hive or MapReduce. In Hive or MapReduce, data had to be written to and read from disk at each step, which made computations slow. Spark reduces the need for this disk I/O, enabling faster processing. It only writes to disk when necessary, such as when an operation exceeds available memory, and this is referred to as spilling to disk.
- Performance: Spark’s ability to use memory more efficiently leads to faster computations. Spilling to disk is something you’d want to avoid, as it slows down Spark, making it behave like the slower Hive or MapReduce.
- Storage Agnostic: Spark is highly flexible in terms of the data sources it can read from, including relational databases, data lakes, files, MongoDB, and more. This makes it a very versatile tool for handling various data storage systems.
- Large Community: Spark has a massive community of developers and users, which makes it well-supported and continuously evolving. It is not tied to Databricks either: it can be used with a wide range of systems.
When is Spark not so good?
- Lack of Expertise in Your Team: If you’re the only person on your team who knows Spark, it might not be a good idea to use it. Spark requires a certain level of expertise, and using it without the necessary knowledge can lead to poor maintenance and reliability. It’s better to either train your team or consider alternative technologies that everyone is comfortable with.
- Company’s Existing Investment: If your company has already heavily invested in other technologies like BigQuery, Snowflake, or something else that works well for your needs, introducing Spark might complicate things. In such cases, it’s usually better to stick with the existing technology. Having a homogeneous pipeline—where all pipelines are built using the same technology—leads to better consistency and management. For example, it’s better to have 20 BigQuery pipelines than to mix them with one Spark pipeline.
How does Spark work?
- The Basketball Analogy: Spark can be understood by comparing it to a basketball team. Here are the key components:
- The Plan (The Play): This represents the overall strategy or the execution plan Spark is going to follow. It defines what operations need to be performed on the data.
- The Driver (The Coach): The driver is the Spark component that coordinates the entire process, much like a coach guiding the basketball team. The driver schedules tasks, manages execution, and ensures everything runs smoothly.
- The Executors (The Players): These are the actual workers in the system, similar to the basketball players who carry out the actions on the court. The executors perform the tasks assigned to them, such as processing data or applying transformations.
The Plan
- Coach’s Play: The plan is like the play that the coach designs for the basketball team. It’s the strategy Spark will follow, detailing the operations on the data.
- Describing the Plan: You can describe the plan in various languages like:
- Python
- Scala
- SQL (most common)
- R (if you prefer)
- Lazy Evaluation: The plan is evaluated lazily, meaning it doesn’t execute immediately. Instead, it waits until a “player takes a shot”. In Spark, this means the plan won’t run until one of these actions occurs:
- Writing output: Saving data to disk or a database.
- Collecting data: Gathering the results and sending them back to the driver.
- Taking a Shot: When Spark “takes a shot,” it processes the data, either writing it out or collecting it, essentially executing the plan. This is the moment when Spark actually starts the computation.
So, the plan is your data processing code (DataFrames, SQL, or the Dataset API), and it doesn’t run until a write or collect action triggers it.
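The lazy behaviour described above can be sketched with a toy class in plain Python. This is not Spark's API: `LazyPlan`, `is_even`, and `calls` are hypothetical names used only to show that transformations merely record steps, and that work happens when an action such as `collect()` is called.

```python
class LazyPlan:
    """Toy stand-in for a Spark plan: records steps, runs them only on an action."""

    def __init__(self, rows, steps=()):
        self.rows = rows
        self.steps = tuple(steps)

    def filter(self, predicate):
        # Transformation: returns a new plan with one more step, does no work yet.
        return LazyPlan(self.rows, self.steps + (("filter", predicate),))

    def map(self, func):
        # Transformation: also only recorded.
        return LazyPlan(self.rows, self.steps + (("map", func),))

    def collect(self):
        # Action ("taking the shot"): now every recorded step is executed in order.
        out = list(self.rows)
        for kind, fn in self.steps:
            if kind == "filter":
                out = [row for row in out if fn(row)]
            else:
                out = [fn(row) for row in out]
        return out


calls = []  # records every row the predicate is evaluated on


def is_even(x):
    calls.append(x)
    return x % 2 == 0


plan = LazyPlan(range(6)).filter(is_even).map(lambda x: x * 10)
calls_before_action = list(calls)  # still empty: nothing has executed

result = plan.collect()  # the plan runs here
```

Building `plan` touches no data; only `collect()` evaluates the predicate, which is the same contract Spark gives for writes and collects.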
The Driver
- Role of the Driver: The driver is like the coach of the basketball team. It reads the plan and figures out the best way to execute it, deciding what play (plan) to run.
- Key Settings:
- spark.driver.memory: Determines the memory available to the driver to process the job.
- Default: 1 GB in open-source Spark (managed platforms often set it higher); it can be raised, for example to 16 GB.
- When to increase:
- For complex jobs with many steps or plans.
- When using collect operations, which bring data back to the driver (generally discouraged).
- spark.driver.memoryOverheadFactor: The fraction of driver memory set aside, on top of the heap, for Java’s non-heap memory needs.
- It’s essential when dealing with complex jobs that require more memory due to JVM overhead.
- Other Driver Settings: According to the speaker’s experience, most of the other driver settings don’t have a substantial impact on performance.
- Driver’s Responsibilities:
- Job Execution: The driver decides when the job should actually execute, triggered by output or collection operations.
- Join Operations: It determines how to join datasets, which significantly affects performance. The choice of join type can make a job dramatically faster or slower.
- Parallelism: The driver also decides how much parallelism is needed at each step:
- Initial Parallelism: Based on the number of files in the datasets.
- After Joins: Determines how to parallelize operations after the join to ensure efficient computation.
The Executors
- Role of Executors: Executors are the workhorses of the Spark framework—the “Michael Jordans” and “LeBron James” of the team. They handle the actual data processing by performing transformations, filtering, aggregations, and other operations specified in the plan.
Key Executor Settings:
- spark.executor.memory
- What It Does: Specifies the memory allocated to each executor for processing tasks.
- Default: 1–2 GB; can be increased up to 16 GB.
- Optimization Strategy:
- Start small (e.g., 2 GB) and gradually increase (e.g., 4, 6, 8 GB).
- Aim for the smallest memory size that consistently prevents out-of-memory (OOM) errors over multiple runs.
- Avoid excessive padding: too much memory wastes resources; too little causes job failures.
- Common Misstep: Setting memory to the maximum (e.g., 16 GB) without consideration, which leads to inefficient and expensive jobs.
- Cost vs. Reliability Trade-Off:
- Engineering time is more expensive than slightly higher cloud costs.
- Accept a small amount of memory padding to avoid frequent debugging and restarts.
- spark.executor.cores
- What It Does: Determines the number of tasks an executor can handle simultaneously.
- Default: 4 tasks per executor.
- Optimization Guidelines:
- Can increase to 5 or 6 tasks if you need more parallelism.
- Do not exceed 6 tasks per executor to avoid bottlenecks, such as disk throughput and task scheduling delays.
- Trade-Off:
- More tasks increase parallelism but also raise the risk of OOM errors due to memory contention.
- Larger numbers of concurrent tasks make skewed data distributions more likely to cause failures.
- spark.executor.memoryOverheadFactor
- What It Does: Allocates additional memory for JVM overhead (non-heap memory required by Java).
- When to Adjust:
- For jobs with high complexity, including:
- Heavy use of User Defined Functions (UDFs), especially PySpark UDFs, which are inefficient.
- Complex query plans with numerous joins, unions, or nested transformations.
- To improve job reliability without increasing spark.executor.memory.
- Key Advantage: Increasing memory overhead enhances reliability without extra costs.
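To make the relationship between executor memory and the overhead factor concrete, here is a small arithmetic sketch. It assumes the rule documented for JVM executors, where the overhead is the executor memory times the factor (0.10 by default) with a floor of 384 MiB; `container_memory_mib` is a hypothetical helper, not a Spark function.

```python
MIN_OVERHEAD_MIB = 384  # assumed floor applied to the overhead


def container_memory_mib(executor_memory_mib, overhead_factor=0.10):
    """Heap plus non-heap overhead requested for one executor, in MiB."""
    overhead = max(int(executor_memory_mib * overhead_factor), MIN_OVERHEAD_MIB)
    return executor_memory_mib + overhead


# 2 GiB heap: 10% would be 204 MiB, so the 384 MiB floor applies.
small = container_memory_mib(2 * 1024)

# 8 GiB heap with the default factor: 819 MiB of overhead.
large = container_memory_mib(8 * 1024)

# Same heap, factor raised to 0.20 for a UDF-heavy job: 1638 MiB of overhead.
tuned = container_memory_mib(8 * 1024, overhead_factor=0.20)
```

Raising the factor leaves spark.executor.memory untouched and only widens the non-heap headroom, which is the lever the notes recommend for UDF-heavy or plan-heavy jobs.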
Executor Optimization Strategies
- Test Iteratively:
- Adjust memory and cores incrementally to balance resource usage and reliability.
- Run the job for several days with the same settings to ensure stability.
- Avoid Overcommitment:
- Do not allocate excessive cores or memory unless absolutely necessary.
- Skewed tasks (tasks with uneven data distribution) are more likely to cause OOM errors with higher parallelism.
- Handle PySpark UDFs Carefully:
- PySpark UDFs are notoriously inefficient and memory-intensive. Consider native Spark SQL functions or alternative methods when possible.
The types of JOINs in Spark
Joins are critical operations in Spark for combining datasets. Optimizing join types can have a massive impact on the performance of your Spark jobs. Here’s a breakdown of the three major join types in Spark and how they work.
Shuffle Sort Merge Join
- Performance: Least performant among the three.
- Versatility: Most versatile and works in almost any scenario.
- How It Works:
- Both datasets involved in the join are shuffled across the cluster to align data with matching keys.
- Data is then sorted before merging.
- Use Case:
- When neither side of the join is small or pre-bucketed.
- When you need a fallback join that works for large datasets.
- Challenges:
- Shuffling is expensive because it involves data movement across nodes, which increases network I/O and disk usage.
- Sorting further adds computational overhead.
- Optimization Tip: Minimize the use of Shuffle Sort Merge Joins when possible by using broadcast or bucket joins.
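The sort-then-merge step can be sketched in plain Python. This is a minimal illustration of what happens inside one partition after the shuffle has already co-located matching keys; the shuffle itself is not modelled, and `sort_merge_join` and the sample rows are hypothetical.

```python
def sort_merge_join(left, right):
    """Inner join of two lists of (key, value) pairs by sorting, then merging."""
    left = sorted(left, key=lambda kv: kv[0])
    right = sorted(right, key=lambda kv: kv[0])
    i = j = 0
    out = []
    while i < len(left) and j < len(right):
        left_key, right_key = left[i][0], right[j][0]
        if left_key < right_key:
            i += 1
        elif left_key > right_key:
            j += 1
        else:
            # Find the run of rows on the right that share this key.
            j_end = j
            while j_end < len(right) and right[j_end][0] == left_key:
                j_end += 1
            # Pair every left row with that key against the whole run.
            while i < len(left) and left[i][0] == left_key:
                for _, right_value in right[j:j_end]:
                    out.append((left_key, left[i][1], right_value))
                i += 1
            j = j_end
    return out


events = [(3, "click"), (1, "view"), (2, "view"), (1, "click")]
users = [(2, "bob"), (1, "ann"), (4, "dan")]
joined = sort_merge_join(events, users)
```

Both inputs have to be sorted before a single forward pass can match them, which is the extra cost the Challenges bullets refer to.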
Broadcast Hash Join
- Performance: Faster than Shuffle Sort Merge Join for certain scenarios.
- How It Works:
- One side of the join (the smaller dataset) is broadcasted to all executors.
- The join is then performed locally on each executor, avoiding shuffling altogether.
- Use Case:
- When one dataset is small enough to fit in executor memory (typically up to about 8 GB, depending on the job; Spark itself refuses to broadcast a table larger than 8 GB).
- Works for joins where there’s a clear large vs. small dataset distinction.
- Challenges:
- If the broadcasted dataset is too large, executors can run out of memory and crash.
- Requires careful tuning of executor memory overhead settings for reliability.
- How to Trigger a Broadcast Join:
- Use the Spark SQL hint /*+ BROADCAST(table_name) */ in your SQL query.
- Ensure Spark’s auto-broadcast threshold (spark.sql.autoBroadcastJoinThreshold) is set appropriately (defaults to 10 MB but can be increased).
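The mechanics of a broadcast hash join can be sketched without Spark. In this toy version each inner list stands for one partition of the large dataset living on one executor; `broadcast_hash_join` and the sample data are hypothetical.

```python
def broadcast_hash_join(large_partitions, small_rows):
    """Join each partition of the large side against a copied hash table."""
    # Build the hash table once from the small side; this is what gets "broadcast".
    lookup = {}
    for key, value in small_rows:
        lookup.setdefault(key, []).append(value)

    joined = []
    for partition in large_partitions:
        # Each partition probes its own copy locally; no rows move between partitions.
        local = [
            (key, value, match)
            for key, value in partition
            for match in lookup.get(key, [])
        ]
        joined.append(local)
    return joined


large_partitions = [
    [(1, "view"), (3, "click")],
    [(2, "view"), (1, "click")],
]
small_rows = [(1, "ann"), (2, "bob")]
result = broadcast_hash_join(large_partitions, small_rows)
```

The output keeps the partitioning of the large side, which is why no shuffle is needed; the price is that every executor must hold the whole lookup table in memory.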
Bucket Join
- Performance: High performance when conditions are met.
- How It Works:
- Both datasets are bucketed and sorted by the same key before the join.
- Bucketing eliminates the need for shuffling, as data is already aligned.
- Use Case:
- When datasets are pre-bucketed on the join key (e.g., during data ingestion or preprocessing).
- Particularly useful for repeated joins on the same key, as bucketing is a one-time setup cost.
- Challenges:
- Requires datasets to have been written with bucketing enabled during their creation.
- Limited flexibility; datasets must match the same bucket number and partitioning scheme.
- How to Use Bucket Joins:
- Ensure the tables or datasets are bucketed (bucketBy) and saved with proper configuration.
- Spark will automatically recognize the buckets and use them during the join.
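A plain-Python sketch of the idea, under simplifying assumptions: keys are integers, the bucket is the key modulo the bucket count, and both tables use the same count. `write_bucketed` and `bucket_join` are hypothetical helpers, not Spark calls.

```python
NUM_BUCKETS = 4  # assumed to be identical for both tables


def write_bucketed(rows, num_buckets=NUM_BUCKETS):
    """One-time setup cost: place every row in its bucket and sort each bucket."""
    buckets = [[] for _ in range(num_buckets)]
    for key, value in rows:
        buckets[key % num_buckets].append((key, value))
    for bucket in buckets:
        bucket.sort()
    return buckets


def bucket_join(left_buckets, right_buckets):
    """Join bucket i of the left table only with bucket i of the right table."""
    out = []
    for left_bucket, right_bucket in zip(left_buckets, right_buckets):
        right_by_key = {}
        for key, value in right_bucket:
            right_by_key.setdefault(key, []).append(value)
        for key, left_value in left_bucket:
            for right_value in right_by_key.get(key, []):
                out.append((key, left_value, right_value))
    return out


users = write_bucketed([(1, "ann"), (2, "bob"), (5, "eve")])
orders = write_bucketed([(5, "book"), (1, "pen"), (6, "cup")])
joined = bucket_join(users, orders)
```

The redistribution happens once, at write time; every later join on the same key reuses the buckets and only compares matching bucket pairs.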
Key Takeaways for Join Optimization
- Minimize Shuffle Sort Merge Joins:
- Use this as a last resort for joins involving large, unbucketed datasets.
- Leverage Broadcast Hash Joins:
- Use when one dataset is small enough to fit in executor memory. It drastically reduces the overhead of shuffling.
- Utilize Bucket Joins for Pre-Processed Data:
- If you have control over data ingestion pipelines, consider bucketing datasets on frequently used join keys to save shuffle overhead.
- Understand Join Types with EXPLAIN:
- Use explain() to identify the join type Spark is using. Look for the terms:
- SortMergeJoin fed by Exchange (shuffle) operators (Shuffle Sort Merge Join)
- BroadcastHashJoin (Broadcast Hash Join)
- SortMergeJoin with no Exchange on the bucketed inputs (Bucket Join)
- Tune Settings for Broadcast Joins:
- Adjust spark.sql.autoBroadcastJoinThreshold to handle larger broadcasted datasets if needed.
- Monitor Executor Memory:
- For broadcast joins, ensure there’s enough executor memory to handle the broadcasted dataset.
How does Shuffle work?
Shuffle is a key mechanism in Apache Spark, but it’s also the most costly and least scalable part of the framework. While Shuffle enables distributed processing, it introduces significant overhead, especially as data sizes grow. Let’s break down the details.
What is Shuffle?
- Shuffle refers to the process of redistributing data across the cluster, ensuring that related data (e.g., by a key) ends up in the same partition for operations like joins, group-bys, or reduces.
- Spark achieves this redistribution by:
- Splitting data into partitions.
- Hashing keys (e.g., user_id) to determine the target partition.
- Sending data across nodes to the appropriate partitions.
Why Shuffle is Painful at Scale
- Network Overhead:
- Data is moved between nodes over the network, which is a slow and expensive operation compared to in-memory computation.
- Disk I/O:
- Spark often writes intermediate shuffle data to disk for fault tolerance, increasing latency.
- Memory Pressure:
- Shuffle operations can create temporary spikes in memory usage, leading to OutOfMemory (OOM) errors if resources aren’t configured properly.
- Steep Costs at Scale:
- At scales of 20–30 TB/day or higher, shuffle becomes impractical. The data movement across nodes becomes a bottleneck, and alternative solutions (e.g., bucketing, pre-aggregations) are required.
When Does Shuffle Occur?
- Group By:
- Data is grouped by a key, requiring all matching keys to land in the same partition.
- Example: Grouping on user_id sends user data with the same user_id to the same partition.
- Joins:
- During a shuffle sort merge join, data from both sides of the join is shuffled to ensure all matching keys are in the same partition.
- Example: Joining two datasets on user_id will hash the user_id and send matching records to the same partition.
- Repartitioning:
- When you explicitly call .repartition() to change the number of partitions. Note that .coalesce() only merges existing partitions, so it reduces the partition count without a full shuffle.
- Wide Transformations:
- Transformations like reduceByKey, distinct, or sortByKey that require data movement across partitions.
Shuffle Example: Group By
Imagine you’re grouping by user_id and you have 4 input files whose rows must be shuffled into 3 partitions:
- Initial Partitions:
- File 1 is read into Partition 1.
- File 2 into Partition 2, and so on.
- Hashing Keys:
- The user_id is hashed and divided by the number of partitions (e.g., 3).
- The remainder determines the target partition:
- Remainder 0 → Partition 1
- Remainder 1 → Partition 2
- Remainder 2 → Partition 3
- Data Redistribution:
- Data from File 1, File 2, File 3, and File 4 is shuffled across the nodes to the correct partition based on the hash.
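The three steps above can be sketched in a few lines of Python. As a simplification, the integer user_id is used directly instead of a real hash function, and list index 0 corresponds to "Partition 1" in the text; `shuffle_by_key` and the file contents are hypothetical.

```python
def shuffle_by_key(files, num_partitions):
    """Route every (user_id, payload) row to partition user_id % num_partitions."""
    partitions = [[] for _ in range(num_partitions)]
    for rows in files:
        for user_id, payload in rows:
            target = user_id % num_partitions  # the remainder picks the partition
            partitions[target].append((user_id, payload))
    return partitions


# Four input files, each holding a mix of user_ids.
files = [
    [(1, "a"), (4, "b")],
    [(2, "c"), (3, "d")],
    [(6, "e"), (1, "f")],
    [(5, "g"), (4, "h")],
]
partitions = shuffle_by_key(files, num_partitions=3)
```

After the shuffle, every row for a given user_id sits in exactly one partition, so the group-by can finish locally. Note that almost every row had to leave the file it was read from, which is where the network cost comes from.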
How Shuffle Works in Joins
Shuffle Sort Merge Join:
- Dataset A (Files 1 & 2) and Dataset B (Files 3 & 4) are read.
- Both datasets are shuffled and hashed by the join key (e.g., user_id).
- Data with the same key ends up in the same partition on both sides.
- The join is executed locally in each partition.
Broadcast Join (Avoids Shuffle):
- If Dataset B (Files 3 & 4) is small enough, it’s broadcasted to all executors.
- No shuffle occurs. Instead:
- Each executor holds a copy of Dataset B.
- Joins are performed locally without data movement.
Minimizing Shuffle Overhead
- Broadcast Small Datasets:
- Use a broadcast join when one side of the join is small (up to about 8 GB, the largest table Spark will broadcast).
- Reduce network and disk I/O by avoiding shuffles.
- Use Bucketing:
- Pre-bucket datasets on the join key to avoid shuffling during joins.
- Ensures data is pre-partitioned and sorted, leading to faster joins.
- Avoid Repartitioning:
- Keep the number of partitions consistent throughout the job.
- If repartitioning is necessary, carefully choose the number of partitions (e.g., reduce default 200 to a smaller number).
- Leverage Local Aggregations:
- Perform aggregations (e.g., partial sums) locally before shuffling.
- Example: Instead of grouping by user_id directly, calculate intermediate results locally and then aggregate globally.
- Optimize Parallelism:
- Use .coalesce() to reduce partitions intelligently for downstream processing.
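The local-aggregation idea can be illustrated with a toy count per user_id. The sketch compares how many records would cross the network with and without a partial count inside each partition; the function names and the sample partitions are hypothetical.

```python
from collections import Counter


def records_shuffled_without_combine(partitions):
    """Every raw row is sent over the network when grouping directly."""
    return sum(len(partition) for partition in partitions)


def count_with_local_combine(partitions):
    """Count per user_id locally first, then merge the partial counts."""
    partials = [Counter(user_id for user_id, _ in partition) for partition in partitions]
    # Only one (user_id, partial_count) record per key per partition is shuffled.
    shuffled = sum(len(partial) for partial in partials)
    totals = Counter()
    for partial in partials:
        totals.update(partial)  # global aggregation of the partial results
    return dict(totals), shuffled


partitions = [
    [(1, "a"), (1, "b"), (2, "c")],
    [(1, "d"), (2, "e"), (2, "f"), (2, "g")],
]
raw_records = records_shuffled_without_combine(partitions)
totals, combined_records = count_with_local_combine(partitions)
```

Here 7 raw rows shrink to 4 partial records before the shuffle, and the saving grows with the number of repeated keys per partition.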
Shuffle
Shuffle partitions and parallelism are tightly connected in Spark. Settings like spark.sql.shuffle.partitions (used for higher-level APIs like DataFrame) and spark.default.parallelism (for the RDD API) control how Spark redistributes data across partitions. Unless you’re working with low-level RDDs—which is discouraged—you should focus on spark.sql.shuffle.partitions for most workloads.
By default, Spark uses 200 shuffle partitions, which may not be suitable for all scenarios. Smaller jobs can benefit from fewer partitions, while large-scale jobs require more to avoid overloading executors. Keep in mind that shuffle, while necessary for certain operations like joins and group-bys, can become a performance bottleneck for massive datasets.
Is Shuffle Really That Bad?
Shuffle often gets a bad reputation, but it’s not inherently “bad.” It’s vital for distributed computing, enabling Spark to group and join data correctly. However, shuffle can be problematic when:
- Processing tens or hundreds of terabytes.
- Working with highly skewed data.
- Performing joins with large cardinality mismatches.
For jobs handling reasonable volumes (e.g., under 1 TB), shuffle is generally manageable and worth optimizing rather than avoiding outright.
Challenges from Real-world Scenarios:
- Netflix’s IPv4 to IPv6 Transition: Initially, joining a 100 TB/hour dataset with a small IP lookup table worked using broadcast joins. However, switching to IPv6 exploded the size of the lookup table, making broadcast impossible. A shuffle join was attempted but failed due to the scale of the data.
- Facebook Notification Joins: A pipeline joining 10 TB with 50 TB consumed 30% of the available compute resources. The shuffle costs were unsustainable, highlighting the need for an alternative approach.
Facebook Notification Joins: Bucketed Joins
- Data Sets: Two large tables, one with 10 TB and another with 50 TB of data, needed to be joined on user_id.
- Challenge: Using a shuffle sort merge join consumed 30% of total compute resources for notifications and was highly inefficient. Shuffle was becoming the bottleneck.
The Solution: Bucketed Joins. The tables were bucketed to avoid shuffling during the join operation. Here’s how it worked.
- Bucket the Tables
- Both tables were bucketed on the user_id column, using 1,024 buckets (a power of 2).
- Bucketing ensures that data is pre-sorted and grouped by the join key (user_id), so related rows from both tables are guaranteed to reside in the same bucket.
- How Bucketing Works
- Each row is placed in a specific bucket based on user_id % total_buckets.
- For example:
- If user_id % 1024 == 0, the row goes to bucket 0.
- If user_id % 1024 == 1, the row goes to bucket 1, and so on.
- When performing the join, Spark directly aligns matching buckets from both tables. This avoids the need to shuffle data between partitions.
- Power of Two Buckets
- Why powers of two?
- Ensures compatibility when bucket counts differ. For example:
- A table with 512 buckets can still align with one that has 1,024 buckets, as 512 is a factor of 1,024.
- Non-power-of-two bucket counts (e.g., 7, 13) create compatibility issues and can force unnecessary shuffling.
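The alignment argument can be checked with a little arithmetic. The sketch below assumes the simplified rule from the notes (bucket = user_id % bucket count, with integer ids); `bucket_of` is a hypothetical helper, and whether a given engine actually exploits this alignment depends on its version and configuration.

```python
def bucket_of(user_id, num_buckets):
    """Simplified bucketing rule from the notes: remainder of the id."""
    return user_id % num_buckets


sample_ids = range(10_000)

# Folding a 1,024-bucket id down with % 512 always lands on the 512-bucket id.
aligned = all(
    bucket_of(uid, 1024) % 512 == bucket_of(uid, 512) for uid in sample_ids
)

# So bucket 0 of the 512-bucket table only ever needs two buckets of the other table.
partners_512_vs_1024 = {
    bucket_of(uid, 1024) for uid in sample_ids if bucket_of(uid, 512) == 0
}

# With 7 and 13 buckets, bucket 0 of one table is scattered over every bucket of the other.
partners_7_vs_13 = {
    bucket_of(uid, 13) for uid in sample_ids if bucket_of(uid, 7) == 0
}
```

With 512 and 1,024, each bucket on one side maps to a fixed, small set of buckets on the other. With 7 and 13, every bucket overlaps all 13 on the other side, so the data would have to be redistributed anyway.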
Choosing the right number of buckets is key:
- Too Few Buckets: Leads to oversized partitions and risks executor memory overload.
- Too Many Buckets: Results in many empty or sparsely populated partitions (wasting resources).
- Rule of Thumb: Aim for roughly 10 GB per bucket.
- For example:
- A 10 TB table with 1,024 buckets gives approximately 10 GB per bucket.
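The rule of thumb can be turned into a small calculation. This sketch picks the smallest power of two that keeps each bucket at or under the 10 GB target; `power_of_two_buckets` is a hypothetical helper, and the target is the rule of thumb from these notes rather than a Spark setting.

```python
import math

TARGET_GB_PER_BUCKET = 10  # rule of thumb from the notes


def power_of_two_buckets(table_size_gb, target_gb=TARGET_GB_PER_BUCKET):
    """Smallest power-of-two bucket count with at most target_gb per bucket."""
    needed = max(1, math.ceil(table_size_gb / target_gb))
    # Round up to the next power of two (a value that already is one is kept).
    return 1 << (needed - 1).bit_length()


ten_tb = power_of_two_buckets(10 * 1024)   # 10 TB expressed in GB
hundred_gb = power_of_two_buckets(100)     # 10 buckets needed, rounded up
tiny = power_of_two_buckets(5)             # fits in a single bucket
gb_per_bucket = (10 * 1024) / ten_tb
```

For the 10 TB table this reproduces the 1,024 buckets used in the example, while small tables collapse to very few buckets, which avoids the empty-bucket problem.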
Edge Cases and Lessons Learned:
- Small Tables: Avoid over-bucketing for smaller datasets. If a table has only a few thousand rows, excessive bucketing leads to many empty buckets and inefficiencies.
- Example: Bucketing a 1,000-row table into 1,024 buckets guarantees empty buckets due to the pigeonhole principle.
- Empty Buckets: Can disrupt systems relying on evenly distributed data, as some buckets may contain no data at all.
- System Performance: Properly bucketed tables massively reduce resource usage, speeding up joins and avoiding bottlenecks.
By switching to bucketed joins:
- Shuffling was eliminated.
- Performance was drastically improved, freeing up compute resources for other pipelines.
The Netflix Problem: Solving a Massive Join at Scale
- Dataset: A pipeline processing 100 terabytes/hour of network request data.
- Join Requirement: Match each network request (IP address) with the microservice app that handled the request, using an IP lookup table.
- Challenge:
- The IP lookup table became too large to broadcast after migrating from IPv4 to IPv6 (drastically increasing the IP address space).
- Falling back to a shuffle join for this volume of data was infeasible, as it would require shuffling 100 TB/hour across the cluster.
The Solution: Solving the Problem Upstream. Instead of optimizing the Spark join, the problem was addressed at the data source:
- Change the Logging Strategy:
- Microservice owners were asked to log their app name directly when handling network requests.
- This meant that the information needed (the app associated with the network request) was already available in the raw data, eliminating the need for a join entirely.
- Result:
- The app information was embedded in the dataset, making the join unnecessary.
- By pushing the solution upstream, Spark’s computational burden was reduced, and the pipeline became much more efficient.
While effective, this approach had its own challenges:
- Coordination Overhead:
- Required reaching out to hundreds of microservice owners to implement the logging change.
- Aligning teams and processes took significant time and effort.
- Dependency Management:
- The pipeline now depended on all microservices consistently logging the app name.
- Any service failing to log correctly could break downstream pipelines.
- Data Latency:
- Changes in the microservice logging had to propagate downstream, which might introduce delays.
Lessons Learned:
- Scale-Driven Decisions:
- When dealing with extremely large-scale data (e.g., 100 TB/hour), traditional Spark optimizations (like broadcast or bucket joins) may not work.
- Instead, rethink the problem and address inefficiencies at their source.
- Spark Isn’t Always the Solution:
- Solving problems outside of Spark (e.g., upstream logging changes) can often simplify pipelines and save resources.
- Avoid the “hammer and nail” mentality—not all problems need Spark optimizations.
- Collaborative Problem-Solving:
- Sometimes the best solution requires cross-team collaboration and buy-in, even if it’s painful and time-consuming.
How to Minimize Shuffle at High Volumes
- Use Bucketing for Multiple Joins or Aggregations:
- Bucketing is highly effective when there are multiple joins or aggregations happening downstream.
- If you are only performing one join, then bucketing is typically not worth the cost.
- Why? The shuffle cost required to write a bucketed table is only justified if you’re going to reuse the buckets for several operations.
- Costs of Bucketing:
- Writing a bucketed table incurs an initial shuffle cost, which can be expensive.
- The benefits of bucketing are only realized when subsequent operations avoid shuffling because the data is already aligned by buckets.
Handling Presto with Bucketed Tables:
- Small Number of Buckets Can Cause Issues:
- If your table has a small number of buckets (e.g., 8 or 16), Presto queries might actually perform worse.
- Why? Presto’s initial parallelism will match the number of buckets, but for non-bucketed tables, it can dynamically split files and achieve higher parallelism.
- General Advice:
- While buckets are great for Spark, be cautious when using them with systems like Presto, especially for smaller bucket counts.
Always Use Powers of Two for Bucketing
- When creating bucketed tables, always use powers of two:
- Examples: 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, etc.
- Why powers of two?
- It ensures that bucket counts can align as multiples with other tables.
- For example, a table with 8 buckets can align with another table with 16 buckets.
- If you use non-power-of-two values (e.g., 7 or 13), the tables will only align if both sides use the exact same bucket count, which is unlikely.
The Importance of Minimizing Shuffle
- Minimizing shuffle is one of the most impactful optimizations you can make, especially at scale.
- Shuffle costs rise steeply with data size because of the network I/O and compute resources involved.
- At big scales, avoiding shuffle can save significant money and improve performance dramatically.
Conclusion
- Bucketing is a great tool for minimizing shuffle, but it should be used strategically:
- Don’t bucket unnecessarily: The cost of creating buckets only pays off if there are multiple downstream operations that reuse the buckets.
- Always stick to powers of two to ensure compatibility and flexibility.
- Minimizing shuffle is not just a technical challenge—it can be a career-defining skill that showcases your ability to optimize at scale and deliver cost savings.