From ee5bf96a3bf38ad81e7f7381e7bf555b2a0b7da4 Mon Sep 17 00:00:00 2001 From: khuyentran1401 Date: Sat, 30 Mar 2024 14:57:13 -0500 Subject: [PATCH] add spark udf --- Chapter5/spark.ipynb | 198 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 183 insertions(+), 15 deletions(-) diff --git a/Chapter5/spark.ipynb b/Chapter5/spark.ipynb index 03ca7019..c9afca9d 100644 --- a/Chapter5/spark.ipynb +++ b/Chapter5/spark.ipynb @@ -32,7 +32,7 @@ }, { "cell_type": "code", - "execution_count": 37, + "execution_count": 3, "id": "c7e8e068", "metadata": { "tags": [ @@ -76,7 +76,7 @@ }, { "cell_type": "code", - "execution_count": 35, + "execution_count": 4, "id": "be8b69c8", "metadata": {}, "outputs": [], @@ -88,10 +88,17 @@ }, { "cell_type": "code", - "execution_count": 41, + "execution_count": 5, "id": "58d04cb6", "metadata": {}, "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, { "name": "stdout", "output_type": "stream", @@ -99,11 +106,11 @@ "+---+----+----+\n", "|cat|val1|val2|\n", "+---+----+----+\n", - "| c| 56| 60|\n", - "| b| 23| 65|\n", - "| b| 39| 8|\n", - "| a| 77| 39|\n", - "| c| 17| 66|\n", + "| b| 0| 34|\n", + "| a| 58| 12|\n", + "| c| 24| 72|\n", + "| a| 20| 58|\n", + "| b| 13| 17|\n", "+---+----+----+\n", "only showing top 5 rows\n", "\n" @@ -117,7 +124,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "id": "01181341", "metadata": {}, "outputs": [], @@ -127,7 +134,7 @@ }, { "cell_type": "code", - "execution_count": 40, + "execution_count": 7, "id": "305ddc35", "metadata": {}, "outputs": [ @@ -141,12 +148,12 @@ { "data": { "text/plain": [ - "[Row(cat='c', avg(val2)=49.56976095832509),\n", - " Row(cat='b', avg(val2)=49.43383865728208),\n", - " Row(cat='a', avg(val2)=49.68113195098398)]" + "[Row(cat='c', avg(val2)=49.54095055783208),\n", + " Row(cat='b', avg(val2)=49.46593810642427),\n", + " Row(cat='a', avg(val2)=49.52092805080465)]" ] }, - "execution_count": 40, + "execution_count": 
7, "metadata": {}, "output_type": "execute_result" } @@ -987,6 +994,167 @@ "source": [ "[View other array functions](https://bit.ly/4c0txD1)." ] + }, + { + "cell_type": "markdown", + "id": "278c6d76", + "metadata": {}, + "source": [ + "### Simplify Complex SQL Queries with PySpark UDFs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f75b6174", + "metadata": { + "tags": [ + "hide-cell" + ] + }, + "outputs": [], + "source": [ + "!pip install \"pyspark[sql]\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "0f909f3d", + "metadata": { + "tags": [ + "hide-cell" + ] + }, + "outputs": [], + "source": [ + "from pyspark.sql import SparkSession\n", + "\n", + "spark = SparkSession.builder.getOrCreate()" + ] + }, + { + "cell_type": "markdown", + "id": "08b4e6b5", + "metadata": {}, + "source": [ + "SQL queries can often become complex and challenging to comprehend." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "4b733f5f", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+-------------+\n", + "| id|modified_name|\n", + "+---+-------------+\n", + "| 1| John doe|\n", + "| 2| Jane smith|\n", + "| 3| Bob johnson|\n", + "+---+-------------+\n", + "\n" + ] + } + ], + "source": [ + "df = spark.createDataFrame(\n", + " [(1, \"John Doe\"), (2, \"Jane Smith\"), (3, \"Bob Johnson\")], [\"id\", \"name\"]\n", + ")\n", + "\n", + "# Register the DataFrame as a temporary table or view\n", + "df.createOrReplaceTempView(\"df\")\n", + "\n", + "# Complex SQL query\n", + "spark.sql(\n", + " \"\"\"\n", + " SELECT id, CONCAT(UPPER(SUBSTRING(name, 1, 1)), LOWER(SUBSTRING(name, 2))) AS modified_name\n", + " FROM df\n", + "\"\"\"\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "id": "8056521a", + "metadata": {}, + "source": [ + "Using PySpark UDFs simplifies complex SQL queries by 
encapsulating complex operations into a single function call, resulting in cleaner queries. UDFs also allow for the reuse of complex logic across different queries. \n", + "\n", + "In the code example below, we define a UDF called `modify_name` that capitalizes the first letter of the name and lowercases the rest." + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "59804345", + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "24/03/30 14:36:24 WARN SimpleFunctionRegistry: The function modify_name replaced a previously registered function.\n", + " \r" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+---+-------------+\n", + "| id|modified_name|\n", + "+---+-------------+\n", + "| 1| John doe|\n", + "| 2| Jane smith|\n", + "| 3| Bob johnson|\n", + "+---+-------------+\n", + "\n" + ] + } + ], + "source": [ + "from pyspark.sql.functions import udf\n", + "from pyspark.sql.types import StringType\n", + "\n", + "\n", + "# Define a UDF to modify the name\n", + "@udf(returnType=StringType())\n", + "def modify_name(name):\n", + " return name[0].upper() + name[1:].lower()\n", + "\n", + "spark.udf.register('modify_name', modify_name)\n", + "\n", + "# Apply the UDF in the spark.sql query\n", + "df.createOrReplaceTempView(\"df\")\n", + "\n", + "spark.sql(\n", + " \"\"\"\n", + " SELECT id, modify_name(name) AS modified_name\n", + " FROM df\n", + "\"\"\"\n", + ").show()" + ] + }, + { + "cell_type": "markdown", + "id": "963cf67f", + "metadata": {}, + "source": [ + "[Learn more about PySpark UDFs](https://bit.ly/3TEYPHh)." + ] + } ], "metadata": { @@ -1005,7 +1173,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.4" + "version": "3.11.6" } }, "nbformat": 4,