{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Clickstream analysis in Python using Apache Spark and Apache Kafka\n", "\n", "This is a clickstream processing demo using Apache Kafka and Spark Structured Streaming. It’s based on the original Scala version described at https://github.com/IBM/kafka-streaming-click-analysis/blob/master/README.md. The clickstream data is from Wikipedia and is streamed line by line into the Kafka topic `clickstream` by the script `kafka_producer.py`. Each line comprises four tab-separated values: the previous page visited (`prev`), the current page (`curr`), the type of page (`type`), and the number of clicks for that navigation path (`n`).\n", "\n", "The following block subscribes to the `clickstream` topic, extracts the four columns from the received records, and starts an in-memory query that summarises the most popular destination pages by click count. Note that Kafka delivers each message value as binary, so it must be cast to a string before splitting, and `n` is cast to a numeric type before summing." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.functions as psf\n", "\n", "records = (\n", "    spark.readStream\n", "    .format(\"kafka\")\n", "    .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n", "    .option(\"subscribe\", \"clickstream\")\n", "    .option(\"startingOffsets\", \"latest\")\n", "    .option(\"failOnDataLoss\", \"false\")\n", "    .load()\n", ")\n", "\n", "# The Kafka value column is binary: cast it to a string and split it\n", "# once on tabs, rather than re-splitting for every column.\n", "fields = psf.split(records.value.cast(\"string\"), \"\\\\t\")\n", "\n", "messages = (\n", "    records.withColumn(\"prev\", fields[0])\n", "    .withColumn(\"curr\", fields[1])\n", "    .withColumn(\"type\", fields[2])\n", "    .withColumn(\"n\", fields[3].cast(\"long\"))\n", "    .groupBy(\"curr\")\n", "    .agg(psf.sum(\"n\").alias(\"num_clicks\"))\n", "    .orderBy(\"num_clicks\", ascending=False)\n", ")\n", "\n", "query = (\n", "    messages.writeStream\n", "    .queryName(\"clicks\")\n", "    .format(\"memory\")\n", "    .outputMode(\"complete\")\n", "    .start()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following 
block polls the in-memory `clicks` table until a deadline (`terminate`), printing the query results every `refresh` seconds. Ideally, it would grab the results and do something more useful, such as plotting them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from datetime import datetime, timedelta\n", "\n", "terminate = datetime.now() + timedelta(seconds=120)\n", "refresh = 5\n", "\n", "while datetime.now() < terminate:\n", "    result = spark.sql(\"SELECT * FROM clicks\")\n", "    result.show()\n", "    print(\"==========\")\n", "    time.sleep(refresh)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Query monitoring and debugging: inspect the query's activity, name, and identifier. The commented-out line shows how to look up a running query by its `id` and stop it." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(query.isActive)\n", "print(query.name)\n", "print(query.id)\n", "# spark.streams.get(query.id).stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Terminate the query when done." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }