# Basic data streaming using Apache Spark and Apache Kafka

This is a basic “hello world” demonstration of Apache Spark Structured Streaming using an Apache Kafka data source. The script `sample_producer.py` repeatedly sends the string “Hello” followed by a sequence number to the `sample` topic in Kafka. This notebook subscribes to that topic and displays the results of the query.

See [Structured Streaming Programming Guide](https://spark.apache.org/docs/2.2.0/structured-streaming-programming-guide.html) and [Structured Streaming + Kafka Integration Guide](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html).
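
The producer script is not reproduced in this notebook. As a rough idea of what it might look like, here is a hypothetical sketch using the third-party `kafka-python` package. The broker address and topic are copied from the notebook; the exact message format (`Hello 0`, `Hello 1`, ...), the one-second interval, and the function names are assumptions, not the contents of the real `sample_producer.py`.

```python
import time

# Hypothetical stand-in for sample_producer.py; the real script is not shown.
BOOTSTRAP_SERVERS = "kafka:9092"  # same broker address the notebook reads from
TOPIC = "sample"                  # same topic the notebook subscribes to


def make_message(seq: int) -> bytes:
    """Build one payload: "Hello" plus a sequence number, encoded as UTF-8 bytes.

    The space separator is an assumption about the original script's format.
    """
    return f"Hello {seq}".encode("utf-8")


def run_producer(count: int = 100, interval: float = 1.0) -> None:
    """Send `count` messages to TOPIC, one every `interval` seconds.

    Requires the kafka-python package and a reachable broker.
    """
    # Imported lazily so make_message() can be used without kafka-python installed.
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=BOOTSTRAP_SERVERS)
    try:
        for seq in range(count):
            # No key is sent, so the key column in Spark will be null.
            producer.send(TOPIC, value=make_message(seq))
            time.sleep(interval)
    finally:
        producer.flush()  # make sure buffered messages are delivered
        producer.close()
```

Calling `run_producer()` with the broker running should give the notebook a steady stream of messages to display.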

Subscribe to the stream:

In [None]:
df = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "sample")
        .option("startingOffsets", "earliest")
        .load()
)
df
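
Besides `earliest` and `latest`, the Kafka integration guide allows `startingOffsets` to be a JSON string giving an offset per partition, where `-2` means earliest and `-1` means latest. The helper below is a hypothetical convenience for building that string; the partition numbers and offsets in the example are made up. Note that the option only applies when a query starts fresh: a query resuming from a checkpoint continues from the checkpointed offsets.

```python
import json
from typing import Dict

# Special offsets accepted in the JSON form of startingOffsets.
EARLIEST = -2
LATEST = -1


def starting_offsets(topic: str, offsets: Dict[int, int]) -> str:
    """Build a JSON value for the startingOffsets option.

    `offsets` maps partition number -> offset (or EARLIEST / LATEST).
    Partition numbers are written as strings, as the JSON format requires.
    """
    return json.dumps({topic: {str(p): o for p, o in sorted(offsets.items())}})


print(starting_offsets("sample", {0: 23, 1: LATEST}))
# {"sample": {"0": 23, "1": -1}}
```

The result would be passed as `.option("startingOffsets", starting_offsets("sample", {0: 23, 1: LATEST}))` in place of `"earliest"`.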

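Cast the binary `key` and `value` columns to strings and start a query that writes its results to an in-memory table named `stream`.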

In [None]:
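# Kafka delivers key and value as binary; cast both to strings.
df2 = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Write the decoded stream (df2, not the raw df) to an in-memory table named "stream".
q = (
    df2.writeStream
        .queryName("stream")
        .format("memory")
        .start()
)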
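
The Kafka source exposes `key` and `value` as binary columns, so without the cast the in-memory table would hold raw bytes rather than readable text. The pure-Python sketch below mimics, for a single row, what the two `CAST(... AS STRING)` expressions do: nulls stay null and bytes are decoded as UTF-8. It assumes the producer wrote valid UTF-8, and the sample row is hypothetical.

```python
from typing import Any, Dict, Optional, Tuple


def cast_to_string(raw: Optional[bytes]) -> Optional[str]:
    """Rough Python analogue of CAST(<binary> AS STRING) for valid UTF-8 input."""
    return None if raw is None else raw.decode("utf-8")


def project(record: Dict[str, Any]) -> Tuple[Optional[str], Optional[str]]:
    """Mirror selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") on one row."""
    return cast_to_string(record["key"]), cast_to_string(record["value"])


# Hypothetical row as the Kafka source might deliver it; key is null because
# the producer sent no key.
row = {"key": None, "value": b"Hello 1", "topic": "sample", "partition": 0, "offset": 1}
print(project(row))  # (None, 'Hello 1')
```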

Poll for a fixed period (`terminate`), printing the contents of the in-memory table every few seconds (`refresh`).

In [None]:
import time
from datetime import datetime, timedelta

terminate = datetime.now() + timedelta(seconds=30)  # stop polling after 30 seconds
refresh = 5  # seconds between snapshots

while datetime.now() < terminate:
    # print(q.lastProgress)  # uncomment to see per-batch metrics
    print(spark.sql("select * from stream").collect())
    time.sleep(refresh)
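
The same loop can be packaged as a reusable helper. This sketch (the name `poll` is hypothetical) takes the snapshot function as a parameter and uses `time.monotonic()` instead of `datetime.now()`, so wall-clock adjustments cannot stretch or cut short the polling window. It also never sleeps past the deadline.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def poll(fetch: Callable[[], T], duration: float, refresh: float) -> List[T]:
    """Call `fetch` every `refresh` seconds until `duration` seconds have elapsed.

    Returns the list of snapshots collected, in order.
    """
    deadline = time.monotonic() + duration
    snapshots: List[T] = []
    while time.monotonic() < deadline:
        snapshots.append(fetch())
        # Sleep for `refresh`, but no longer than the time left before the deadline.
        time.sleep(max(0.0, min(refresh, deadline - time.monotonic())))
    return snapshots
```

In the notebook this would be called as `poll(lambda: spark.sql("select * from stream").collect(), duration=30, refresh=5)`, with the caller printing or inspecting the returned snapshots.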

Monitor and debug the query.

In [None]:
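print(q.isActive)  # True while the query is running
print(q.name)      # "stream", as set by queryName()
print(q.id)        # unique ID of the query
# spark.streams.get(q.id).stop()  # look up a query by ID and stop it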
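
For a quick health check, the progress report can be condensed into one line. This sketch assumes `q.lastProgress` returns a dict (or `None` before the first micro-batch completes); if your PySpark version returns a different object type, adapt the lookups. The field names `batchId`, `numInputRows`, and `processedRowsPerSecond` come from the streaming progress report; the function name and the sample values in the example are hypothetical.

```python
from typing import Any, Mapping, Optional


def summarize_progress(progress: Optional[Mapping[str, Any]]) -> str:
    """Condense a streaming progress report (dict form) into one line."""
    if progress is None:
        return "no progress reported yet"
    return (
        f"batch {progress.get('batchId')}: "
        f"{progress.get('numInputRows', 0)} rows in, "
        f"{progress.get('processedRowsPerSecond', 0.0):.1f} rows/s processed"
    )


# Hypothetical progress values, for illustration only.
print(summarize_progress({"batchId": 3, "numInputRows": 5, "processedRowsPerSecond": 12.5}))
# batch 3: 5 rows in, 12.5 rows/s processed
```

Inside the polling loop, `print(summarize_progress(q.lastProgress))` could replace the commented-out `print(q.lastProgress)` line.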

Stop the query when done.

In [None]:
q.stop()
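
If the notebook is re-run while an earlier query named `stream` is still active, starting a new one with the same `queryName` fails, because query names must be unique among the active queries in a session. A small helper can stop leftovers first. This is a sketch: `stop_queries` is a hypothetical name, and it is meant to be called with `spark.streams.active`, the list of currently active queries.

```python
from typing import Iterable, Optional


def stop_queries(queries: Iterable, name: Optional[str] = None) -> int:
    """Stop each query in `queries`, or only those whose .name equals `name`.

    Returns how many queries were stopped.
    """
    stopped = 0
    # Copy first so stopping a query cannot disturb the iteration.
    for query in list(queries):
        if name is None or query.name == name:
            query.stop()
            stopped += 1
    return stopped
```

Running `stop_queries(spark.streams.active, name="stream")` before the cell that starts `q` would clear a stale query from a previous run.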