Commit 7d57881

add delta lake upsert

khuyentran1401 committed Aug 20, 2024
1 parent 21aa9ee commit 7d57881

Showing 4 changed files with 690 additions and 33 deletions.
254 changes: 253 additions & 1 deletion Chapter5/better_pandas.ipynb
@@ -2289,6 +2289,258 @@
"[Link to Delta Lake](https://github.com/delta-io/delta)."
]
},
{
"cell_type": "markdown",
"id": "6a202591",
"metadata": {},
"source": [
"### From Complex SQL to Simple Merges: Delta Lake's Upsert Solution"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e655b5fa",
"metadata": {
"tags": [
"hide-cell"
]
},
"outputs": [],
"source": [
"!pip install delta-spark"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "32ae71e5",
"metadata": {
"tags": [
"remove-cell"
]
},
"outputs": [],
"source": [
"import pyspark\n",
"from delta import *\n",
"\n",
"# Configure Spark to use Delta\n",
"builder = (\n",
" pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n",
" .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n",
" .config(\n",
" \"spark.sql.catalog.spark_catalog\",\n",
" \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n",
" )\n",
")\n",
"\n",
"spark = configure_spark_with_delta_pip(builder).getOrCreate()"
]
},
{
"cell_type": "markdown",
"id": "775dcae5",
"metadata": {},
"source": [
"Traditionally, implementing upsert (update or insert) logic requires separate UPDATE and INSERT statements or complex SQL. This approach can be error-prone and inefficient, especially for large datasets. \n",
"\n",
"Delta Lake's merge operation solves this problem by allowing you to specify different actions for matching and non-matching records in a single, declarative statement.\n",
"\n",
"Here's an example that demonstrates the power and simplicity of Delta Lake's merge operation:\n",
"\n",
"First, let's set up our initial data:\n",
"\n",
"\n"
]
},
{
"cell_type": "code",
"execution_count": 12,
"id": "ff393032",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Initial Customers:\n",
"+-----------+-----------+----------------+-------------------+\n",
"|customer_id| name| email| last_updated|\n",
"+-----------+-----------+----------------+-------------------+\n",
"| 1| John Doe|john@example.com|2023-01-01 10:00:00|\n",
"| 2| Jane Smith|jane@example.com|2023-01-02 11:00:00|\n",
"| 3|Bob Johnson| bob@example.com|2023-01-03 12:00:00|\n",
"+-----------+-----------+----------------+-------------------+\n",
"\n",
"Updates:\n",
"+-----------+-----------+--------------------+\n",
"|customer_id| name| email|\n",
"+-----------+-----------+--------------------+\n",
"| 2| Jane Doe|jane.doe@example.com|\n",
"| 3|Bob Johnson| bob@example.com|\n",
"| 4|Alice Brown| alice@example.com|\n",
"+-----------+-----------+--------------------+\n",
"\n"
]
}
],
"source": [
"# Create sample data for 'customers' DataFrame\n",
"customers_data = [\n",
" (1, \"John Doe\", \"john@example.com\", \"2023-01-01 10:00:00\"),\n",
" (2, \"Jane Smith\", \"jane@example.com\", \"2023-01-02 11:00:00\"),\n",
" (3, \"Bob Johnson\", \"bob@example.com\", \"2023-01-03 12:00:00\"),\n",
"]\n",
"customers = spark.createDataFrame(\n",
" customers_data, [\"customer_id\", \"name\", \"email\", \"last_updated\"]\n",
")\n",
"\n",
"# Create sample data for 'updates' DataFrame\n",
"updates_data = [\n",
" (2, \"Jane Doe\", \"jane.doe@example.com\"), # Existing customer with updates\n",
" (3, \"Bob Johnson\", \"bob@example.com\"), # Existing customer without changes\n",
" (4, \"Alice Brown\", \"alice@example.com\"), # New customer\n",
"]\n",
"updates = spark.createDataFrame(updates_data, [\"customer_id\", \"name\", \"email\"])\n",
"\n",
"# Show the initial data\n",
"print(\"Initial Customers:\")\n",
"customers.show()\n",
"print(\"Updates:\")\n",
"updates.show()"
]
},
{
"cell_type": "markdown",
"id": "acb9e489",
"metadata": {},
"source": [
"Next, we create a Delta table from our initial customer data:"
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "0041f1d4",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"Customers Delta Table created successfully\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Define the path where you want to save the Delta table\n",
"delta_table_path = \"customers_delta\"\n",
"\n",
"# Write the DataFrame as a Delta table\n",
"customers.write.format(\"delta\").mode(\"overwrite\").save(delta_table_path)\n",
"\n",
"# Create a DeltaTable object\n",
"customers_delta = DeltaTable.forPath(spark, delta_table_path)\n",
"\n",
"print(\"Customers Delta Table created successfully\")"
]
},
{
"cell_type": "markdown",
"id": "560b2a9d",
"metadata": {},
"source": [
"Now, here's the key part - the merge operation that handles both updates and inserts in a single statement:"
]
},
{
"cell_type": "code",
"execution_count": 14,
"id": "f0626375",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
}
],
"source": [
"# Assume 'customers_delta' is your target table and 'updates' is your source of new data\n",
"customers_delta.alias(\"target\").merge(\n",
" updates.alias(\"source\"),\n",
" \"target.customer_id = source.customer_id\"\n",
").whenMatchedUpdate(set={\n",
" \"name\": \"source.name\",\n",
" \"email\": \"source.email\",\n",
" \"last_updated\": \"current_timestamp()\"\n",
"}).whenNotMatchedInsert(values={\n",
" \"customer_id\": \"source.customer_id\",\n",
" \"name\": \"source.name\",\n",
" \"email\": \"source.email\",\n",
" \"last_updated\": \"current_timestamp()\"\n",
"}).execute()"
]
},
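{
"cell_type": "markdown",
"id": "b7c4e2a0",
"metadata": {},
"source": [
"Note that `whenMatchedUpdate` fires for every matched row, so even the unchanged record (Bob Johnson) receives a fresh `last_updated` timestamp in the output below. To touch only rows whose data actually changed, you can pass a `condition` to `whenMatchedUpdate`; a sketch, not run here:\n",
"\n",
"```python\n",
"customers_delta.alias(\"target\").merge(\n",
"    updates.alias(\"source\"), \"target.customer_id = source.customer_id\"\n",
").whenMatchedUpdate(\n",
"    # Only rewrite rows whose payload actually differs\n",
"    condition=\"target.name <> source.name OR target.email <> source.email\",\n",
"    set={\n",
"        \"name\": \"source.name\",\n",
"        \"email\": \"source.email\",\n",
"        \"last_updated\": \"current_timestamp()\",\n",
"    },\n",
").whenNotMatchedInsert(\n",
"    values={\n",
"        \"customer_id\": \"source.customer_id\",\n",
"        \"name\": \"source.name\",\n",
"        \"email\": \"source.email\",\n",
"        \"last_updated\": \"current_timestamp()\",\n",
"    }\n",
").execute()\n",
"```"
]
},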
{
"cell_type": "code",
"execution_count": 15,
"id": "0ed114dc",
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Updated Customers Delta Table:\n"
]
},
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+-----------+-----------+--------------------+--------------------+\n",
"|customer_id| name| email| last_updated|\n",
"+-----------+-----------+--------------------+--------------------+\n",
"| 2| Jane Doe|jane.doe@example.com|2024-08-20 16:05:...|\n",
"| 3|Bob Johnson| bob@example.com|2024-08-20 16:05:...|\n",
"| 4|Alice Brown| alice@example.com|2024-08-20 16:05:...|\n",
"| 1| John Doe| john@example.com| 2023-01-01 10:00:00|\n",
"+-----------+-----------+--------------------+--------------------+\n",
"\n"
]
}
],
"source": [
"# Verify the updated data\n",
"print(\"Updated Customers Delta Table:\")\n",
"customers_delta.toDF().show()"
]
},
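{
"cell_type": "markdown",
"id": "c9d1f3b2",
"metadata": {},
"source": [
"Because every merge is a transaction, Delta also records it in the table history. A quick sketch to confirm the operation (the column selection assumes the standard `history()` schema):\n",
"\n",
"```python\n",
"# Each row is one table version; the merge shows up as operation 'MERGE'\n",
"customers_delta.history().select(\"version\", \"operation\").show()\n",
"```"
]
},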
{
"attachments": {},
"cell_type": "markdown",
@@ -4178,7 +4430,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
"version": "3.11.6"
},
"toc": {
"base_numbering": 1,