
Deep dive into Delta Lake merge

If you are working with Delta Lake, one of the most popular frameworks for building large-scale data pipelines, you may have encountered the MERGE command. It is a powerful command that simplifies complex data processing tasks, but it can also be challenging to use efficiently. Additionally, it may not be immediately clear why you should use it instead of traditional methods like UPDATE and INSERT.

Therefore, in this article, I will explain how the Delta Lake MERGE command works under the hood and how to use it efficiently. By the end of this article, you will have a better understanding of the MERGE command and how to optimize its performance in your data pipelines.

NOTE: This article is based on Delta Lake 4.0.0.

Why does merge matter?

Suppose you're working on customer data for an online retail company. Your pipeline receives a stream of updates to customer information from various sources, such as CRM systems, web forms, and customer support interactions. Some records may be updates to existing customers, while others may represent new customers. Without MERGE, you would need to:

  1. Read the entire target table to find existing customers.
  2. Filter the incoming updates to identify new and existing customers.
  3. Perform separate UPDATE and INSERT operations for existing and new customers.

As a result, you would need to write a lot of code to perform these operations, and you would have to invest significant effort to ensure transactional integrity and prevent data corruption caused by concurrent pipelines writing to the same partition.

What is merge?

MERGE is a command in Delta Lake that performs insert, update, or delete operations based on the following conditions:

  • whenMatched: when a source row matches a target row, perform the specified action (update or delete) on the matched target row
  • whenNotMatched: when a source row does not match any target row, perform the specified action (insert) with the source row
  • whenNotMatchedBySource: when a target row does not match any source row, perform the specified action (update or delete) on the target row

Another important aspect of MERGE is automatic schema evolution: it can resolve schema mismatches between the source and target tables by evolving the target table's schema.

The most common use case of MERGE is to perform an upsert operation. By combining the whenMatched and whenNotMatched clauses, you can execute an upsert in a single command; adding a whenNotMatchedBySource clause, as in the example below, additionally deletes target rows that no longer appear in the source.

-- Upsert operation using MERGE
MERGE INTO target
USING source
ON source.key = target.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *
WHEN NOT MATCHED BY SOURCE THEN
  DELETE

Another common use case is to implement a CDC (change data capture) pipeline, which combines detecting changes in the source table and applying delete, update, or insert operations to the target table in a single atomic operation.

-- CDC operation using MERGE
MERGE INTO users
USING (
  SELECT userId, latest.address AS address, latest.deleted AS deleted FROM (
    SELECT userId, MAX(struct(time, address, deleted)) AS latest
    FROM changes GROUP BY userId
  )
) latestChange
ON latestChange.userId = users.userId
WHEN MATCHED AND latestChange.deleted = TRUE THEN
  DELETE
WHEN MATCHED THEN
  UPDATE SET address = latestChange.address
WHEN NOT MATCHED AND latestChange.deleted = FALSE THEN
  INSERT (userId, address) VALUES (userId, address)

What's inside?

MERGE consists of two distinct phases:

  1. Identifying the affected files (typically using an inner join)
  2. Applying the changes to those files (typically using a full outer join)


Phase 1: Identifying affected files

To identify the affected files, Delta Lake performs an inner join between the target and source tables using the given merge condition to find matching records. It then verifies that multiple source rows do not match and attempt to modify the same target row; if they do, it throws an error. Finally, it identifies all files that contain matching records.

NOTE: The join type can be a right outer join if the WHEN NOT MATCHED BY SOURCE clause is specified, which is used for deleting or updating the target rows that are not matched with the source rows.

Therefore, the performance of this phase is influenced by two factors: how efficiently the existing files are read and how efficiently the join is performed.

Read performance can be improved by:

  • Merge predicates: Use merge predicates to reduce the search space. If your table uses Hive-style partitioning, include partition columns in the merge condition to take advantage of partition pruning.
  • Compacting small files: The small file problem degrades read I/O performance. Compact small files to improve performance.
  • Z-ordering: Z-ordering improves the locality of related records, enabling data skipping. It also applies a bin-packing algorithm under the hood, which can significantly improve read performance. (See the sketch after this list.)
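
As a minimal sketch of the first two points, assuming a hypothetical customers table partitioned by region and keyed by customerId (table and column names are made up):

-- Include the partition column in the merge condition so partition pruning applies
MERGE INTO customers AS t
USING updates AS s
ON t.region = s.region AND t.customerId = s.customerId
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *

-- Compact small files and co-locate records on the join key to enable data skipping
OPTIMIZE customers
ZORDER BY (customerId)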

Join performance can be improved by:

  • The number of partitions: The number of shuffle partitions, configured by spark.sql.shuffle.partitions, determines the degree of parallelism and the number of output partitions. Properly configuring it can improve join performance.
  • Broadcast join: If each executor has enough memory, you can increase spark.sql.autoBroadcastJoinThreshold so that the (usually smaller) source table is broadcast, which can significantly improve join performance. (See the sketch after this list.)
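
Both knobs can be set per session; the values below are purely illustrative and should be tuned for your cluster and data volume:

-- Degree of parallelism for the join's shuffle
SET spark.sql.shuffle.partitions = 400

-- Broadcast threshold in bytes (here roughly 100 MB); tables smaller than this are broadcast
SET spark.sql.autoBroadcastJoinThreshold = 104857600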

Phase 2: Applying Changes

In the second phase, Delta Lake reads the affected files identified in the first phase and applies the necessary changes. The way Delta Lake combines the source and target data depends on the specific clauses used in your MERGE statement. This is typically achieved through a join operation, but the type of join can vary:

MERGE Clauses Present                       | Join Type Used   | Typical Use Case
--------------------------------------------|------------------|------------------------
WHEN MATCHED                                | Inner Join       | Update/Delete only
WHEN NOT MATCHED                            | Right Outer Join | Insert only
WHEN MATCHED + WHEN NOT MATCHED             | Left Outer Join  | Upsert (update/insert)
All three (including NOT MATCHED BY SOURCE) | Full Outer Join  | Upsert + Delete (CDC)
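
For instance, an insert-only merge uses only the WHEN NOT MATCHED clause; since no existing rows are modified, Delta Lake can typically avoid rewriting existing files and only append new ones. A sketch with made-up table names:

-- Insert-only merge: existing rows are never modified
MERGE INTO events AS t
USING new_events AS s
ON t.eventId = s.eventId
WHEN NOT MATCHED THEN
  INSERT *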

After performing the join, it applies the changes to the files. Therefore, performance is affected by both the efficiency of the join and the efficiency of writing the files.

Join performance can be improved by:

  • The number of partitions: The number of shuffle partitions, configured by spark.sql.shuffle.partitions, determines the degree of parallelism and the number of output partitions. Properly configuring it can improve join performance.
  • Cache the source table: If the source table is cached, for example in memory, the join can read the cached data directly, improving performance. Note that you should not cache the target table, since doing so can cause cache coherence problems. (See the sketch after this list.)
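
As a minimal sketch (source_updates is a hypothetical source table or view), the source can be cached before the merge and released afterwards:

-- Cache the source (never the target) so it is not recomputed for each phase
CACHE TABLE source_updates

MERGE INTO target AS t
USING source_updates AS s
ON t.key = s.key
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *

UNCACHE TABLE source_updates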

Write performance can be improved by:

  • Optimized write: The number of output files affects write performance. Without optimized write, the number of output files is either equal to the number of shuffle partitions (commonly large, 200 by default) or, if AQE (Adaptive Query Execution) coalesces partitions, a small number of excessively large files. Both extremes can hurt write performance. Optimized write manages the number of output files by repartitioning the data before writing; the number of output files is determined by the size of the data, controlled by spark.databricks.delta.optimizeWrite.binSize. (See the sketch after this list.)
  • repartitionBeforeWrite: Alternatively, you can enable spark.databricks.delta.merge.repartitionBeforeWrite.enabled to repartition the data by the table's partition columns before writing. This is unnecessary if you have already enabled optimized write, and there is generally no reason to prefer it over optimized write.
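
A sketch of enabling optimized write, either per session or per table (configuration and property names follow the Delta Lake documentation; availability depends on your Delta version):

-- Enable optimized write for the current session
SET spark.databricks.delta.optimizeWrite.enabled = true

-- Or enable it as a table property so every writer benefits
ALTER TABLE target SET TBLPROPERTIES ('delta.autoOptimize.optimizeWrite' = 'true')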

Automatic schema evolution

If the schema of the Delta table is different from the source, INSERT and UPDATE operations will fail with an error. This is where automatic schema evolution comes in. It automatically evolves the schema of the target table to match the source table by adding new columns or updating existing columns as needed.

The evolved schema is recorded in the table's metadata, and data files are rewritten with the new schema only if they contain modified records. If an existing file has no modified records, it remains unchanged and will pick up the new schema only when it is rewritten by future DML operations.
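
In open-source Delta Lake, automatic schema evolution for MERGE has to be enabled explicitly. A minimal sketch, assuming the source carries an extra column that the target does not yet have (table and column names are made up):

-- Allow MERGE to evolve the target schema
SET spark.databricks.delta.schema.autoMerge.enabled = true

-- If customer_updates has a new column (e.g. loyaltyTier), it is added to customers
MERGE INTO customers AS t
USING customer_updates AS s
ON t.customerId = s.customerId
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *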

Additional Tips

Merge does not break Hive-style partitioning

Hive-style partitioning is a common way to organize data in Delta Lake. You do not need to worry about MERGE breaking the existing partition structure; it is designed to work seamlessly with Hive-style partitioning, as sketched after the list below.

  • It creates new partition directories if they do not already exist.
  • It updates rows in existing partitions without breaking the partition layout.
  • Partition pruning remains effective for reducing the search space.
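
As a minimal sketch with made-up names, merging into a table partitioned by country simply creates a new country=... directory when the source contains a country the target has not seen before:

CREATE TABLE orders (
  orderId BIGINT,
  amount DOUBLE,
  country STRING
) USING DELTA
PARTITIONED BY (country)

-- Rows for a previously unseen country land in a new partition directory
MERGE INTO orders AS t
USING order_updates AS s
ON t.country = s.country AND t.orderId = s.orderId
WHEN MATCHED THEN
  UPDATE SET *
WHEN NOT MATCHED THEN
  INSERT *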

Low shuffle merge

NOTE: Only available in Databricks Runtime 10.4 LTS and above.

Suppose there are only a small number of records to be updated in a large file. In this case, MERGE still reads the entire file and replaces it with a new file, causing unnecessary rows to be shuffled and written. If you are using Databricks, low shuffle merge can help you avoid this problem. It can be enabled by setting spark.databricks.delta.merge.enableLowShuffle to true.
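
A one-line sketch of enabling it on Databricks runtimes where it is not already on by default, using the configuration named above:

SET spark.databricks.delta.merge.enableLowShuffle = true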

It works by passing only the modified rows to the shuffle stages, which significantly reduces the amount of data shuffled and written. It also attempts to preserve the existing data layout of unmodified records, including z-order optimization.
