GitBucket
4.21.2
Toggle navigation
Snippets
Sign in
Files
Branches
2
Releases
Issues
3
Pull requests
Labels
Priorities
Milestones
Wiki
Forks
nigel.stanger
/
docker-analytics
Browse code
Added examples
master
spark3
1 parent
6f764a4
commit
847433051311aa30b2b59d04c5e787981b369076
Nigel Stanger
authored
on 17 May 2019
Patch
Showing
4 changed files
examples/clickstream_consumer.ipynb
examples/clickstream_producer.py
examples/sample_consumer.ipynb
examples/sample_producer.py
Ignore Space
Show notes
View
examples/clickstream_consumer.ipynb
0 → 100644
{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Clickstream analysis in Python using Apache Spark and Apache Kafka\n", "\n", "This is a clickstream processing demo using Apache Kafka and Spark Structured Streaming. It’s based on the original Scala version described at https://github.com/IBM/kafka-streaming-click-analysis/blob/master/README.md. The clickstream data is from Wikipedia, and is streamed line-by-line into the Kafka topic `clickstream` by the script `kafka_producer.py`. Each line comprises four tab-separated values: the previous page visited (`prev`), the current page (`curr`), the type of page (`type`), and the number of clicks for that navigation path (`n`).\n", "\n", "The following block subscribes to the `clickstream` topic, extracts the columns from the received records, and starts an in memory query to summarise the most popular destination pages in terms of clicks." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.functions as psf\n", "\n", "records = (\n", " spark.readStream\n", " .format(\"kafka\")\n", " .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n", " .option(\"subscribe\", \"clickstream\")\n", " .option(\"startingOffsets\", \"latest\")\n", " .option(\"failOnDataLoss\", \"false\")\n", " .load()\n", ")\n", "\n", "messages = (\n", " records.withColumn(\"prev\", psf.split(records.value, \"\\t\")[0])\n", " .withColumn(\"curr\", psf.split(records.value, \"\\t\")[1])\n", " .withColumn(\"type\", psf.split(records.value, \"\\t\")[2])\n", " .withColumn(\"n\", psf.split(records.value, \"\\t\")[3])\n", " .groupBy(\"curr\")\n", " .agg(psf.sum(\"n\").alias(\"num_clicks\"))\n", " .orderBy(\"num_clicks\", ascending=False)\n", ")\n", "\n", "query = (\n", " messages.writeStream\n", " .queryName(\"clicks\")\n", " .format(\"memory\")\n", " .outputMode(\"complete\")\n", " .start()\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following block loops for a while (`terminate`), printing out the results of the query every few seconds (`refresh`). Ideally, this should grab the query results and do something useful, like plot them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from datetime import datetime, timedelta\n", "\n", "terminate = datetime.now() + timedelta(seconds=120)\n", "refresh = 5\n", "\n", "while datetime.now() < terminate:\n", " result = spark.sql(\"select * from clicks\")\n", " result.show()\n", " print(\"==========\")\n", " time.sleep(refresh)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Query monitoring and debugging." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(query.isActive)\n", "print(query.name)\n", "print(query.id)\n", "# spark.streams.get(query.id).stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Terminate the query when done." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "query.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }
Ignore Space
Show notes
View
examples/clickstream_producer.py
0 → 100755
#!/usr/local/bin/python import time from datetime import datetime, timedelta terminate = datetime.now() + timedelta(seconds=120) refresh = 0 from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='kafka:9092') with open("2017_01_en_clickstream.tsv", "r") as clicks: count = 0 for line in clicks: if count > 0: producer.send("clickstream", bytes(line, encoding="utf-8")) count += 1 if count % 10000 == 0: print(count) time.sleep(refresh)
Ignore Space
Show notes
View
examples/sample_consumer.ipynb
0 → 100644
{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Basic data streaming using Apache Spark and Apache Kafka\n", "\n", "This is a basic “hello world” demonstration of Apache Structured Streaming using an Apach Kafka data source. The script `sample_producer.py` repeatedly sends the string “Hello” plus a sequence number to the `sample` topic in Kafka. This notebook subscribes to that topic and displays the results of the query.\n", "\n", "See [Structured Streaming Programming Guide](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html) and [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).\n", "\n", "Subscribe to the stream:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = (\n", " spark.readStream\n", " .format(\"kafka\")\n", " .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n", " .option(\"subscribe\", \"sample\")\n", " .option(\"startingOffsets\", \"earliest\")\n", " .load()\n", ")\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start a query on the stream." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df2 = df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n", "q = df.writeStream \\\n", " .queryName(\"stream\") \\\n", " .format(\"memory\") \\\n", " .start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Loop for a while (`terminate`), displaying the content of the query every few seconds (`refresh`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from datetime import datetime, timedelta\n", "\n", "terminate = datetime.now() + timedelta(seconds=30)\n", "refresh = 5\n", "\n", "while datetime.now() < terminate:\n", " # print(q.lastProgress)\n", " print(spark.sql(\"select * from stream\").collect())\n", " time.sleep(refresh)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Query monitoring and debugging." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(q.isActive)\n", "print(q.name)\n", "print(q.id)\n", "# spark.streams.get(q.id).stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Stop the query when done." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "q.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }
Ignore Space
Show notes
View
examples/sample_producer.py
0 → 100755
#!/usr/local/bin/python import time from datetime import datetime, timedelta terminate = datetime.now() + timedelta(seconds=20) from kafka import KafkaProducer producer = KafkaProducer(bootstrap_servers='kafka:9092') count = 1 while True: #datetime.now() < terminate: text = "Hello {c}".format(c=count) print(text) producer.send("sample", bytes(text, encoding="utf-8")) # producer.send("sample", b"foobar") time.sleep(0.5) count += 1
Show line notes below