diff --git a/Chapter5/better_pandas.ipynb b/Chapter5/better_pandas.ipynb index 990d160..3dd9cec 100644 --- a/Chapter5/better_pandas.ipynb +++ b/Chapter5/better_pandas.ipynb @@ -2289,6 +2289,258 @@ "[Link to Delta Lake](https://github.com/delta-io/delta)." ] }, + { + "cell_type": "markdown", + "id": "6a202591", + "metadata": {}, + "source": [ + "### From Complex SQL to Simple Merges: Delta Lake's Upsert Solution" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "e655b5fa", + "metadata": { + "tags": [ + "hide-cell" + ] + }, + "outputs": [], + "source": [ + "!pip install delta-spark" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "32ae71e5", + "metadata": { + "tags": [ + "remove-cell" + ] + }, + "outputs": [], + "source": [ + "import pyspark\n", + "from delta import *\n", + "\n", + "# Configure Spark to use Delta\n", + "builder = (\n", + " pyspark.sql.SparkSession.builder.appName(\"MyApp\")\n", + " .config(\"spark.sql.extensions\", \"io.delta.sql.DeltaSparkSessionExtension\")\n", + " .config(\n", + " \"spark.sql.catalog.spark_catalog\",\n", + " \"org.apache.spark.sql.delta.catalog.DeltaCatalog\",\n", + " )\n", + ")\n", + "\n", + "spark = configure_spark_with_delta_pip(builder).getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "id": "775dcae5", + "metadata": {}, + "source": [ + "Traditionally, implementing upsert (update or insert) logic requires separate UPDATE and INSERT statements or complex SQL. This approach can be error-prone and inefficient, especially for large datasets. \n", + "\n", + "Delta Lake's merge operation solves this problem by allowing you to specify different actions for matching and non-matching records in a single, declarative statement.\n", + "\n", + "Here's an example that demonstrates the power and simplicity of Delta Lake's merge operation:\n", + "\n", + "First, let's set up our initial data:\n", + "\n", + "\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "ff393032", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Initial Customers:\n", + "+-----------+-----------+----------------+-------------------+\n", + "|customer_id| name| email| last_updated|\n", + "+-----------+-----------+----------------+-------------------+\n", + "| 1| John Doe|john@example.com|2023-01-01 10:00:00|\n", + "| 2| Jane Smith|jane@example.com|2023-01-02 11:00:00|\n", + "| 3|Bob Johnson| bob@example.com|2023-01-03 12:00:00|\n", + "+-----------+-----------+----------------+-------------------+\n", + "\n", + "Updates:\n", + "+-----------+-----------+--------------------+\n", + "|customer_id| name| email|\n", + "+-----------+-----------+--------------------+\n", + "| 2| Jane Doe|jane.doe@example.com|\n", + "| 3|Bob Johnson| bob@example.com|\n", + "| 4|Alice Brown| alice@example.com|\n", + "+-----------+-----------+--------------------+\n", + "\n" + ] + } + ], + "source": [ + "# Create sample data for 'customers' DataFrame\n", + "customers_data = [\n", + " (1, \"John Doe\", \"john@example.com\", \"2023-01-01 10:00:00\"),\n", + " (2, \"Jane Smith\", \"jane@example.com\", \"2023-01-02 11:00:00\"),\n", + " (3, \"Bob Johnson\", \"bob@example.com\", \"2023-01-03 12:00:00\"),\n", + "]\n", + "customers = spark.createDataFrame(\n", + " customers_data, [\"customer_id\", \"name\", \"email\", \"last_updated\"]\n", + ")\n", + "\n", + "# Create sample data for 'updates' DataFrame\n", + "updates_data = [\n", + " (2, \"Jane Doe\", \"jane.doe@example.com\"), # Existing customer with 
updates\n", + " (3, \"Bob Johnson\", \"bob@example.com\"), # Existing customer without changes\n", + " (4, \"Alice Brown\", \"alice@example.com\"), # New customer\n", + "]\n", + "updates = spark.createDataFrame(updates_data, [\"customer_id\", \"name\", \"email\"])\n", + "\n", + "# Show the initial data\n", + "print(\"Initial Customers:\")\n", + "customers.show()\n", + "print(\"Updates:\")\n", + "updates.show()" + ] + }, + { + "cell_type": "markdown", + "id": "acb9e489", + "metadata": {}, + "source": [ + "Next, we create a Delta table from our initial customer data:" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "0041f1d4", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Customers Delta Table created successfully\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "# Define the path where you want to save the Delta table\n", + "delta_table_path = \"customers_delta\"\n", + "\n", + "# Write the DataFrame as a Delta table\n", + "customers.write.format(\"delta\").mode(\"overwrite\").save(delta_table_path)\n", + "\n", + "# Create a DeltaTable object\n", + "customers_delta = DeltaTable.forPath(spark, delta_table_path)\n", + "\n", + "print(\"Customers Delta Table created successfully\")" + ] + }, + { + "cell_type": "markdown", + "id": "560b2a9d", + "metadata": {}, + "source": [ + "Now, here's the key part - the merge operation that handles both updates and inserts in a single statement:" + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "f0626375", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + } + ], + "source": [ + "# Assume 'customers_delta' is your target table and 'updates' is your source of new data\n", + "customers_delta.alias(\"target\").merge(\n", + " updates.alias(\"source\"),\n", + " \"target.customer_id = source.customer_id\"\n", + ").whenMatchedUpdate(set={\n", + " \"name\": \"source.name\",\n", + " \"email\": \"source.email\",\n", + " \"last_updated\": \"current_timestamp()\"\n", + "}).whenNotMatchedInsert(values={\n", + " \"customer_id\": \"source.customer_id\",\n", + " \"name\": \"source.name\",\n", + " \"email\": \"source.email\",\n", + " \"last_updated\": \"current_timestamp()\"\n", + "}).execute()" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "0ed114dc", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Updated Customers Delta Table:\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+-----------+-----------+--------------------+--------------------+\n", + "|customer_id| name| email| last_updated|\n", + "+-----------+-----------+--------------------+--------------------+\n", + "| 2| Jane Doe|jane.doe@example.com|2024-08-20 16:05:...|\n", + "| 3|Bob Johnson| bob@example.com|2024-08-20 16:05:...|\n", + "| 4|Alice Brown| alice@example.com|2024-08-20 16:05:...|\n", + "| 1| John Doe| john@example.com| 2023-01-01 10:00:00|\n", + "+-----------+-----------+--------------------+--------------------+\n", + "\n" + ] + } + ], + "source": [ + "# Verify the updated data\n", + "print(\"Updated Customers Delta Table:\")\n", + "customers_delta.toDF().show()" + ] + }, { "attachments": {}, "cell_type": "markdown", @@ -4178,7 
+4430,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.4" + "version": "3.11.6" }, "toc": { "base_numbering": 1, diff --git a/docs/Chapter5/better_pandas.html b/docs/Chapter5/better_pandas.html index fe70cec..946f779 100644 --- a/docs/Chapter5/better_pandas.html +++ b/docs/Chapter5/better_pandas.html @@ -524,16 +524,17 @@
!pip install delta-spark
+
Traditionally, implementing upsert (update or insert) logic requires separate UPDATE and INSERT statements or complex SQL. This approach can be error-prone and inefficient, especially for large datasets.
+Delta Lake’s merge operation solves this problem by allowing you to specify different actions for matching and non-matching records in a single, declarative statement.
+Here’s an example that demonstrates the power and simplicity of Delta Lake’s merge operation:
+First, let’s set up our initial data:
+# Create sample data for 'customers' DataFrame
+customers_data = [
+ (1, "John Doe", "john@example.com", "2023-01-01 10:00:00"),
+ (2, "Jane Smith", "jane@example.com", "2023-01-02 11:00:00"),
+ (3, "Bob Johnson", "bob@example.com", "2023-01-03 12:00:00"),
+]
+customers = spark.createDataFrame(
+ customers_data, ["customer_id", "name", "email", "last_updated"]
+)
+
+# Create sample data for 'updates' DataFrame
+updates_data = [
+ (2, "Jane Doe", "jane.doe@example.com"), # Existing customer with updates
+ (3, "Bob Johnson", "bob@example.com"), # Existing customer without changes
+ (4, "Alice Brown", "alice@example.com"), # New customer
+]
+updates = spark.createDataFrame(updates_data, ["customer_id", "name", "email"])
+
+# Show the initial data
+print("Initial Customers:")
+customers.show()
+print("Updates:")
+updates.show()
+
Initial Customers:
++-----------+-----------+----------------+-------------------+
+|customer_id| name| email| last_updated|
++-----------+-----------+----------------+-------------------+
+| 1| John Doe|john@example.com|2023-01-01 10:00:00|
+| 2| Jane Smith|jane@example.com|2023-01-02 11:00:00|
+| 3|Bob Johnson| bob@example.com|2023-01-03 12:00:00|
++-----------+-----------+----------------+-------------------+
+
+Updates:
++-----------+-----------+--------------------+
+|customer_id| name| email|
++-----------+-----------+--------------------+
+| 2| Jane Doe|jane.doe@example.com|
+| 3|Bob Johnson| bob@example.com|
+| 4|Alice Brown| alice@example.com|
++-----------+-----------+--------------------+
+
Next, we create a Delta table from our initial customer data:
+# Define the path where you want to save the Delta table
+delta_table_path = "customers_delta"
+
+# Write the DataFrame as a Delta table
+customers.write.format("delta").mode("overwrite").save(delta_table_path)
+
+# Create a DeltaTable object
+customers_delta = DeltaTable.forPath(spark, delta_table_path)
+
+print("Customers Delta Table created successfully")
+
+
Customers Delta Table created successfully
+
+
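+As an aside, the saved table can be read back as an ordinary DataFrame at any time with the standard Delta reader:
+# Load the Delta table as a plain Spark DataFrame.
+df = spark.read.format("delta").load(delta_table_path)
+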
Now, here’s the key part: the merge operation that handles both updates and inserts in a single statement:
+# Assume 'customers_delta' is your target table and 'updates' is your source of new data
+customers_delta.alias("target").merge(
+ updates.alias("source"),
+ "target.customer_id = source.customer_id"
+).whenMatchedUpdate(set={
+ "name": "source.name",
+ "email": "source.email",
+ "last_updated": "current_timestamp()"
+}).whenNotMatchedInsert(values={
+ "customer_id": "source.customer_id",
+ "name": "source.name",
+ "email": "source.email",
+ "last_updated": "current_timestamp()"
+}).execute()
+
+
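+When the source rows carry every target column, the same merge can be written more compactly with the whenMatchedUpdateAll and whenNotMatchedInsertAll shorthands; this is a sketch, and it assumes updates also carried a last_updated column:
+# Shorthand variant: update or insert every column by matching names,
+# instead of listing each column assignment explicitly.
+customers_delta.alias("target").merge(
+    updates.alias("source"), "target.customer_id = source.customer_id"
+).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
+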
# Verify the updated data
+print("Updated Customers Delta Table:")
+customers_delta.toDF().show()
+
Updated Customers Delta Table:
+
+
++-----------+-----------+--------------------+--------------------+
+|customer_id| name| email| last_updated|
++-----------+-----------+--------------------+--------------------+
+| 2| Jane Doe|jane.doe@example.com|2024-08-20 16:05:...|
+| 3|Bob Johnson| bob@example.com|2024-08-20 16:05:...|
+| 4|Alice Brown| alice@example.com|2024-08-20 16:05:...|
+| 1| John Doe| john@example.com| 2023-01-01 10:00:00|
++-----------+-----------+--------------------+--------------------+
+
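+Because every merge is committed to the Delta transaction log, you can also audit what the operation did; the most recent history entry records the MERGE along with row-level metrics:
+# The latest history entry reports the operation and metrics such as
+# numTargetRowsUpdated and numTargetRowsInserted.
+customers_delta.history(1).select(
+    "version", "operation", "operationMetrics"
+).show(truncate=False)
+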
Appending mismatched data to a Parquet table involves reading the existing data, concatenating it with the new data, and overwriting the existing Parquet file. This approach can be expensive and may lead to schema inconsistencies.
In the following code, the datatype of col3 is supposed to be int64 instead of float64.
Extract features and select only the relevant ones for each time series.
Learn more about the Streaming API in Polars.
As a data scientist, you’re likely familiar with the popular data analysis libraries Pandas and Polars. Both provide powerful tools for working with tabular data, but how do their syntaxes compare?
To begin, we’ll create equivalent dataframes in both Pandas and Polars: