add spark udf
khuyentran1401 committed Mar 30, 2024
1 parent b80b52f commit ee5bf96
Showing 1 changed file with 183 additions and 15 deletions.
198 changes: 183 additions & 15 deletions Chapter5/spark.ipynb
@@ -32,7 +32,7 @@
},
{
"cell_type": "code",
"execution_count": 37,
"execution_count": 3,
"id": "c7e8e068",
"metadata": {
"tags": [
@@ -76,7 +76,7 @@
},
{
"cell_type": "code",
"execution_count": 35,
"execution_count": 4,
"id": "be8b69c8",
"metadata": {},
"outputs": [],
@@ -88,22 +88,29 @@
},
{
"cell_type": "code",
"execution_count": 41,
"execution_count": 5,
"id": "58d04cb6",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+----+----+\n",
"|cat|val1|val2|\n",
"+---+----+----+\n",
"| c| 56| 60|\n",
"| b| 23| 65|\n",
"| b| 39| 8|\n",
"| a| 77| 39|\n",
"| c| 17| 66|\n",
"| b| 0| 34|\n",
"| a| 58| 12|\n",
"| c| 24| 72|\n",
"| a| 20| 58|\n",
"| b| 13| 17|\n",
"+---+----+----+\n",
"only showing top 10 rows\n",
"\n"
@@ -117,7 +124,7 @@
},
{
"cell_type": "code",
"execution_count": null,
"execution_count": 6,
"id": "01181341",
"metadata": {},
"outputs": [],
@@ -127,7 +134,7 @@
},
{
"cell_type": "code",
"execution_count": 40,
"execution_count": 7,
"id": "305ddc35",
"metadata": {},
"outputs": [
@@ -141,12 +148,12 @@
{
"data": {
"text/plain": [
"[Row(cat='c', avg(val2)=49.56976095832509),\n",
" Row(cat='b', avg(val2)=49.43383865728208),\n",
" Row(cat='a', avg(val2)=49.68113195098398)]"
"[Row(cat='c', avg(val2)=49.54095055783208),\n",
" Row(cat='b', avg(val2)=49.46593810642427),\n",
" Row(cat='a', avg(val2)=49.52092805080465)]"
]
},
"execution_count": 40,
"execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
@@ -987,6 +994,167 @@
"source": [
"[View other array functions](https://bit.ly/4c0txD1)."
]
},
{
"cell_type": "markdown",
"id": "278c6d76",
"metadata": {},
"source": [
"### Simplify Complex SQL Queries with PySpark UDFs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f75b6174",
"metadata": {
"tags": [
"hide-cell"
]
},
"outputs": [],
"source": [
"!pip install \"pyspark[sql]\""
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0f909f3d",
"metadata": {
"tags": [
"hide-cell"
]
},
"outputs": [],
"source": [
"from pyspark.sql import SparkSession\n",
"\n",
"spark = SparkSession.builder.getOrCreate()"
]
},
{
"cell_type": "markdown",
"id": "08b4e6b5",
"metadata": {},
"source": [
"SQL queries can often become complex and challenging to comprehend."
]
},
{
"cell_type": "code",
"execution_count": 8,
"id": "4b733f5f",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-------------+\n",
"| id|modified_name|\n",
"+---+-------------+\n",
"| 1| John doe|\n",
"| 2| Jane smith|\n",
"| 3| Bob johnson|\n",
"+---+-------------+\n",
"\n"
]
}
],
"source": [
"df = spark.createDataFrame(\n",
" [(1, \"John Doe\"), (2, \"Jane Smith\"), (3, \"Bob Johnson\")], [\"id\", \"name\"]\n",
")\n",
"\n",
"# Register the DataFrame as a temporary table or view\n",
"df.createOrReplaceTempView(\"df\")\n",
"\n",
"# Complex SQL query\n",
"spark.sql(\n",
" \"\"\"\n",
" SELECT id, CONCAT(UPPER(SUBSTRING(name, 1, 1)), LOWER(SUBSTRING(name, 2))) AS modified_name\n",
" FROM df\n",
"\"\"\"\n",
").show()"
]
},
{
"cell_type": "markdown",
"id": "8056521a",
"metadata": {},
"source": [
"Using PySpark UDFs simplifies complex SQL queries by encapsulating intricate operations in a single function call, resulting in cleaner queries. UDFs also make that logic reusable across different queries. \n",
"\n",
"In the code example below, we define a UDF called `modify_name` that capitalizes the first letter of the name and lowercases the rest."
]
},
{
"cell_type": "code",
"execution_count": 13,
"id": "59804345",
"metadata": {},
"outputs": [
{
"name": "stderr",
"output_type": "stream",
"text": [
"24/03/30 14:36:24 WARN SimpleFunctionRegistry: The function modify_name replaced a previously registered function.\n",
" \r"
]
},
{
"name": "stdout",
"output_type": "stream",
"text": [
"+---+-------------+\n",
"| id|modified_name|\n",
"+---+-------------+\n",
"| 1| John doe|\n",
"| 2| Jane smith|\n",
"| 3| Bob johnson|\n",
"+---+-------------+\n",
"\n"
]
}
],
"source": [
"from pyspark.sql.functions import udf\n",
"from pyspark.sql.types import StringType\n",
"\n",
"\n",
"# Define a UDF to modify the name\n",
"@udf(returnType=StringType())\n",
"def modify_name(name):\n",
" return name[0].upper() + name[1:].lower()\n",
"\n",
"spark.udf.register('modify_name', modify_name)\n",
"\n",
"# Apply the UDF in the spark.sql query\n",
"df.createOrReplaceTempView(\"df\")\n",
"\n",
"spark.sql(\n",
" \"\"\"\n",
" SELECT id, modify_name(name) AS modified_name\n",
" FROM df\n",
"\"\"\"\n",
").show()"
]
},
{
"cell_type": "markdown",
"id": "963cf67f",
"metadata": {},
"source": [
"[Learn more about PySpark UDFs](https://bit.ly/3TEYPHh)."
]
}
],
"metadata": {
@@ -1005,7 +1173,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.11.4"
"version": "3.11.6"
}
},
"nbformat": 4,
