docker-analytics / examples / sample_consumer.ipynb
 "cells": [
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Basic data streaming using Apache Spark and Apache Kafka\n",
    "This is a basic “hello world” demonstration of Apache Structured Streaming using an Apach Kafka data source. The script `` repeatedly sends the string “Hello” plus a sequence number to the `sample` topic in Kafka. This notebook subscribes to that topic and displays the results of the query.\n",
    "See [Structured Streaming Programming Guide]( and [Structured Streaming + Kafka Integration Guide](\n",
    "Subscribe to the stream:"
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df = (\n",
    "    spark.readStream\n",
    "        .format(\"kafka\")\n",
    "        .option(\"kafka.bootstrap.servers\", \"kafka:9092\")\n",
    "        .option(\"subscribe\", \"sample\")\n",
    "        .option(\"startingOffsets\", \"earliest\")\n",
    "        .load()\n",
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Start a query on the stream."
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "df2 = df.selectExpr(\"CAST(key AS STRING)\", \"CAST(value AS STRING)\")\n",
    "q = df.writeStream \\\n",
    "    .queryName(\"stream\") \\\n",
    "    .format(\"memory\") \\\n",
    "    .start()"
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Loop for a while (`terminate`), displaying the content of the query every few seconds (`refresh`)."
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import time\n",
    "from datetime import datetime, timedelta\n",
    "terminate = + timedelta(seconds=30)\n",
    "refresh = 5\n",
    "while < terminate:\n",
    "    # print(q.lastProgress)\n",
    "    print(spark.sql(\"select * from stream\").collect())\n",
    "    time.sleep(refresh)"
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Query monitoring and debugging."
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# spark.streams.get("
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Stop the query when done."
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
 "metadata": {
  "kernelspec": {
   "display_name": "PySpark",
   "language": "python",
   "name": "pyspark"
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.8"
 "nbformat": 4,
 "nbformat_minor": 2