{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Clickstream analysis in Python using Apache Spark and Apache Kafka\n",
    "\n",
    "This is a clickstream processing demo using Apache Kafka and Spark Structured Streaming. It’s based on the original Scala version described at https://github.com/IBM/kafka-streaming-click-analysis/blob/master/README.md. The clickstream data is from Wikipedia, and is streamed line-by-line into the Kafka topic `clickstream` by the script `kafka_producer.py`. Each line comprises four tab-separated values: the previous page visited (`prev`), the current page (`curr`), the type of page (`type`), and the number of clicks for that navigation path (`n`).\n",
    "\n",
    "The following block subscribes to the `clickstream` topic, extracts the columns from the received records, and starts an in memory query to summarise the most popular destination pages in terms of clicks."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark.sql.functions as psf\n",
    "\n",
    "records = (\n",
    "    spark.readStream\n",
    "        .format(\"kafka\")\n",
    "        .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n",
    "        .option(\"subscribe\", \"clickstream\")\n",
    "        .option(\"startingOffsets\", \"latest\")\n",
    "        .option(\"failOnDataLoss\", \"false\")\n",
    "        .load()\n",
    ")\n",
    "\n",
    "messages = (\n",
    "    records.withColumn(\"prev\", psf.split(records.value, \"\\t\")[0])\n",
    "        .withColumn(\"curr\", psf.split(records.value, \"\\t\")[1])\n",
    "        .withColumn(\"type\", psf.split(records.value, \"\\t\")[2])\n",
    "        .withColumn(\"n\", psf.split(records.value, \"\\t\")[3])\n",
    "        .groupBy(\"curr\")\n",
    "        .agg(psf.sum(\"n\").alias(\"num_clicks\"))\n",
    "        .orderBy(\"num_clicks\", ascending=False)\n",
    ")\n",
    "\n",
    "query = (\n",
    "    messages.writeStream\n",
    "        .queryName(\"clicks\")\n",
    "        .format(\"memory\")\n",
    "        .outputMode(\"complete\")\n",
    "        .start()\n",
    ")"
   ]
  },
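  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As an optional sanity check, the schemas of the raw Kafka records and of the aggregated result can be inspected; `printSchema` works on streaming DataFrames because the schema is known before any data arrives."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional sanity check: confirm the expected columns and types.\n",
    "records.printSchema()\n",
    "messages.printSchema()"
   ]
  },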
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The following block loops for a while (`terminate`), printing out the results of the query every few seconds (`refresh`). Ideally, this should grab the query results and do something useful, like plot them."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "terminate = datetime.now() + timedelta(seconds=120)\n",
    "refresh = 5\n",
    "\n",
    "while datetime.now() < terminate:\n",
    "    result = spark.sql(\"select * from clicks\")\n",
    "    result.show()\n",
    "    print(\"==========\")\n",
    "    time.sleep(refresh)"
   ]
  },
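  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A minimal sketch of the plotting idea mentioned above: snapshot the in-memory `clicks` table into pandas and draw a bar chart of the top destination pages. This assumes pandas and matplotlib are available in the notebook kernel."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sketch only: assumes pandas and matplotlib are installed in this kernel.\n",
    "%matplotlib inline\n",
    "\n",
    "# Snapshot the current top 10 destination pages and plot them.\n",
    "top = spark.sql(\"select curr, num_clicks from clicks order by num_clicks desc limit 10\").toPandas()\n",
    "top.sort_values(\"num_clicks\").plot.barh(x=\"curr\", y=\"num_clicks\", legend=False)"
   ]
  },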
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Query monitoring and debugging."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(query.isActive)\n",
    "print(query.name)\n",
    "print(query.id)\n",
    "# spark.streams.get(query.id).stop()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Terminate the query when done."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "query.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}