
Databricks Structured Streaming - Part 3 (Creating the Stream)

Andy Mitchell

In Part 1 of this series we created a Databricks Community Account and cluster.

In Part 2 we captured some streaming data to a file on our Databricks cluster.

In Part 3 we will look at interacting with the data using structured streaming.

  1. Use the Databricks cluster created in the previous post.

       
  2. Navigate to the "Introduction to Databricks Structured Streaming" folder, which should contain the notebook from Part 2.

        
  3. Create a Python notebook called "Part 3 (Creating the stream)"

        
  4. Enter the following code into the cmd pane of the notebook. It defines the variables we will use: the schema for the JSON data and the location where we stored the files.
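     A sketch of this cell is shown below; the folder path, the column names and the apiUrl variable name are assumptions based on the Seattle Real Time Fire 911 Calls dataset, and should be adjusted to match the files saved in Part 2.

        from pyspark.sql.types import StructType, StructField, StringType, TimestampType

        # Location where the files from Part 2 were saved (assumed path - change to your own)
        inputPath = "/FileStore/tables/911_calls/"

        # Schema for the JSON data. The column names below are assumed from the
        # Seattle Real Time Fire 911 Calls dataset; add any extra columns the API returns.
        jsonSchema = StructType([
            StructField("address", StringType(), True),
            StructField("type", StringType(), True),
            StructField("datetime", TimestampType(), True),
            StructField("latitude", StringType(), True),
            StructField("longitude", StringType(), True),
            StructField("incident_number", StringType(), True)
        ])

        # URL of the data stream - paste the address from the API button here (see step 6)
        apiUrl = "<URL>"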

       
  5. The schema for the JSON can be found on the Seattle Real Time Fire 911 Calls website; however, it does not list all of the columns that the API returns. The schema is then converted to a PySpark StructType; the Microsoft.Spark.Sql.Types Namespace documentation may be of use here.
        
  6. Get the URL for the data stream and paste it where <URL> appears above. It can be found via the API button on the following web page: https://data.seattle.gov/Public-Safety/Seattle-Real-Time-Fire-911-Calls/upug-ckch
        
  7. Add another cmd pane by clicking on the + that appears below the middle of the previous one, and add the following code. This reads the files from the file location (inputPath) using the schema (jsonSchema).
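     A sketch of this cell, assuming the resulting DataFrame is named staticInputDF (only inputPath and jsonSchema are named in the step itself):

        # Static read of the JSON files captured in Part 2, using the schema defined above
        staticInputDF = (
          spark.read
            .schema(jsonSchema)
            .json(inputPath)
        )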
     
       
  8. Run both of the panes above by clicking "Run All" at the top of the notebook, or run each cell individually using the play button in the top right of each cell. The second pane should return a reference to the new DataFrame.

       
  9. Click on the arrow next to the result to expand it.

       
  10. This shows that a DataFrame has been created, but we cannot yet see any of its content.
       
  11. Add another cmd pane and add the following code, then run the cmd
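     A sketch of this cell, assuming the Databricks display() function is used to show the DataFrame:

        # Show the contents of the static DataFrame (display is capped at 1,000 rows)
        display(staticInputDF)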

       
  12. The first 1,000 rows of the results are shown

       
  13. We have some data; now we would like to see which call type is the most frequent. For this we need to aggregate the data by Type.
       
  14. Add another cmd cell and call it "Static aggregation", and add the following code:
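     A sketch of this cell. The one-hour window and the grouping by type follow the description in steps 13 and 35; the names StaticCountsDF and static_counts come from steps 15 and 16, while the column names are assumptions:

        from pyspark.sql.functions import window

        # Count calls by type within one-hour windows of the call datetime
        StaticCountsDF = (
          staticInputDF
            .groupBy("type", window("datetime", "1 hour"))
            .count()
        )
        StaticCountsDF.cache()

        # Register a temporary view so the results can be queried with SQL
        StaticCountsDF.createOrReplaceTempView("static_counts")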
       
  15. Run the code above to generate the DataFrame "StaticCountsDF"
       
  16. Note that the last line of the code registers a temporary view called "static_counts", which can be used in SQL queries outside of the DataFrame
       
  17. Add another cmd cell and call it "Aggregation results" and add the following code. Note that "%sql" switches the cell to SQL code rather than Python
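     A sketch of this cell; the exact query is an assumption, but it summarises the static_counts view by call type so the frequencies can be charted:

        %sql
        -- Total number of calls per type across all one-hour windows
        SELECT type, sum(`count`) AS total_count
        FROM static_counts
        GROUP BY type
        ORDER BY total_count DESC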
       
  18. Run the cell and look at the results
     [Screenshot: aggregation results table]
       
  19. Switch the chart type to "Bar Chart" and observe the frequency of each type of call
     [Screenshot: bar chart of call counts by type]
       
  20. The data we have displayed above is a static representation of the data we have saved in the folder. However, what we really need is for new data to be loaded automatically.
       
  21. Add a new cmd cell and name it "Load the Streaming Dataframe" and add the following code
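     A sketch of this cell, assuming it mirrors the static read and aggregation above; streamingInputDF is an assumed name for the intermediate DataFrame, while StreamingCountsDF comes from step 26:

        from pyspark.sql.functions import window

        # Streaming read of the same folder: readStream replaces read, and
        # maxFilesPerTrigger = 1 treats each file as a separate micro-batch
        streamingInputDF = (
          spark.readStream
            .schema(jsonSchema)
            .option("maxFilesPerTrigger", 1)
            .json(inputPath)
        )

        # The same aggregation as before, but over the streaming DataFrame
        StreamingCountsDF = (
          streamingInputDF
            .groupBy("type", window("datetime", "1 hour"))
            .count()
        )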
       
  22. Notice the differences between the previous code and this code:

    1. readStream rather than read converts the static read to a streaming read

    2. .option("maxFilesPerTrigger", 1) treats each file as a separate batch (microbatch)

       
  23. Run the above cmd
       
  24. Is this a streaming dataset? How can we tell?
        
  25. In a new cmd window called "Is this a stream" enter the following code
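     A sketch of this cell; isStreaming is the standard way to check for a streaming source:

        # Returns True because StreamingCountsDF was built from readStream
        StreamingCountsDF.isStreaming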
       
  26. Run the cmd to check whether "StreamingCountsDF" is a streaming DataFrame
       
  27. So this is a streaming DataFrame, but as yet we are not streaming the data. Add another cmd cell called "Stream to memory" and add the following code
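     A sketch of this cell. The in-memory sink and the query name "counts" follow from steps 30 and 35; complete output mode (which rewrites the full aggregation on every trigger) is an assumption:

        # Start the stream, writing the aggregated counts to an in-memory table
        # named "counts" that can be queried with SQL while the stream runs
        query = (
          StreamingCountsDF
            .writeStream
            .format("memory")
            .queryName("counts")
            .outputMode("complete")
            .start()
        )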
       
  28. Run the above cmd and you will see the following
     [Screenshot: the stream initialising]
       
  29. Wait for the stream to initialise and you will see the following
     [Screenshot: streaming query output]
       
  30. Expand the section next to "Counts" and you will see the dashboard
     [Screenshot: streaming dashboard]
       
  31. Switch to the notebook created in the previous post and run the first 3 cmd cells:

    1. Set some variables

    2. Function to save the api data to a file

    3. Test the function

          
  32. Switch back to the notebook for this post and you will see that the dashboard has changed
     [Screenshot: updated streaming dashboard]
      
  33. Wait 5 minutes, then repeat the 2 steps above
     [Screenshot: dashboard after the new data arrives]
      
  34. You will see that the data has already updated
       
  35. Above we set the aggregation window to one hour; we can view the aggregated data using the following:
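     A sketch of this cell; the exact query is an assumption, but it reads the in-memory "counts" table and labels each row with the end of its one-hour window:

        %sql
        -- Counts per call type for each one-hour window, keyed by the window end time
        SELECT type, date_format(window.end, "MMM-dd HH:mm") AS time, count
        FROM counts
        ORDER BY time, type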
       
  36. This shows the following results:
     [Screenshot: windowed aggregation results]
       
  37. Note that the times are at one-hour intervals; here we are using window.end, the end of the window within which each event falls.

In this post we have:

  • Created a static dataset
  • Created an aggregation on the dataset
  • Converted the dataset from static to streamed.

In the next post we will further explore structured streaming and additional options that are available.
