# Clickstream analysis in Python using Apache Spark and Apache Kafka

This is a clickstream processing demo using Apache Kafka and Spark Structured Streaming. It's based on the original Scala version described at https://github.com/IBM/kafka-streaming-click-analysis/blob/master/README.md. The clickstream data is from Wikipedia, and is streamed line-by-line into the Kafka topic `clickstream` by the script `kafka_producer.py`. Each line comprises four tab-separated values: the previous page visited (`prev`), the current page (`curr`), the type of page (`type`), and the number of clicks for that navigation path (`n`).
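To make the record format concrete, here is a minimal pure-Python sketch that parses one line into its four fields. The `ClickRecord` type and `parse_click_line` function are hypothetical helpers for illustration, not part of the demo, and the sample line is made up rather than taken from the Wikipedia dataset.

```python
from typing import NamedTuple


class ClickRecord(NamedTuple):
    """One navigation path from the clickstream (hypothetical helper type)."""
    prev: str
    curr: str
    type: str
    n: int


def parse_click_line(line: str) -> ClickRecord:
    """Split a tab-separated clickstream line into its four fields."""
    # Strip the trailing newline a file reader leaves on each line;
    # unpacking raises ValueError if the line does not have exactly four fields.
    prev, curr, page_type, n = line.rstrip("\n").split("\t")
    return ClickRecord(prev, curr, page_type, int(n))


# Made-up example line in the four-column format described above.
record = parse_click_line("other-google\tApache_Spark\texternal\t42\n")
print(record.curr, record.n)
```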

The following block subscribes to the `clickstream` topic, extracts the four columns from each received record, and starts a query that writes to an in-memory table named `clicks`, summarising the most popular destination pages by total number of clicks.

In [None]:
import pyspark.sql.functions as psf

records = (
    spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "kafka:9092")
        .option("subscribe", "clickstream")
        .option("startingOffsets", "latest")
        .option("failOnDataLoss", "false")
        .load()
)

# Kafka delivers `value` as binary: cast it to a string and split it once on tabs.
fields = psf.split(records.value.cast("string"), "\t")

messages = (
    records.withColumn("prev", fields[0])
        .withColumn("curr", fields[1])
        .withColumn("type", fields[2])
        # Cast the click count to an integer so the sum is numeric.
        .withColumn("n", fields[3].cast("long"))
        .groupBy("curr")
        .agg(psf.sum("n").alias("num_clicks"))
        .orderBy("num_clicks", ascending=False)
)

query = (
    messages.writeStream
        .queryName("clicks")
        .format("memory")
        .outputMode("complete")
        .start()
)
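The streaming query keeps a running total of clicks per destination page and, because it uses the `complete` output mode, rewrites the whole sorted result into the `clicks` table after every micro-batch. As a rough mental model only, the sketch below reproduces the same group-by-and-sum on a plain Python list; `top_destinations` is a hypothetical helper and the sample records are invented.

```python
from collections import Counter
from typing import Iterable, List, Tuple


def top_destinations(lines: Iterable[str]) -> List[Tuple[str, int]]:
    """Sum clicks per destination page (`curr`), most-clicked first.

    Mirrors groupBy("curr").agg(sum("n")).orderBy(desc) on an in-memory list.
    """
    totals: Counter = Counter()
    for line in lines:
        _prev, curr, _type, n = line.rstrip("\n").split("\t")
        totals[curr] += int(n)
    # most_common() returns (page, total) pairs sorted by total, highest first.
    return totals.most_common()


sample = [
    "other-search\tApache_Kafka\texternal\t10",
    "Apache_Spark\tApache_Kafka\tlink\t5",
    "other-google\tApache_Spark\texternal\t8",
]
print(top_destinations(sample))
```

Each micro-batch in the real query effectively extends the input list and refreshes the totals; Spark does this incrementally by keeping the per-page sums as state rather than re-reading old records.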

The following block polls the `clicks` table for two minutes (`terminate`), printing the current results every five seconds (`refresh`). A real application would do something more useful with the results, such as plotting them.

In [None]:
import time
from datetime import datetime, timedelta

terminate = datetime.now() + timedelta(seconds=120)
refresh = 5

while datetime.now() < terminate:
    result = spark.sql("select * from clicks")
    result.show()
    print("==========")
    time.sleep(refresh)
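As a small step towards doing something more useful with the results, the sketch below renders the top pages as a text bar chart instead of a raw table. It assumes the rows come from something like `spark.sql("select * from clicks").take(10)`; PySpark `Row` objects are tuples, so they unpack into `(page, clicks)` pairs. `text_bar_chart` and its `width` parameter are hypothetical; a plotting library such as matplotlib would be the natural next step.

```python
from typing import Iterable, List, Tuple


def text_bar_chart(rows: Iterable[Tuple[str, int]], width: int = 40) -> str:
    """Render (page, clicks) pairs as a text bar chart scaled to `width` characters."""
    rows = list(rows)
    if not rows:
        return "(no data yet)"
    # Fall back to 1 so an all-zero result does not divide by zero.
    max_clicks = max(clicks for _, clicks in rows) or 1
    label_width = max(len(page) for page, _ in rows)
    lines: List[str] = []
    for page, clicks in rows:
        bar = "#" * round(width * clicks / max_clicks)
        lines.append(f"{page:<{label_width}} {bar} {clicks}")
    return "\n".join(lines)


print(text_bar_chart([("Apache_Kafka", 15), ("Apache_Spark", 8)], width=15))
```

Inside the loop above, `print(text_bar_chart(result.take(10)))` could replace `result.show()`.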

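The following block prints some basic information for monitoring and debugging the query. The commented-out line shows how to stop a query by its ID through the stream manager.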

In [None]:
print(query.isActive)
print(query.name)
print(query.id)
# spark.streams.get(query.id).stop()
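Beyond `isActive`, a `StreamingQuery` exposes `status` and `lastProgress`; the latter is a dictionary describing the most recent micro-batch, or `None` before the first batch completes. The sketch below condenses such a dictionary into one line. `summarise_progress` is a hypothetical helper that reads only a few of the standard progress fields (`batchId`, `numInputRows`, `inputRowsPerSecond`, `processedRowsPerSecond`), and the sample dictionary is invented rather than captured from a real run.

```python
from typing import Any, Mapping, Optional


def summarise_progress(progress: Optional[Mapping[str, Any]]) -> str:
    """Condense a StreamingQuery.lastProgress dictionary into a single line."""
    if progress is None:
        # lastProgress is None until the first micro-batch has finished.
        return "no progress reported yet"
    # Missing rate fields default to 0.0 so the format string never fails.
    return (
        f"batch {progress.get('batchId')}: "
        f"{progress.get('numInputRows', 0)} rows, "
        f"{progress.get('inputRowsPerSecond', 0.0):.1f} in/s, "
        f"{progress.get('processedRowsPerSecond', 0.0):.1f} processed/s"
    )


# Invented sample; in the notebook you would call summarise_progress(query.lastProgress).
sample_progress = {
    "batchId": 3,
    "numInputRows": 120,
    "inputRowsPerSecond": 24.0,
    "processedRowsPerSecond": 80.5,
}
print(summarise_progress(sample_progress))
```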

Terminate the query when done.

In [None]:
query.stop()