nigel.stanger / docker-analytics

Commit cc6da412c61c8a9f7c932b143b076f789166201f
Author: Nigel Stanger
Date: 17 May 2019
Branches: master, spark3
Parent: 8474330

Fixed typo

The change corrects “Apach” to “Apache” in the notebook’s introductory text.

Showing 1 changed file:
examples/sample_consumer.ipynb
{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Basic data streaming using Apache Spark and Apache Kafka\n", "\n", "This is a basic “hello world” demonstration of Apache Structured Streaming using an Apache Kafka data source. The script `sample_producer.py` repeatedly sends the string “Hello” plus a sequence number to the `sample` topic in Kafka. This notebook subscribes to that topic and displays the results of the query.\n", "\n", "See [Structured Streaming Programming Guide](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html) and [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).\n", "\n", "Subscribe to the stream:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = (\n", " spark.readStream\n", " .format(\"kafka\")\n", " .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n", " .option(\"subscribe\", \"sample\")\n", " .option(\"startingOffsets\", \"earliest\")\n", " .load()\n", ")\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start a query on the stream." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df2 = df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n", "q = df.writeStream \\\n", " .queryName(\"stream\") \\\n", " .format(\"memory\") \\\n", " .start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Loop for a while (`terminate`), displaying the content of the query every few seconds (`refresh`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from datetime import datetime, timedelta\n", "\n", "terminate = datetime.now() + timedelta(seconds=30)\n", "refresh = 5\n", "\n", "while datetime.now() < terminate:\n", " # print(q.lastProgress)\n", " print(spark.sql(\"select * from stream\").collect())\n", " time.sleep(refresh)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Query monitoring and debugging." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(q.isActive)\n", "print(q.name)\n", "print(q.id)\n", "# spark.streams.get(q.id).stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Stop the query when done." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "q.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }
{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Basic data streaming using Apache Spark and Apache Kafka\n", "\n", "This is a basic “hello world” demonstration of Apache Structured Streaming using an Apach Kafka data source. The script `sample_producer.py` repeatedly sends the string “Hello” plus a sequence number to the `sample` topic in Kafka. This notebook subscribes to that topic and displays the results of the query.\n", "\n", "See [Structured Streaming Programming Guide](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html) and [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).\n", "\n", "Subscribe to the stream:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = (\n", " spark.readStream\n", " .format(\"kafka\")\n", " .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n", " .option(\"subscribe\", \"sample\")\n", " .option(\"startingOffsets\", \"earliest\")\n", " .load()\n", ")\n", "df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Start a query on the stream." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df2 = df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n", "q = df.writeStream \\\n", " .queryName(\"stream\") \\\n", " .format(\"memory\") \\\n", " .start()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Loop for a while (`terminate`), displaying the content of the query every few seconds (`refresh`)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import time\n", "from datetime import datetime, timedelta\n", "\n", "terminate = datetime.now() + timedelta(seconds=30)\n", "refresh = 5\n", "\n", "while datetime.now() < terminate:\n", " # print(q.lastProgress)\n", " print(spark.sql(\"select * from stream\").collect())\n", " time.sleep(refresh)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Query monitoring and debugging." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(q.isActive)\n", "print(q.name)\n", "print(q.id)\n", "# spark.streams.get(q.id).stop()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Stop the query when done." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "q.stop()" ] } ], "metadata": { "kernelspec": { "display_name": "PySpark", "language": "python", "name": "pyspark" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.8" } }, "nbformat": 4, "nbformat_minor": 2 }