diff --git a/examples/clickstream_consumer.ipynb b/examples/clickstream_consumer.ipynb
new file mode 100644
index 0000000..e81fe6a
--- /dev/null
+++ b/examples/clickstream_consumer.ipynb
@@ -0,0 +1,167 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Clickstream analysis in Python using Apache Spark and Apache Kafka\n",
+    "\n",
+    "This is a clickstream processing demo using Apache Kafka and Spark Structured Streaming. It’s based on the original Scala version described at https://github.com/IBM/kafka-streaming-click-analysis/blob/master/README.md. The clickstream data is from Wikipedia, and is streamed line by line into the Kafka topic `clickstream` by the script `kafka_producer.py`. Each line comprises four tab-separated values: the previous page visited (`prev`), the current page (`curr`), the type of page (`type`), and the number of clicks for that navigation path (`n`).\n",
+    "\n",
+    "The following block subscribes to the `clickstream` topic, extracts the four columns from the received records, and starts an in-memory query that summarises the most popular destination pages by total clicks."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import pyspark.sql.functions as psf\n",
+    "\n",
+    "records = (\n",
+    "    spark.readStream\n",
+    "    .format(\"kafka\")\n",
+    "    .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n",
+    "    .option(\"subscribe\", \"clickstream\")\n",
+    "    .option(\"startingOffsets\", \"latest\")\n",
+    "    .option(\"failOnDataLoss\", \"false\")\n",
+    "    .load()\n",
+    ")\n",
+    "\n",
+    "# The Kafka value column is binary, so decode it to a string before splitting.\n",
+    "fields = psf.split(records.value.cast(\"string\"), \"\\\\t\")\n",
+    "\n",
+    "messages = (\n",
+    "    records.withColumn(\"prev\", fields[0])\n",
+    "    .withColumn(\"curr\", fields[1])\n",
+    "    .withColumn(\"type\", fields[2])\n",
+    "    .withColumn(\"n\", fields[3].cast(\"int\"))\n",
+    "    .groupBy(\"curr\")\n",
+    "    .agg(psf.sum(\"n\").alias(\"num_clicks\"))\n",
+    "    .orderBy(\"num_clicks\", ascending=False)\n",
+    ")\n",
+    "\n",
+    "query = (\n",
+    "    messages.writeStream\n",
+    "    .queryName(\"clicks\")\n",
+    "    .format(\"memory\")\n",
+    "    .outputMode(\"complete\")\n",
+    "    .start()\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "The following block loops until a deadline (`terminate`), printing the results of the query every few seconds (`refresh`). Ideally, it would grab the query results and do something more useful with them, such as plotting them; a sketch of that follows the loop."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import time\n",
+    "from datetime import datetime, timedelta\n",
+    "\n",
+    "# Poll the in-memory table for two minutes, printing a snapshot every five seconds.\n",
+    "terminate = datetime.now() + timedelta(seconds=120)\n",
+    "refresh = 5\n",
+    "\n",
+    "while datetime.now() < terminate:\n",
+    "    result = spark.sql(\"select * from clicks\")\n",
+    "    result.show()\n",
+    "    print(\"==========\")\n",
+    "    time.sleep(refresh)"
+   ]
+  },
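+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "As suggested above, the query snapshot can be pulled into pandas and plotted. The following block is only a minimal sketch, not part of the original demo: it assumes pandas and matplotlib are available in the notebook kernel, and it reuses the `clicks` table and its `curr` and `num_clicks` columns from the query above."
+   ]
+  },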
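+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "import matplotlib.pyplot as plt\n",
+    "\n",
+    "# Sketch (assumes matplotlib is installed in the kernel): pull the\n",
+    "# current top ten destination pages into pandas and plot them.\n",
+    "top = spark.sql(\"select * from clicks order by num_clicks desc limit 10\").toPandas()\n",
+    "\n",
+    "if top.empty:\n",
+    "    print(\"No results yet - is the producer running?\")\n",
+    "else:\n",
+    "    # Horizontal bar chart of clicks per destination page.\n",
+    "    top.plot.barh(x=\"curr\", y=\"num_clicks\", legend=False)\n",
+    "    plt.xlabel(\"clicks\")\n",
+    "    plt.ylabel(\"destination page\")\n",
+    "    plt.show()"
+   ]
+  },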
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Query monitoring and debugging: check that the query is still active and inspect its name and id. Uncomment the last line to stop the query via the streams manager."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "print(query.isActive)\n",
+    "print(query.name)\n",
+    "print(query.id)\n",
+    "# spark.streams.get(query.id).stop()"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Terminate the query when done."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "query.stop()"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "PySpark",
+   "language": "python",
+   "name": "pyspark"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.6.8"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}