{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Basic data streaming using Apache Spark and Apache Kafka\n",
    "\n",
    "This is a basic “hello world” demonstration of Apache Structured Streaming using an Apache Kafka data source. The script `sample_producer.py` repeatedly sends the string “Hello” plus a sequence number to the `sample` topic in Kafka. This notebook subscribes to that topic and displays the results of the query.\n",
    "\n",
    "See [Structured Streaming Programming Guide](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html) and [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).\n",
    "\n",
    "Subscribe to the stream:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = (\n",
    "    spark.readStream\n",
    "        .format(\"kafka\")\n",
    "        .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n",
    "        .option(\"subscribe\", \"sample\")\n",
    "        .option(\"startingOffsets\", \"earliest\")\n",
    "        .load()\n",
    ")\n",
    "df"
   ]
  },
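  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The Kafka source exposes a fixed schema: `key` and `value` arrive as binary columns, alongside `topic`, `partition`, `offset` and timestamp metadata. As an optional sanity check (not part of the original demo), the schema can be printed once the cell above has run:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Show the columns provided by the Kafka source (key and value are binary).\n",
    "df.printSchema()"
   ]
  },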
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Start a query on the stream."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df2 = df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n",
    "q = df.writeStream \\\n",
    "    .queryName(\"stream\") \\\n",
    "    .format(\"memory\") \\\n",
    "    .start()"
   ]
  },
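  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The memory sink keeps the query results in an in-memory table named after `queryName`, so they can be read back with `spark.sql` below. As an optional check (not part of the original demo), the query status shows whether it is running and waiting for data:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Current status of the streaming query (message, isDataAvailable, isTriggerActive).\n",
    "print(q.status)"
   ]
  },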
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Loop for a while (`terminate`), displaying the content of the query every few seconds (`refresh`)."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from datetime import datetime, timedelta\n",
    "\n",
    "terminate = datetime.now() + timedelta(seconds=30)\n",
    "refresh = 5\n",
    "\n",
    "while datetime.now() < terminate:\n",
    "    # print(q.lastProgress)\n",
    "    print(spark.sql(\"select * from stream\").collect())\n",
    "    time.sleep(refresh)"
   ]
  },
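  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Note that `collect()` pulls every accumulated row back to the driver on each iteration, and the `stream` table keeps growing while the producer runs. A lighter-weight variant (a sketch, not part of the original demo) just reports the row count:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Counting rows is cheaper than collect() once the in-memory table grows large.\n",
    "spark.sql(\"select count(*) as rows from stream\").show()"
   ]
  },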
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Query monitoring and debugging."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(q.isActive)\n",
    "print(q.name)\n",
    "print(q.id)\n",
    "# spark.streams.get(q.id).stop()"
   ]
  },
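  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Spark also tracks every active streaming query on the session, and each query keeps its most recent progress report. A short sketch (assuming the query above is still running):"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# List all active streaming queries on this Spark session.\n",
    "for active_query in spark.streams.active:\n",
    "    print(active_query.name, active_query.id)\n",
    "\n",
    "# Most recent progress report (None until the first trigger has completed).\n",
    "print(q.lastProgress)"
   ]
  },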
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Stop the query when done."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "q.stop()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "pyspark"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}