Create an End to End Streaming Pipeline with Kafka, Docker, Confluent, Mage, BigQuery and dbt


Goal of the Project and Explanation

This guide aims to achieve two goals: stream data in real-time from UK’s Companies House and integrate it with the entire dataset as new data becomes available over time.

The UK Companies House is an executive agency of the UK Government, responsible for incorporating and dissolving limited companies, registering company information, and making this information available to the public.

Here is a simplified architecture of the project:


Here is the technical architecture:


Let’s go through the different steps.

The first task is actually being able to stream data from UK’s Companies House API:


Once we are able to do this, we need to create a Kafka cluster on Confluent to serve as the intermediary for the streaming data:


Afterwards, we’ll create a Kafka producer, a mechanism for streaming data to Kafka clusters:


All of the previous steps involve several scripts and multiple packages. To simplify deployment and mitigate dependency issues, we'll encapsulate all components into a Docker container. In this way, we'll be able to stream data to our Kafka cluster with one simple command, docker start container_id.


Our Docker containers will be within a virtual machine, because we want to run them 24/7. If we run them locally, as soon as we shut down our computer, the containers will also stop:


To retrieve the flow of data from Kafka and forward it to downstream processes, we'll use Mage as an orchestrator. To ensure continuous operation, Mage will also be encapsulated in a Docker container:


In Mage, the initial block will consume data from the Kafka Cluster and pass it to a Python script:


This script will conduct an initial cleanup of the data and then stream it in real-time to BigQuery:


In BigQuery, initially we’ll manually upload the entire dataset from the UK’s Companies House. Afterwards, we’ll utilize dbt (data build tool) to incrementally update the dataset with streaming data as it becomes available:


Finally, we can visualize the data with our BI tool of preference. However, this will not be covered in this guide.

The GitHub repository for this project is available at https://github.com/ArbenKqiku/kafka-streaming-project.

Set Up Google Cloud Platform, Create a Virtual Machine and Connect to It Remotely through Visual Studio Code

Create a Google Cloud Project

The first thing that we have to do is to create a GCP project. In this project we’ll host many of the things that we’ll build, such as the application to produce streaming data, our orchestration tool, Mage, as well as our BigQuery database.

Once you reach the Google Cloud Homepage, click on Console on the top right corner.


If you have never been a paying customer of Google Cloud, you should be able to enjoy a $300 free credit.


Once you are in the console, click on the top left, next to the Google Cloud logo.


Then, on the top right, click on NEW PROJECT


Give your project a name and click on CREATE. I’ll call mine streaming-pipeline:


Once your project has been created, click on SELECT PROJECT:


Now, at the top left corner, you should see the name of your project:


Now, let’s move to the next step, creating a virtual machine.

Create a Virtual Machine


A virtual machine is essentially a computer within a computer. For instance, you can create a virtual machine on your local computer, completely separate from the physical hardware. In our case, we’re opting to create a virtual machine in the cloud. Why? Because that’s where our orchestrator, Mage, will reside. We need to host Mage on a machine that never shuts down to ensure the streaming pipeline can run continuously.

To create a virtual machine (VM), type vm instances in the search bar and click on VM instances:


This will take you to a screen where you’ll see the option to enable the Compute Engine API. In Google Cloud, enabling specific services is akin to loading libraries in programming languages. In our case, we need the Compute Engine API to create and run a VM. Click on ENABLE to proceed:


Once the API has finished loading, click on CREATE INSTANCE:


Give your VM a name and choose the region closest to you. Remember, we’ll be creating other services later on. It’s essential to maintain consistency in the region selection. Otherwise, when different services need to interact, they may encounter issues if their data is located in different regions.


Go to the Boot disk section and click on CHANGE:


Select the following options and click on SELECT:


Under Firewall, select the following options. We need to enable these options; otherwise, we won't be able to access Mage from the outside world.


Now, click on CREATE:


Once your VM is created, you can click on SSH to access it:


If everything works properly, you should see a screen similar to this:


Congratulations, you’ve successfully accessed your VM! However, working with the VM directly from the browser can be inconvenient, as it lacks the benefits of a modern IDE like Visual Studio Code. Therefore, we need to find a way to connect to our VM remotely. This is where SSH keys come in.

Create SSH Keys and Access VM Remotely

SSH keys are a mechanism that allows you to securely connect to third-party applications. When you create SSH keys, you have two of them:

  • A public SSH key, which you share with third-party applications - think of it as the lock
  • A private SSH key, which you keep on your machine - think of it as the key

In essence, on your machine, you have the key to open the lock. When you open the lock, you access the third-party application. On a side note, I think that calling both of them keys is a bit misleading. I would have personally called them SSH key and SSH lock, which would have made things easier to understand.

Anyway, go to your terminal, and type the following:

 ssh-keygen -t rsa -f ~/.ssh/kafka-streaming -C arbenkqiku

In this case, kafka-streaming is the name that we give to our keys, and arbenkqiku is my username. If you don’t know your username, simply type:

whoami

Add a password to your SSH keys. This is an additional security measure to prevent unwanted access.


This will create your SSH keys:


To locate your SSH keys, go to the following directory:

cd ~/.ssh/

If you type the following command, you’ll see the content of your directory:

ls

You should be able to see the keys that we have just created. Please store them securely.


Now, we need to take our public key, the lock, and add it to our VM.

Go to where your VM is located, and click on its name:


Click on EDIT:


Go to the section called Security and access, and click on ADD ITEM:


Now, go back to the terminal and type:

cat kafka-streaming.pub

This will display the content of your public key, your lock.


Copy the content of your public key and add it to your VM:


Click on SAVE:


Now, go back to where your VM is located and copy the External IP address. This basically represents the location of your VM.


Go back to terminal and type:

ssh -i kafka-streaming arbenkqiku@34.65.113.154

Here is what the previous command means:

ssh -i name_of_private_key user_name@gcp_vm_instance_external_ip

With this command, we are basically saying: go to this external IP address, and use this private key to access the VM. Once you run the command, it will ask you whether you are sure you want to connect; type yes. You'll also be prompted for the password that we added to the SSH keys when we created them.


If everything worked correctly, you should be inside your VM:


Access VM Through Visual Studio Code (VS Code)

Now that we know that we can access our VM remotely, let's do the same thing through VS Code. Download VS Code from its official website.

Then, go to extensions, look for remote ssh and install the “Remote - SSH” extension:


In VS Code, go to the search bar, type > and select the following option:


If you already have an SSH config file, select it:


Then, type the following:

# Give a name to your host; this could be anything you want
Host kafka-streaming
  # Replace with the External IP address in GCP
  HostName 34.65.113.154
  # Replace this with your user name
  User arbenkqiku
  # Location of your private SSH key
  IdentityFile /Users/arbenkqiku/.ssh/kafka-streaming

Now, we still have to go back to the terminal one last time and type this:

eval $(ssh-agent)
ssh-add /Users/arbenkqiku/.ssh/kafka-streaming # Path to private SSH key

Then, type your password when prompted. This adds your private key to the SSH agent, so Visual Studio Code can use it when connecting to the VM without asking for the key's password every time.


Now, go back to the search bar of Visual Studio Code, type > and select the following option:


Among the options, you should see the host that you just created. Click on it:


This should open a new window and you should be prompted to add your password:


In the new window, click on this icon at the top right to open the terminal in VS code:


First of all, we can see our user name followed by the name of the VM:


In addition, if we type:

uname -r

We can see that it is a GCP (Google Cloud Platform) machine.

Congrats! We connected to the VM remotely by using VS code.

Stream from UK’s Companies House API and Produce to Kafka

Create a GitHub Repository

Let’s create a GitHub repository where we’ll store all of our code. Go to GitHub and click on NEW:


Give your repo a name, select the Add a README file checkbox and click on Create repository:


Now, click on Code and copy the URL of the repo:


Go to the terminal of your VM in VS code and type:

git clone https://github.com/ArbenKqiku/kafka-streaming-project.git

This will clone our project locally. Now, we can make changes locally and push them to our repository.

Stream Data from UK’s Companies House


So, our goal right now is to create a Python script able to stream data in real time from UK's Companies House streaming API. To use the API, you first need to create an account on the Companies House developer hub and register an application, which will give you an API key.

Once you have created the application, you can click on it:


Copy the API key and store it somewhere:


In the terminal, create a directory named streaming-pipeline and access it.


Now that you have created the directory, click on Open…:


You should see the directory that we just created. Click on it:


You’ll prompted to enter your password again. Now, let’s install the python extention. This will make it easier to code later on. Go to Extensions and install the Python extension.


Now, we need to install pip in order to install Python libraries. Go to the terminal and type the following command:

sudo apt update

The sudo apt update command is used on Debian-based Linux systems, such as Ubuntu, to update the package lists for repositories configured on the system.

Then type the following to install pip:

sudo apt install python3-pip

If you type:

pip list

You should be able to see python packages installed by default:


Here is the list of packages that we’ll have to install to complete the project. Copy them and save them into a file named requirements.txt

cachetools==5.3.3
certifi==2024.2.2
charset-normalizer==3.3.2
confluent-kafka==2.3.0
google-api-core==2.18.0
google-api-python-client==2.123.0
google-auth==2.29.0
google-auth-httplib2==0.2.0
google-auth-oauthlib==1.2.0
google-cloud==0.34.0
google-cloud-bigquery==3.19.0
google-cloud-core==2.4.1
google-crc32c==1.5.0
google-resumable-media==2.7.0
googleapis-common-protos==1.63.0
grpcio==1.62.1
grpcio-status==1.62.1
httplib2==0.22.0
idna==3.6
numpy==1.26.4
oauthlib==3.2.2
packaging==24.0
pandas==2.2.0
proto-plus==1.23.0
protobuf==4.25.3
pyarrow==15.0.0
pyasn1==0.5.1
pyasn1-modules==0.3.0
pyparsing==3.1.2
python-dateutil==2.8.2
pytz==2024.1
requests==2.31.0
requests-oauthlib==2.0.0
rsa==4.9
six==1.16.0
tzdata==2023.4
uritemplate==4.1.1
urllib3==2.2.1


Then, install all the packages with this command:

pip install -r requirements.txt

This should start the installation:


Now, we need to test our API connection. Before testing it in the code, I usually prefer to test things in Postman. Postman is a platform that simplifies the process of working with APIs by providing a user-friendly interface for sending HTTP requests and viewing their responses. Go to Postman's website and create an account:


Once you are in Postman, type https://stream.companieshouse.gov.uk/companies in the request field:


Go to Authorization, select Basic Auth and under Username paste your API key.


This may seem unusual if you are used to APIs where you simply add the key at the end of the URL, similar to this:

https://stream.companieshouse.gov.uk/companies?api_key=12345

However, the Companies House API uses HTTP Basic Auth, with the API key as the username and an empty password, so entering the key in the Username field is the intended approach. If everything worked correctly, the status of the request should indicate 200 OK and you should see some results:


Please note that the response may take some time. In my experience, it could take approximately 30 seconds. Occasionally, the cloud agent of Postman may not function properly when a request exceeds this duration. In such cases, you can try sending the request multiple times (I had to do it up to 3 times). Alternatively, consider downloading Postman Desktop, which has a higher tolerance for lengthier requests.

Postman makes it super easy to build your API requests. Once you have built a successful request, you can translate it to actual code in multiple programming languages. I love this feature of Postman. To see what our request looks like in code, click on this icon at the top right:


You’ll see that the Authorization header looks different than the API key. In reality, it is a Base64-encoded representation of the API.


Now, let’s go back to the terminal in VS code. Click this icon to create a new file:


Name the file company_house_stream.py and click on enter:


Here is the code to create the API request:

import requests
import json

# Define the base URL for the Companies House API
base_url = "https://stream.companieshouse.gov.uk/"

# Make a GET request to the 'companies' endpoint
response = requests.get(
    url=base_url + "companies",
    headers={
        "Authorization": 'Basic faefaefae==',
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate"
    },
    stream=True  # Stream the response data
)

First of all, we’re importing the necessary packages. Then, we define the endpoint with the necessary headers, that we got from Postman. Even though my request is exactly the same as in Postman, for some reason it would not work. I later discovered that the issue is due to the stream argument. When accessing a streaming API, it is very important to specifiy this aspect. With the previous code, our request is successful, however, it doesn’t produce any output. We need to find a way to create an open connection, where results are streamed and displayed in real time.

Let’s add this piece of code:

# Stream data as it comes in
for line in response.iter_lines():
    # if there is data, decode it and convert it to json.
    if line:
        decoded_line = line.decode('utf-8')

        json_line = json.loads(decoded_line)

        # print streamed line
        print(json_line)

With for line in response.iter_lines():, we iterate over each line of data received from the response object; iter_lines() is a method provided by the requests library to iterate over the response content line by line. With if line:, we check whether the line actually contains data. If it does, decoded_line = line.decode('utf-8') decodes it from bytes to a string using UTF-8 encoding. Then, json_line = json.loads(decoded_line) parses the decoded line, which is assumed to be in JSON format, into a Python dictionary using the loads() method from the json module. Finally, it is printed. This is the final code:

import requests
import json

url = "https://stream.companieshouse.gov.uk/"

response=requests.get(
    url=url+"companies",
    headers={
        "Authorization": 'Basic ZTA1YjQzYmYtNabcabcabcabcabcabcabcabcabcabc',
        "Accept": "application/json",
        "Accept-Encoding": "gzip, deflate"
    },
    stream=True
)

print("Established connection to Company House UK streaming API")

for line in response.iter_lines():
    if line:
        decoded_line = line.decode('utf-8')

        json_line = json.loads(decoded_line)

        print(json_line)

If everything works correctly, you should have an open connection to the API and see continuously streamed data:


However, let’s try to make our code more resistant by handling exceptions:

import requests
import json

url = "https://stream.companieshouse.gov.uk/"

try:
    response=requests.get(
        url=url+"companies",
        headers={
            "Authorization": 'Basic ZTA1YjQzabcabcabcabcabcabcabcabcabcabc',
            "Accept": "application/json",
            "Accept-Encoding": "gzip, deflate"
        },
        stream=True
    )

    print("Established connection to Company House UK streaming API")

    for line in response.iter_lines():
        if line:
            decoded_line = line.decode('utf-8')

            json_line = json.loads(decoded_line)

            # Build empty JSON
            company_profile = {
                "company_name": None,
                "company_number": None,
                "company_status": None,
                "date_of_creation": None,
                "postal_code": None,
                "published_at": None
            }

            try:
                company_profile['company_name'] = json_line["data"]["company_name"]
            except KeyError:
                company_profile['company_name'] = "NA"

            try:
                company_profile['company_number'] = json_line["data"]["company_number"]
            except KeyError:
                company_profile['company_number'] = "NA"
            
            try:
                company_profile['company_status'] = json_line["data"]["company_status"]
            except KeyError:
                company_profile['company_status'] = "NA"

            try:
                company_profile["date_of_creation"] = json_line["data"]["date_of_creation"]
            except KeyError:
                company_profile["date_of_creation"] = "NA"

            try:
                company_profile["postal_code"] = json_line["data"]["registered_office_address"]["postal_code"]
            except KeyError:
                company_profile["postal_code"] = "NA"

            try:
                company_profile["published_at"] = json_line["event"]["published_at"]
            except KeyError:
                company_profile["published_at"] = "NA"

            print("")
            print("BREAK")
            print("")
            print(company_profile)

except Exception as e:
    print(f"an error occurred {e}")

Firstly, it’s essential to handle exceptions that may occur when connecting to the API. Additionally, while the API provides a lot of columns for each row, we are only interested in retrieving specific fields. To facilitate this, I’ve initialized an empty JSON placeholder:

# Build an empty JSON placeholder
company_profile = {
    "company_name": None,
    "company_number": None,
    "company_status": None,
    "date_of_creation": None,
    "postal_code": None,
    "published_at": None
}

Then, I attempt to access the desired fields using try-except blocks:

try:
    # Check if the key 'company_name' exists in json_line. If it does, assign its value to the 'company_name' key in the company_profile dictionary.
    company_profile['company_name'] = json_line["data"]["company_name"]
except KeyError:
    # If the key does not exist, assign "NA" to the 'company_name' key in the company_profile dictionary.
    company_profile['company_name'] = "NA"
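
As a side note, the repeated try-except blocks could also be collapsed with dict.get() and default values. This is only an alternative sketch of the same field extraction, not what the final script uses:

def extract_company_profile(json_line):
    # Fall back to empty dictionaries so missing sections don't raise KeyError
    data = json_line.get("data", {})
    event = json_line.get("event", {})
    address = data.get("registered_office_address", {})

    return {
        "company_name": data.get("company_name", "NA"),
        "company_number": data.get("company_number", "NA"),
        "company_status": data.get("company_status", "NA"),
        "date_of_creation": data.get("date_of_creation", "NA"),
        "postal_code": address.get("postal_code", "NA"),
        "published_at": event.get("published_at", "NA"),
    }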

As you can see, for each company record that we stream, we only keep the fields we're interested in:

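One more robustness point worth keeping in mind: long-lived streaming connections occasionally drop, so in a long-running deployment you may want to wrap the request in a reconnect loop. A rough sketch, assuming the same url and headers as above and a process_line() helper standing in for whatever you do with each record:

import json
import time

import requests

def stream_forever(url, headers, process_line):
    while True:
        try:
            response = requests.get(
                url=url + "companies",
                headers=headers,
                stream=True,
                timeout=(10, 65),  # connect timeout, read timeout (seconds)
            )
            for line in response.iter_lines():
                if line:
                    process_line(json.loads(line.decode("utf-8")))
        except requests.exceptions.RequestException as error:
            print(f"Connection dropped ({error}), reconnecting in 10 seconds")
            time.sleep(10)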

As the next step, we’ll stream our data to Kafka!

What is Kafka?

Apache Kafka is a streaming platform that is used for building real-time data pipelines and streaming applications. In simple terms, think of Apache Kafka as a large messaging system that allows different applications to communicate with each other by sending and receiving streams of data in real-time. Apache Kafka was originally developed by engineers at LinkedIn. Kafka was conceived to address the challenges LinkedIn faced in handling real-time data streams, such as user activity tracking, operational metrics, and logging. In 2011, Kafka was open-sourced and became an Apache Software Foundation project.

In Kafka, there are essentially producers and consumers of data.


However, the data is not directly shared from a producer to a consumer. The producer first sends the data to a topic, and then the consumer retrieves this data from the topic.


In our project, there will be only one consumer, but in reality, there could be multiple consumers retrieving data from a topic:


One of the strengths of Kafka is that each topic can be split into multiple parts, called partitions.


Think of a Kafka partition as a separate stack of ordered messages within a library. Each stack (partition) holds a series of messages related to a particular topic. This has many advantages:

  1. Parallel Work: With a single stack of books, if multiple people want to read different books, they have to wait their turn. Now, imagine there are several stacks of books (partitions). Different people can read different books at the same time without waiting.

  2. Scalability: Let’s say the library gets more popular, and more books keep coming in. Instead of piling them all on one stack (partition), we create more stacks. This way, each stack can handle its own set of books without getting too crowded.

  3. Reliability: What if one of the book stacks falls over or gets damaged? If all the books were in that one stack, it would be a problem. But since we have multiple stacks (partitions), if something happens to one, the others are still okay.

Kafka partitions are within a broker. A Kafka broker is a single server instance within the Kafka cluster. Brokers store and manage data (partitions) and handle requests from producers and consumers. Imagine a librarian at a specific desk in the library. Each librarian (broker) manages a set of books (partitions) and assists library visitors (producers and consumers).

A Kafka cluster is a collection of Kafka brokers that work together to handle and manage the flow of messages. Think of the Kafka cluster as a team of librarians working together in a large library.
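
To make the partition idea a bit more concrete: when a producer sends a keyed message (later on we'll use the company number as the key), Kafka hashes the key and maps it to one of the topic's partitions, so all messages with the same key land in the same partition. Kafka's default partitioner actually uses a murmur2 hash; the sketch below uses Python's built-in hash() purely for illustration:

# Conceptual illustration only: the same key always maps to the same partition
def assign_partition(key: str, num_partitions: int) -> int:
    return hash(key) % num_partitions

for company_number in ["13255037", "09876543", "13255037"]:
    print(company_number, "-> partition", assign_partition(company_number, 2))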


Create a Kafka Cluster

To send messages to Kafka, we first need to create a Kafka Cluster. To do that, we’ll use Confluent. Confluent is a company founded by the creators of Apache Kafka. Confluent abstracts the complexities of managing Kafka clusters, similar to how Google Cloud abstracts the underlying server infrastructure for cloud-based services.

Once you are on the homepage, click on GET STARTED FOR FREE and create an account:


If you are new to Confluent, you get a $400 credit:


Click on Environments and then on default:


Click on Add cluster:


Select the Basic plan and click on Begin configuration:


Select a region and click on Continue:


Click on Skip payment. Be aware that you need to add a credit card if you don’t want your application to stop after you run out of the free credit:


Give a name to your cluster and click on Launch cluster:


Click on Topics:


Click on Create topic:


Give your topic a name (I named mine company_house) and select 2 partitions, which are more than sufficient for our project. Then, click on Create with defaults:


You can also define a data contract, but we’ll skip this step:


Now that our topic is created, go to API keys and click on Create key:


Select the Global access option and click on Next:


Click on Download and continue and store your API Key and Secret somewhere safe:


We’re all set, we can now start producing data to our Kafka topic!

Produce Simulated Data to a Kafka Topic

Parts of the following steps are adapted from Confluent's getting-started guide. To produce data to our Kafka cluster, we need a configuration file. In the VM, copy and paste the following configuration data into a file named getting_started.ini, substituting the API key and secret that you just created for the sasl.username and sasl.password values, respectively.

[default]
bootstrap.servers=
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=< CLUSTER API KEY >
sasl.password=< CLUSTER API SECRET >

[consumer]
group.id=python_example_group_1

# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest

To find your bootstrap server, go under Cluster Settings:


Here is what it should look like:

[default]
bootstrap.servers=pkc-lzoyy.europe-west6.abcabcabcabcabc
security.protocol=SASL_SSL
sasl.mechanisms=PLAIN
sasl.username=GGPabcabcabcabcabc
sasl.password=/rmTFMqsvD4/CEs7tJEZAD6/NV9OOabcabcabcabcabcabcabcabcabcabc

[consumer]
group.id=python_example_group_1

# 'auto.offset.reset=earliest' to start reading from the beginning of
# the topic if no committed offsets exist.
auto.offset.reset=earliest
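
Before writing the producer, you can optionally sanity-check that the bootstrap server and credentials are correct with a few lines of Python. This is just a quick sketch: list_topics() asks the cluster for metadata and fails quickly if the connection or authentication is wrong.

from configparser import ConfigParser

from confluent_kafka import Producer

# Read the configuration file we just created
config_parser = ConfigParser()
with open("getting_started.ini") as config_file:
    config_parser.read_file(config_file)

producer = Producer(dict(config_parser["default"]))

# Request cluster metadata; this raises an exception if the cluster is unreachable
metadata = producer.list_topics(timeout=10)
print("Topics visible to this API key:", list(metadata.topics.keys()))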

Now, create a file named producer.py and paste the following:

#!/usr/bin/env python

import sys
from random import choice
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':
    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    # See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Create Producer instance
    producer = Producer(config)

    # Optional per-message delivery callback (triggered by poll() or flush())
    # when a message has been successfully delivered or permanently
    # failed delivery (after retries).
    def delivery_callback(err, msg):
        if err:
            print('ERROR: Message failed delivery: {}'.format(err))
        else:
            print("Produced event to topic {topic}: key = {key:12} value = {value:12}".format(
                topic=msg.topic(), key=msg.key().decode('utf-8'), value=msg.value().decode('utf-8')))

    # Produce data by selecting random values from these lists.
    topic = "purchases"
    user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
    products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

    count = 0
    for _ in range(10):

        user_id = choice(user_ids)
        product = choice(products)
        producer.produce(topic, product, user_id, callback=delivery_callback)
        count += 1

    # Block until the messages are sent.
    producer.poll(10000)
    producer.flush()

Let’s go through the code together. This line #!/usr/bin/env python tells the terminal that the script should be executed with a python interpreter. The next piece of code parses an argument from the command line. Later, when we’ll run this script, we’ll execute it with ./producer.py getting_started.ini. getting_started.ini represents the command-line argument that we’re parsing.

# Parse the command line.
parser = ArgumentParser()
parser.add_argument('config_file', type=FileType('r'))
args = parser.parse_args()

Here, we’re are creating a Kafka Producer based on our configuration file getting_started.ini.

# Parse the configuration.
# See https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md
config_parser = ConfigParser()
config_parser.read_file(args.config_file)
config = dict(config_parser['default'])

# Create Producer instance
producer = Producer(config)

Here, we define the topic where we'll produce the data. I named my topic company_house, so I'll change the name from purchases to company_house in the producer.py file. user_ids and products are simply fake data that we'll produce to our topic to test things out.

# Produce data by selecting random values from these lists.
topic = "purchases"
user_ids = ['eabara', 'jsmith', 'sgarcia', 'jbernard', 'htanaka', 'awalther']
products = ['book', 'alarm clock', 't-shirts', 'gift card', 'batteries']

Here, we are producing the data to the topic:

count = 0
for _ in range(10):

    user_id = choice(user_ids)
    product = choice(products)
    producer.produce(topic, product, user_id, callback=delivery_callback)
    count += 1

# Block until the messages are sent.
producer.poll(10000)
producer.flush()

If you go to the terminal and type python3 producer.py getting_started.ini, you should see output like this:


Then, go to the Confluent console, click on Topics and then on your topic name, in my case company_house:


Click on Messages:


Below, you should see the messages that we just produced to the topic:


This is awesome!

Produce UK’s Companies House Data to a Kafka Topic

So far, we were able to do 2 things:

  • Stream data continuously from UK’s Companies House API
  • Produce simulated data to our Kafka topic

Now, to integrate these 2 scripts, we need to stream data from UK’s Companies House API and produce it (send it) to our Kafka topic.

Create a file named producer_company_house.py and paste the following code:

#!/usr/bin/env python

import sys
from random import choice
import json
import requests
from argparse import ArgumentParser, FileType
from configparser import ConfigParser
from confluent_kafka import Producer

if __name__ == '__main__':

    url = "https://stream.companieshouse.gov.uk/"

    # Parse the command line.
    parser = ArgumentParser()
    parser.add_argument('config_file', type=FileType('r'))
    args = parser.parse_args()

    # Parse the configuration.
    config_parser = ConfigParser()
    config_parser.read_file(args.config_file)
    config = dict(config_parser['default'])

    # Set idempotence configuration
    config['enable.idempotence'] = 'true'

    # Create Producer instance
    producer = Producer(config)

    try:
        response=requests.get(
            url=url+"companies",
            headers={
                "Authorization": 'Basic ZTA1YjQzYmYtNDY4OC00YWFjLWIxZWItOTY2MWUxYzZjMDA0Og==',
                "Accept": "application/json",
                "Accept-Encoding": "gzip, deflate"
            },
            stream=True
        )

        print("Established connection to Company House UK streaming API")

        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')

                json_line = json.loads(decoded_line)

                # Build empty JSON
                company_profile = {
                    "company_name": None,
                    "company_number": None,
                    "company_status": None,
                    "date_of_creation": None,
                    "postal_code": None,
                    "published_at": None
                }

                try:
                    company_profile['company_name'] = json_line["data"]["company_name"]
                except KeyError:
                    company_profile['company_name'] = "NA"

                try:
                    company_profile['company_number'] = json_line["data"]["company_number"]
                except KeyError:
                    company_profile['company_number'] = "NA"
                
                try:
                    company_profile['company_status'] = json_line["data"]["company_status"]
                except KeyError:
                    company_profile['company_status'] = "NA"

                try:
                    company_profile["date_of_creation"] = json_line["data"]["date_of_creation"]
                except KeyError:
                    company_profile["date_of_creation"] = "NA"

                try:
                    company_profile["postal_code"] = json_line["data"]["registered_office_address"]["postal_code"]
                except KeyError:
                    company_profile["postal_code"] = "NA"

                try:
                    company_profile["published_at"] = json_line["event"]["published_at"]
                except KeyError:
                    company_profile["published_at"] = "NA"

                # Produce data to Kafka topics
                topics = {
                    "company_house": company_profile,
                }

                # Optional per-message delivery callback
                def delivery_callback(err, msg):
                    if err:
                        print('ERROR: Message failed delivery: {}'.format(err))
                    else:
                        print("Produced event to topic {}: key = {} value = {}".format(
                            msg.topic(), msg.key(), msg.value()))

                # Produce data to Kafka topics
                for topic, message in topics.items():
                    producer.produce(topic, key=message["company_number"].encode('utf-8'), value=json.dumps(message).encode('utf-8'), callback=delivery_callback)

                # Serve delivery callbacks for previously produced messages
                producer.poll(0)

    except Exception as e:
        print(f"an error occurred {e}")

Let’s go through the major changes. Here, we define the topic and the content of a specific message with company_profile.

# Produce data to Kafka topics
topics = {
    "company_house": company_profile,
}

Here is what the following code does. It iterates through each topic and message pair in the topics dictionary, extracts the company_number from the message, encodes both the company_number and the JSON-formatted message content to UTF-8 using .encode('utf-8'), and produces the encoded key-value pair to the Kafka topic using producer.produce().

# Produce data to Kafka topics
for topic, message in topics.items():
    producer.produce(topic, key=message["company_number"].encode('utf-8'), value=json.dumps(message).encode('utf-8'), callback=delivery_callback)

If you run python3 producer_company_house.py getting_started.ini in your terminal, it should stream data from UK's Companies House and produce it to the Kafka topic company_house. If you go to the Confluent console, you should see incoming messages:


As a next step, we’ll containerize our application.

Containerize our Application

Create a Docker Container of Our Application

Think of a Docker container as a small box that holds everything your application needs to run: its code, libraries, and settings. This box is like a mini-computer that you can easily move around. Docker containers bundle up all the software and dependencies your application needs into a single package, making it easy to run your application on any computer that has Docker installed. This is very useful because, regardless of where you run your container, it will always run consistently. In contrast, if you simply share your code with someone else, they may not have the necessary dependencies to run it.

To create a Docker container, you need a Docker image, and to create a Docker image, you need a Dockerfile. A Dockerfile is like a recipe, that shows all the ingredients and steps necessary to prepare a meal. A Docker image is analogous to having the necessary ingredients and appliances ready to cook the meal. It’s a snapshot of a filesystem and its configuration that forms the basis of a container. A Docker container is like the actual prepared meal. It’s a running instance of a Docker image, containing everything needed to run the application, including the code, runtime, libraries, and dependencies.


So, this is what our Dockerfile looks like. Create a new file named Dockerfile and paste the following content. Docker automatically recognizes the file named Dockerfile:

# Use an official Python runtime as a parent image
FROM python:3.10

# Set the working directory in the container
WORKDIR /app

# Copy the current directory contents into the container at /app
COPY . /app

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements_docker.txt

# Make the script executable
RUN chmod +x producer_company_house.py

# Run the script with the provided configuration file as an argument
CMD ["./producer_company_house.py", "getting_started.ini"]

Here is what each line of code does. In the following snippet, we are basically taking an existing Docker image and building on top of it. It would be like buying pre-made dough and then adding your toppings to prepare your pizza.

# Use an official Python runtime as a parent image
FROM python:3.10

Here, we are setting the working directory in the container. It is like running cd from the command-line.

# Set the working directory in the container
WORKDIR /app

Here, we are copying the entire content of the current directory (the directory where the Docker image is built) into the /app directory of the container.

# Copy the current directory contents into the container at /app
COPY . /app

Here, we install the necessary python packages.

# Install any needed packages specified in requirements.txt
RUN pip install --no-cache-dir -r requirements_docker.txt

Please create a file named requirements_docker.txt and paste the following content. We use a separate file because producing data to our Kafka topic doesn't require all the packages contained in requirements.txt.

configparser==6.0.1
confluent-kafka==2.3.0
requests==2.31.0

This line makes the file producer_company_house.py executable. Making a file executable is necessary to allow it to be run as a program or script. When a file is marked as executable, it means that the operating system recognizes it as a program or script that can be executed directly. Without executable permissions, attempting to run the file will result in a permission denied error.

# Make the script executable
RUN chmod +x producer_company_house.py

Finally, we run the script producer_company_house.py with the config file getting_started.ini as a command-line argument.

# Run the script with the provided configuration file as an argument
CMD ["./producer_company_house.py", "getting_started.ini"]

To create our Docker image from our Dockerfile, we first need to install Docker. Let’s download a GitHub repo that contains the installation for Docker:

git clone https://github.com/MichaelShoemaker/DockerComposeInstall.git

Let’s access the folder that contains the Docker installation:

cd DockerComposeInstall

Let’s modify the file to make it executable:

chmod +x InstallDocker

Then, let’s run it:

./InstallDocker

Type this to verify that Docker has been installed correctly:

docker run hello-world

This should show the following message:


Before proceeding, make sure you saved all the files mentioned in the Dockerfile. If you made any modifications to a file without saving it, the modifications will not be included in the Dockerfile.

Now, go back to the folder that contains your Dockerfile and type the following. This instructs Docker to build an image called kafka-producer from our Dockerfile. The dot at the end sets the build context to the current directory, which is where Docker looks for the Dockerfile and the files to copy.

docker build -t kafka-producer .

This should build your Docker image:


If you type docker images, you should see the Docker image we just built:


To run a container instance of the Docker image, type:

docker run kafka-producer


Now, if you go to Confluent, you’ll see that we’re actually producing data to the company_house topic:


If you go under messages, you should see new incoming messages:


Here are a couple of nifty tricks related to Docker. To see the Docker containers that are currently running, type:

docker ps


To see all Docker containers, type:

docker ps -a


To stop a running Docker container, type:

docker stop container_id

For example:

docker stop a2cd18c97c49

To restart a Docker container, you don't need to type docker run image_name again; that would create a new instance of the Docker image. Once the Docker container has been created, you can simply restart it:

docker start container_id

To remove a Docker container, first, you need to stop it, then type:

docker rm container_id

To see all Docker images available, type:

docker images

Now that we know that we can produce data to Kafka, you can stop your Docker container.

So far, we’ve developed a Kafka producer that streams data from the UK’s Companies House API to a Kafka topic. Everything is nicely packaged into a Docker image, from which we’ve launched a Docker container. I would say that this is quite an achievement, congrats!

Up until now, we produced data to a Kafka topic. Right now, we need to consume this data!
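
Before moving to Mage, it can also be handy to peek at the topic from plain Python. Here is a minimal consumer sketch based on the same getting_started.ini and modeled on Confluent's getting-started example (press Ctrl+C to stop it):

#!/usr/bin/env python
from configparser import ConfigParser

from confluent_kafka import Consumer

# Reuse the [default] and [consumer] sections of getting_started.ini
config_parser = ConfigParser()
with open("getting_started.ini") as config_file:
    config_parser.read_file(config_file)
config = {**config_parser["default"], **config_parser["consumer"]}

consumer = Consumer(config)
consumer.subscribe(["company_house"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to 1 second for a message
        if msg is None:
            continue
        if msg.error():
            print(f"ERROR: {msg.error()}")
            continue
        print(msg.key().decode("utf-8"), msg.value().decode("utf-8"))
except KeyboardInterrupt:
    pass
finally:
    consumer.close()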

From Kafka to BigQuery through Mage

Install Mage

A data orchestrator allows you to build ETL (extract, transform and load) data pipelines: extracting data from one or multiple systems, applying some transformations to it, and then exporting it to another system, such as a data warehouse. Mage is a modern and very user-friendly orchestrator that provides a visually appealing view of your pipelines:


Also, with simple drag and drop you can change how the various blocks interact with each other.

Here is the repo for the installation guide, but we’ll go through the installation here as well.

You can start by cloning the repo:

git clone https://github.com/mage-ai/mage-zoomcamp.git mage-zoomcamp

Navigate to the repo:

cd mage-zoomcamp

Rename dev.env to simply .env — this will ensure the file is not committed to Git by accident, since it will contain credentials in the future.

Now, let’s build the container with Docker compose. When we use Docker Compose to build containers, we’re essentially creating applications made up of multiple containers that can communicate with each other. This is particularly handy for complex applications where different parts of the system need to work together.

In contrast, the usual process of building Docker images involves focusing on creating one container at a time. Each Dockerfile represents a single container, and while these containers can interact, managing their communication and dependencies can become more complex as the application grows.

With Docker Compose, all the details about the containers, such as Docker images, environment variables, and dependencies, are stored in a YAML file. This file acts as a configuration blueprint for our application. For example, in our scenario, we have a YAML file that specifies both a Docker image to install Mage and a PostgreSQL database. Even if we only need Mage for our task, we can still use this YAML file to set up our environment.

docker compose build

Finally, start the Docker container:

docker compose up

In the YAML file, we have defined Mage’s port as 6789. In fact, you can access Mage by going to your VM and clicking on the browser icon next to port 6789:


If everything worked correctly, you should be able to access Mage:


However, right now we’re accessing Mage from the our localhost. It would be great to access it via our external IP address.


The external IP address allows users outside the VM’s network to interact with Mage. This can be useful for remote access, enabling team members or clients to use Mage without needing direct access to the VM.

Type firewall in the search bar and click on Firewall:


At the top, click on CREATE FIREWALL RULE:


Give a name to your firewall rule, then select the following options and click on CREATE:


With this, we’re basically allowing ingress traffic through port 6789, which is where Mage is located. Now, go back to your VM, copy your external IP address and type it followed by :6789. This is what it looks like in my case:

34.65.113.154:6789

Now, if you type this address in your browser you should be able to access Mage via the external IP address:


Consume Data from Kafka Topic

In Mage, you can create either batch or streaming pipelines. In our case, we need to consume data in real-time from a Kafka topic, so we’ll create a streaming pipeline. At the top left, click on New and then on Streaming:


Give a name to your streaming pipeline and click on Create:


In Mage, a pipeline is composed of blocks. The different blocks are Data loader, Transformer and Data exporter. Your pipeline can have as many blocks as you want. As we need to consume the data from a Kafka topic, click on Data loader and select Kafka:


Give a name to your block and click on Save and add:


Similar to what we did in the file getting_started.ini, we need to define the configuration with the following parameters:

  • bootstrap server
  • topic
  • username
  • password

Here is what it should look like:

connector_type: kafka
bootstrap_server: "pkc-lzoyblablablablaluent.cloud:9092"
topic: company_house
consumer_group: unique_consumer_group
include_metadata: false
api_version: 0.10.2

# Uncomment the config below to use SASL_SSL config
security_protocol: "SASL_SSL"
sasl_config:
    mechanism: "PLAIN"
    username: GGPabcabcaabcabca
    password: /rmTFMqsvD4/CEs7tJEZAD6/NV9Oabcabcaabcabcaabcabcaabcabcaabcabca

In the Data loader block, we’re consuming the data. However, if you run the pipeline with this block only, there is no output of the consumed data. To see the data, we also need to add a Transformer block. Click on Transformer > Python > Generic (no template):


Give a name to your block and click on Save and add:

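A simple pass-through implementation of the Transformer block, like the sketch below, is enough to verify that messages are flowing. It mirrors the structure of the full block we'll write later, but only prints each message and returns it unchanged:

from typing import Dict, List

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform(messages: List[Dict], *args, **kwargs):
    # Print each message consumed from the Kafka topic so we can inspect it
    for msg in messages:
        print(msg)
    return messages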

Now, we are ready to consume data. However, we are currently not producing any. To produce data, we can simply start the Docker container that we previously created. So, let's find the container id of our Docker container:

docker ps -a


Therefore, we’ll type:

docker start 3d7a82c606d4

Soon enough, on Confluent we should see that we are producing data:


Now, go back to Mage and click on Execute pipeline:


Soon enough, you should see that we’re consuming messages from Kafka:


Congrats! As a next step, we’ll send the streamed data to BigQuery.

Send Streamed Data to BigQuery

Now, let’s send the data that we’re streaming to BigQuery. To do that, we first need to create a data set in BigQuery. In BigQuery, a data set is the equivalent of a database in other systems. So, go to Google Cloud, type big query and click on BigQuery:


If you havent’ done it already, enable the BigQuery API:


The basic structure of BigQuery is Project > Dataset > Tables. In our case, we first have to create a dataset. Click on the three dots next to your project name and click on Create dataset:


Give a name to your dataset, select a region and click on CREATE DATASET:

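For reference, once you have service account credentials (we'll create them right below), the same dataset could also be created from Python instead of the console. This is just an optional sketch; the key path is a placeholder and the location should match the region you use for your other resources:

from google.cloud import bigquery
from google.oauth2 import service_account

# Placeholder path to the service account key created in the next step
credentials = service_account.Credentials.from_service_account_file(
    "path/to/service-account-key.json"
)
client = bigquery.Client(project=credentials.project_id, credentials=credentials)

dataset = bigquery.Dataset(f"{credentials.project_id}.company_house")
dataset.location = "EU"  # keep this consistent with your other resources
client.create_dataset(dataset, exists_ok=True)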

In our case, we’re sending data to BigQuery from Mage, which is a production environement. On the other hand, when you try to send data manually from your local machine, you can login to your Google account through the browser and grant permission to your script to perform tasks. This image provides an example of a manual authentication through the browser to grant access to the Tidyverse API Packages to make changes on BigQuery:


However, in a production environment, we don’t have the luxury to go through the browser’s authentication process, so we need to find another solution to prove to Google that we’re actually authorized to perform tasks. To do that, we can create a service account. A service account is essentially an account authorized to make changes on your behalf. Go to the VM and type service accounts in the search bar and click on Service Accounts:


On the top left, click on CREATE SERVICE ACCOUNT:


Give a name to your service account and click on CREATE AND CONTINUE:


This service account will only interact with BigQuery, so I only gave it BigQuery Admin permissions. If you want your service account to interact with other resources, you should extend your permissions. Click on CONTINUE:


Finally, click on DONE:


Click on the service account that you just created:


Go to KEYS:


Click on ADD KEY, and then Create new key:


Select JSON and click on CREATE:


This will download the credentials of the service account that we'll use to make changes in BigQuery from a non-interactive environment. To add the credentials to the VM, drag them to this area of VS Code:


One of the worst mistakes you could make is to inadvertently publish your credentials to a GitHub repository. To prevent this, we can create a file named .gitignore. Any file matching the patterns listed in this file will be ignored by Git, ensuring it's not pushed to the repository. Since our credentials are stored in .json format, to prevent them from being added to our GitHub repository, simply add the following line to your .gitignore file:

# Ignore files with .json extension
*.json

Currently, we’ve only created a dataset in BigQuery. To begin streaming data, we need to create a table. So, let’s start by pushing some mock streaming data to a new table. Open a new python file and paste this code:

import io
import json
import pandas as pd
import pyarrow.parquet as pq
from google.cloud import bigquery # pip install google-cloud-bigquery and pyarrow as a dependency
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '/home/arbenkqiku/streaming-pipeline/streaming-pipeline-418713-7f7d915b1fc7.json', scopes=['https://www.googleapis.com/auth/cloud-platform'],
)

client = bigquery.Client(project=credentials.project_id, credentials=credentials)

job_config = bigquery.LoadJobConfig()

json_example = {'company_name': 'CONSULTANCY, PROJECT AND INTERIM MANAGEMENT SERVICES LTD', 'company_number': '13255037', 'company_status': 'active', 'date_of_creation': '2021-03-09', 'postal_code': 'PE6 0RP', 'published_at': '2024-03-23T18:37:03'}
df = pd.DataFrame([json_example])

df['date_of_creation'] = pd.to_datetime(df['date_of_creation'])
df['published_at'] = pd.to_datetime(df['published_at'])

table_name = 'company_house_stream'

table_id = '{0}.{1}.{2}'.format(credentials.project_id, "company_house", table_name)
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

# Upload new set incrementally:
# ! This method requires pyarrow to be installed:
job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)

Let’s analyze what this code snippet does. The first piece of code loads service account credentials from a JSON key file located at /home/arbenkqiku/streaming-pipeline/streaming-pipeline-418713-7f7d915b1fc7.json. These credentials are used to authenticate with Google Cloud services, in our case with BigQuery.

credentials = service_account.Credentials.from_service_account_file(
    '/home/arbenkqiku/streaming-pipeline/streaming-pipeline-418713-7f7d915b1fc7.json', scopes=['https://www.googleapis.com/auth/cloud-platform'],
)

The next code snippet initializes a BigQuery client using the obtained credentials and specifies the project ID associated with those credentials and creates a BigQuery load job configuration (job_config):

client = bigquery.Client(project=credentials.project_id, credentials=credentials)
job_config = bigquery.LoadJobConfig()

The following code snippet creates a DataFrame df containing a single example record json_example representing data to be loaded into BigQuery. This record includes fields like 'company_name', 'company_number', 'company_status', etc. It then converts date fields ('date_of_creation' and 'published_at') in the DataFrame to datetime objects using pd.to_datetime() to ensure compatibility with BigQuery's date format.

json_example = {'company_name': 'CONSULTANCY, PROJECT AND INTERIM MANAGEMENT SERVICES LTD', 'company_number': '13255037', 'company_status': 'active', 'date_of_creation': '2021-03-09', 'postal_code': 'PE6 0RP', 'published_at': '2024-03-23T18:37:03'}
df = pd.DataFrame([json_example])

df['date_of_creation'] = pd.to_datetime(df['date_of_creation'])
df['published_at'] = pd.to_datetime(df['published_at'])

Here we define the destination table_id where the DataFrame data will be loaded. This table is located in the company_house dataset within the specified project.

table_name = 'company_house_stream'
table_id = '{0}.{1}.{2}'.format(credentials.project_id, "company_house", table_name)

This configures the job to append the DataFrame data to the existing table if it already exists.

job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

Finally, this initiates a BigQuery load job to upload the DataFrame data to the specified table.

# Upload new set incrementally:
# ! This method requires pyarrow to be installed:
job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)
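
Note that load_table_from_dataframe() returns a job object and the load runs asynchronously. If you want the script to wait until the load has finished and surface any errors, you can optionally add:

# Wait for the load job to complete; this raises an exception if the job failed
job.result()
print(f"Loaded {job.output_rows} rows into {table_id}")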

If you now go to BigQuery, you should see a table named company_house_stream. If you click on PREVIEW, you should see the mock-up data that we sent.


Now, let’s got back on Mage to the transfomer block, remove the existing code and add this code:

from typing import Dict, List
import pandas as pd
import pyarrow as pa
from google.cloud import bigquery
from google.oauth2 import service_account

if 'transformer' not in globals():
    from mage_ai.data_preparation.decorators import transformer

@transformer
def transform(messages: List[Dict], *args, **kwargs):

    # define container for incoming messages
    incoming_messages = []

    counter = 1
    for msg in messages:
        # print(counter)
        counter += 1
        # print(msg)

        # append each message
        incoming_messages.append(msg)
    
    # turn into a pandas data frame
    df = pd.DataFrame(incoming_messages)

    # convert string columns to date or date time
    df['date_of_creation'] = pd.to_datetime(df['date_of_creation'])
    df['published_at'] = pd.to_datetime(df['published_at'])

    # define credentials
    credentials = service_account.Credentials.from_service_account_file(
        '/home/src/streaming-pipeline-418713-7f7d915b1fc7.json', scopes=['https://www.googleapis.com/auth/cloud-platform'],
    )

    # define client
    client = bigquery.Client(project=credentials.project_id, credentials=credentials)

    # define job
    job_config = bigquery.LoadJobConfig()

    # define table name and big query details
    table_name = 'company_house_stream'
    table_id = '{0}.{1}.{2}'.format(credentials.project_id, "company_house", table_name)
    job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

    print("BLOCK 3.1")

    print(df)

    # Upload new set incrementally:
    # ! This method requires pyarrow to be installed:
    job = client.load_table_from_dataframe(
        df, table_id, job_config=job_config
    )

    print(df.head())

    return df

Let’s analyze this code. Here we iterate over each message in the messages list and append it to a new list called incoming_messages. This is essentially collecting all the incoming messages from Kafka into a single list.

# define container for incoming messages
incoming_messages = []

counter = 1
for msg in messages:
    # print(counter)
    counter += 1
    # print(msg)

    # append each message
    incoming_messages.append(msg)

This converts the list of dictionaries incoming_messages into a Pandas DataFrame df.

# turn into a pandas data frame
df = pd.DataFrame(incoming_messages)

The rest of the code is basically the same as the previous script. The only thing that really changes is the location of our credentials, as we’re now in Mage and not in the VM:

# define credentials
credentials = service_account.Credentials.from_service_account_file(
    '/home/src/streaming-pipeline-418713-7f7d915b1fc7.json', scopes=['https://www.googleapis.com/auth/cloud-platform'],
)

One of the cool features of Mage is that you can access its internal terminal and see which files are available and what their respective paths are:

linearly separable data

First of all, as you can see, the default path in Mage starts with /home/src. However, when we type ls, the credentials are nowhere to be seen:

linearly separable data

This is because we actually need to drag our credentials into the Mage folder:

linearly separable data

Once we do that, we see that the credentials are available:

linearly separable data

Now we are able to test the pipeline! Go back to the terminal and start the Docker container that produces data to the Kafka topic. Once you see incoming data on Confluent, go to Mage and click on Execute pipeline:

linearly separable data

On Mage, you should see the data that we’re consuming:

linearly separable data

On BigQuery, you should see the number of rows gradually increasing:

linearly separable data

Here is approximately 1 minute later:

linearly separable data

Congrats! We just built a working streaming pipeline that:

  • Streams data from UK’s Companies House API
  • Produces it to a Kafka topic
  • Consumes it from a Kafka topic
  • And finally streams it to a BigQuery table

Create Table that Contains all UK’s Companies House Data

For now, let’s stop our Docker container that produces data to the Kafka topic and cancel our pipeline on Mage. Next, we need to create a BigQuery table containing all the available data from the UK’s Companies House up to March 2024. I downloaded the data from here and cleaned it up. You can find the cleaned data here; please download it. In the next step, as we stream incoming data, we’ll merge it with this table containing all the available data.

Here is the script to send the overall table of UK’s Companies House data to BigQuery. Given the size of the table (437 MB), I was not able to run the script on my VM, so I had to run it locally, which took almost 15 minutes. This has no impact on our project, as in the end everything will run programmatically in the cloud.

import io
import json
import pandas as pd
import pyarrow.parquet as pq
from google.cloud import bigquery # pip install google-cloud-bigquery and pyarrow as a dependency
from google.oauth2 import service_account

credentials = service_account.Credentials.from_service_account_file(
    '/home/arbenkqiku/streaming-pipeline/mage-zoomcamp/streaming-pipeline-418713-7f7d915b1fc7.json', scopes=['https://www.googleapis.com/auth/cloud-platform'],
)

df = pd.read_csv("company_house_core_clean.csv")

client = bigquery.Client(project=credentials.project_id, credentials=credentials)

job_config = bigquery.LoadJobConfig()

table_name = 'company_house_core'

table_id = '{0}.{1}.{2}'.format(credentials.project_id, "company_house", table_name)
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND

# Upload new set incrementally:
# ! This method requires pyarrow to be installed:
job = client.load_table_from_dataframe(
    df, table_id, job_config=job_config
)

Only two aspects differ from the previous script: we read a CSV of the cleaned data, and the table name changes.

df = pd.read_csv("company_house_core_clean.csv")

table_name = 'company_house_core'

Please make sure that the columns date_of_creation and month_of_creation are of type DATE:

linearly separable data
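
If BigQuery infers these columns as STRING or TIMESTAMP instead, one way to coerce them before loading is to convert them in pandas and declare an explicit schema. Here is a minimal sketch, assuming the df and job_config from the script above:

# Minimal sketch, assuming the df and job_config defined in the script above.
# Convert the columns to Python date objects so pandas/pyarrow emit DATE values:
df['date_of_creation'] = pd.to_datetime(df['date_of_creation']).dt.date
df['month_of_creation'] = pd.to_datetime(df['month_of_creation']).dt.date

# Optionally, declare the types explicitly so BigQuery does not have to guess:
job_config.schema = [
    bigquery.SchemaField("date_of_creation", "DATE"),
    bigquery.SchemaField("month_of_creation", "DATE"),
]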

Apply Transformations with dbt (data build tool)

How to Set-Up dbt

dbt, also known as Data Build Tool, is designed for data transformation tasks. It enables the creation of reusable models, which are essentially blocks of SQL code that can be applied consistently across different scenarios. For example, you can ensure that a KPI is calculated in a standardized manner by using the same dbt model in various contexts. Additionally, dbt brings programming-language features such as functions and variables into SQL workflows through Jinja templating, expanding the capabilities beyond traditional SQL. Furthermore, dbt integrates with Git for version control, a feature typically absent in SQL environments, allowing for better management and tracking of changes to data transformations.
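
As a quick taste of those templating features, here is a toy example (not part of this project’s pipeline, and assuming a source like the one we’ll define later in this guide) of a Jinja variable and loop inside a dbt model:

-- Toy example only: a Jinja variable and loop compiled into plain SQL by dbt.
{% set statuses = ['active', 'dissolved'] %}

select
    company_status,
    count(*) as number_of_companies
from {{ source('staging', 'company_house_core') }}
where company_status in (
    {% for status in statuses %}'{{ status }}'{% if not loop.last %}, {% endif %}{% endfor %}
)
group by company_status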

In our case, we’ll use dbt to create the following workflow:

linearly separable data

  • (1) As new data is streamed in, we capture a snapshot of the latest information, which includes all data accumulated since the previous snapshot was taken.
  • (2) From this data, we extract the most recent timestamp available. This timestamp will be helpful for retrieving the next snapshot.
  • (3) Subsequently, we utilize the snapshot to update the overall UK’s Companies House data.
  • This process repeats as we capture another snapshot of the most recent streamed data, using the last timestamp from the previous snapshot.

This enables continuous updates to the UK’s Companies House data as new data is streamed.

Let’s set up dbt. To start, we’ll focus on two key tasks:

  • Linking GitHub Repository with dbt for Versioning: this involves connecting your GitHub repository with dbt to enable version control.
  • Linking BigQuery with dbt for Data Retrieval and Pushing: this step entails establishing a connection between BigQuery and dbt, allowing you to retrieve data from BigQuery and push transformed data back into BigQuery.

Go to this link and click on Create a free account:

linearly separable data

Click on the settings icon at the top right and then click on Account settings:

linearly separable data

On the left side, click on Integrations and then on GitHub:

linearly separable data

Click on your profile settings:

linearly separable data

Click on Link in the GitHub row and follow the process:

linearly separable data

Come back and click on Configure integration in GitHub:

linearly separable data

Select your account and click on Configure:

linearly separable data

In my case, I am confirming my access through my GitHub app:

linearly separable data

Give it access to your repo and click on Save:

linearly separable data

Now, go back to dbt and click on projects:

linearly separable data

If you don’t have a project click on New project:

linearly separable data

Give a name to your project and click on Configure Repository:

linearly separable data

Click on GitHub:

linearly separable data

Choose the repo that you just configured:

linearly separable data

Now, click on Configure Connection:

linearly separable data

Select BigQuery and click on Upload a Service Account JSON file:

linearly separable data

Select the JSON file (the credentials) that we previously created, the same that we used to upload the entire UK’s Companies House data set:

linearly separable data

Make sure the location matches the datasets in BigQuery. Otherwise, dbt won’t be able to use the existing tables to create new ones:

linearly separable data

Finally, click on Save:

linearly separable data

Once you’re done, on the top left, click on Develop and on Cloud IDE:

linearly separable data

On the left side, click on Initialize dbt project:

linearly separable data

As you can see, some files, which represent the basic structure of a dbt project, were created. Click on Commit and sync to push the changes to the GitHub repository:

linearly separable data

As mentioned earlier, dbt enforces good software development practices. In this case, it is asking us to create a new branch and to commit to it. Give a name to your new branch, add a message to your commit and click on Commit Changes:

linearly separable data

Once your new branch has been created, and you are done with your development, you can click on Create a pull request on GitHub:

linearly separable data

Click on Create pull request:

linearly separable data

If there are no conflicts, GitHub will tell you that you can merge with the main branch. Click on Merge pull request and then Confirm merge:

linearly separable data

In theory, now you could safely delete your development branch. However, in our case we have just started, so we won’t delete it yet.

Create dbt Data Sources

First of all, let’s create a new folder for our staging models and data sources. In fact, to create data models, you need data sources. Click on the three dots next to the models folder, and click on Create folder:

linearly separable data

Call your folder staging and click on Create:

linearly separable data

Within the staging folder, click on Create file:

linearly separable data

Click on Save as at the top right:

linearly separable data

Name your file schema.yml and click on Create:

linearly separable data

Paste the following code:

version: 2

sources:
  - name: staging
    database: streaming-pipeline-418713 # BigQuery project id
    schema: company_house # BigQuery data set
    tables:
      - name: company_house_core
      - name: company_house_stream

In this configuration:

  • version: 2: Specifies the version of the YAML configuration file.
  • sources: Defines the data sources used in the project.
  • name: staging: Provides a name for the data source.
  • database: streaming-pipeline-418713: Specifies the BigQuery project ID where the data resides.
  • schema: company_house: Indicates the BigQuery dataset containing the tables.
  • tables: Lists the tables within the specified dataset.
  • name: company_house_core: Names one of the tables within the dataset.
  • name: company_house_stream: Names another table within the dataset.

This configuration allows dbt to locate and access the specified data in BigQuery. It is essentially a representation of the data that we have in BigQuery:

linearly separable data

Create dbt Models

Let’s begin constructing our initial model. We encounter a classic “chicken and egg” scenario: we require a snapshot of the most recent data, yet we lack a timestamp. To address this, our first step is to generate a timestamp.

linearly separable data

In the staging folder, create a file named get_last_timestamp.sql and paste the following code. This code snippet is written in the Jinja templating language, which dbt uses to manage and transform data in data warehouses.

linearly separable data

Unfortunately, my site was not able to parse Jinja code, so I’ll simply paste an image of the code here. In any case, you can find the code in my GitHub repo.

Here’s a breakdown of what each part of the code does:

The config part is a Jinja directive used to configure settings for the dbt model. In this case, it sets the materialization method for the dbt model to table, indicating that the results of the SQL query will be stored in a table.

select published_at from {{ source('staging', 'company_house_stream') }} order by published_at limit 1: This is the SQL query written inside the Jinja template. It selects the published_at column from the company_house_stream table in the staging source. The {{ source() }} syntax is a Jinja function call that dynamically generates the fully qualified name of the table based on the provided arguments.

In this query, we’re selecting the first (earliest) timestamp, because it represents the initial retrieval of streamed data. For subsequent iterations, however, we’ll want the most recent timestamp from the previous snapshot, which means ordering by published_at in descending order instead:

order by
    published_at desc
limit
    1
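
Putting the pieces together, the initial version of get_last_timestamp.sql presumably looks something like this sketch (the exact code is in the GitHub repo):

{{ config(materialized='table') }}

-- Initial version: take the earliest published_at from the streamed data.
-- Later in the guide we switch to descending order and to the snapshot table.
select
    published_at
from
    {{ source('staging', 'company_house_stream') }}
order by
    published_at
limit
    1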

Once you have created your model, click on Save at the top right:

linearly separable data

At the bottom, if you click on Compile:

linearly separable data

You can see what the code looks like in actual SQL:

linearly separable data

If you click on Preview, you can see what the actual result would be:

linearly separable data

If you click on Build, it would build your model, which means that a table named get_last_timestamp will be created:

linearly separable data

If you click at the bottom left corner:

linearly separable data

You can see the details that went into building your model:

linearly separable data

Now, in BigQuery you should see a new dataset called dbt_ followed by your username, containing the table get_last_timestamp:

linearly separable data

The dataset has this name because we’re still in a development environment. Later, when we push our dbt models to production, a new dataset called prod will be created.

While developing, our models are built in the dbt_arbenkqiku dataset, and we’ll be integrating the newly streamed data into the complete UK’s Companies House data. We therefore need a copy of the complete UK’s Companies House data in our development dataset as well. Create a new dbt model called company_house_core.sql and paste the following code:

linearly separable data
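
Since the code is shown as an image, here is a sketch of what this replication model plausibly looks like (check the repo for the exact code):

{{ config(materialized='table') }}

-- Replicate the full Companies House table into the development dataset.
select
    *
from
    {{ source('staging', 'company_house_core') }}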

Save the model and click on Build. In BigQuery, next to the development dataset, click on the three dots and then click on Refresh contents:

linearly separable data

You should see a replica of the complete UK’s Companies House data:

linearly separable data

Now that we have a new dataset with two tables, we can add them to our configuration file schema.yml. We can call these sources development:

version: 2

sources:
  - name: staging
    database: streaming-pipeline-418713 # BigQuery project id
    schema: company_house # BigQuery data set
    tables:
      - name: company_house_core
      - name: company_house_stream
  - name: development
    database: streaming-pipeline-418713 # BigQuery project id
    schema: dbt_arbenkqiku # BigQuery data set
    tables:
      - name: company_house_core
      - name: get_last_timestamp

Now, create a new dbt model named snapshot_streamed_data.sql and paste the following code:

linearly separable data

In this code, by using the timestamp that we retrieved earlier, we only get the data that is newer than the last timestamp. This is reflected in the condition stream.published_at > last_timestamp.published_at. This ensures that we create a snapshot of the most recent data. Also, here I purposely modify the company status with concat("test_", current_timestamp()) as company_status. This is because later we will merge the snapshot of the streamed data with the main table, and I want to ensure that the data in the main table is properly updated. Of course, later we’ll use the actual company_status. Save your dbt model and build it.
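
For reference, here is a rough sketch of how such a snapshot model could be written. The column list, the cross join against the single-row timestamp table, the month_of_creation derivation, and the source names are assumptions on my part; the exact code is in the GitHub repo:

{{ config(materialized='table') }}

with last_timestamp as (
    -- Single-row table produced by the get_last_timestamp model.
    select published_at
    from {{ source('development', 'get_last_timestamp') }}
)

select
    stream.company_name,
    stream.company_number,
    -- Placeholder value used only to verify that the merge updates rows;
    -- replaced with the real company_status at the end of the guide.
    concat("test_", cast(current_timestamp() as string)) as company_status,
    stream.date_of_creation,
    stream.postal_code,
    -- Assumption: month_of_creation is derived from date_of_creation.
    date_trunc(cast(stream.date_of_creation as date), month) as month_of_creation,
    stream.published_at
from
    {{ source('staging', 'company_house_stream') }} as stream
cross join
    last_timestamp
where
    stream.published_at > last_timestamp.published_at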

If everything worked correctly, you should see a new table named snapshot_streamed_data:

linearly separable data

Also, let’s add this new table to our configuration file by adding this line:

- name: snapshot_streamed_data

Now, let’s integrate the streamed data into the complete UK’s Companies House data. To do so, we need to replace the content of the dbt model called company_house_core.sql with the following code, because we want to update the BigQuery table called company_house_core:

linearly separable data

The following code snippet implements an incremental materialization strategy, which means updating the table instead of recreating it. Specifically, it updates rows based on a unique identifier called company_number. The model compares existing data with incoming data for each company_number and updates rows if there are any differences.

linearly separable data

Here we create a common table expression (CTE) named ranked_data. We do this because we want to be able to filter out duplicate rows. Specifically, in the line row_number() over (partition by company_number order by published_at desc) as row_num, the window function partitions the data by company_number, orders it by published_at in descending order, and assigns a row number within each partition.

linearly separable data

Finally, with the condition row_num = 1, we only select the most recent instance for each company_number.

select
    company_name,
    company_number,
    company_status,
    date_of_creation,
    postal_code,
    month_of_creation,
from
    ranked_data
where
    row_num = 1
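
Combining the pieces above, here is a sketch of one way such an incremental model can be written. The config block, the source name, and the assumption that the snapshot already carries month_of_creation are mine; the exact code is in the GitHub repo:

{{ config(
    materialized='incremental',
    unique_key='company_number'
) }}

-- Deduplicate the snapshot so only the latest record per company survives;
-- dbt then merges these rows into the existing table on company_number.
with ranked_data as (
    select
        company_name,
        company_number,
        company_status,
        date_of_creation,
        postal_code,
        month_of_creation,
        row_number() over (
            partition by company_number
            order by published_at desc
        ) as row_num
    from {{ source('development', 'snapshot_streamed_data') }}
)

select
    company_name,
    company_number,
    company_status,
    date_of_creation,
    postal_code,
    month_of_creation
from
    ranked_data
where
    row_num = 1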

Now, we need to verify that the data has been updated correctly in the company_house_core table. If we run this query in BigQuery, where we’re essentially extracting the rows from the company_house_core table that match a company_number contained in the table snapshot_streamed_data:

select
  *
from
  `streaming-pipeline-418713.dbt_arbenkqiku.company_house_core`
where
  company_number in 
    (
    select
      distinct company_number
    from
      `streaming-pipeline-418713.dbt_arbenkqiku.snapshot_streamed_data`
    )

We can see that the column company_status matches the artificially created value:

linearly separable data

Test the Entire Pipeline in a Development Environment

First of all, in the dbt model get_last_timestamp.sql, replace order by published_at with order by published_at desc, as we now want the last (most recent) timestamp. Also, replace the source, as we want to get the last timestamp from the most recent snapshot rather than from the raw stream. This is what the final code looks like:

linearly separable data
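
As before, the code in the screenshot should be roughly equivalent to this sketch (the source name is an assumption; see the repo for the exact code):

{{ config(materialized='table') }}

-- Final version: take the most recent published_at from the latest snapshot.
select
    published_at
from
    {{ source('development', 'snapshot_streamed_data') }}
order by
    published_at desc
limit
    1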

Save the dbt model and click on Build. Now, if you check the timestamp in the table get_last_timestamp, you should have the most recent timestamp available from the table snapshot_streamed_data:

linearly separable data

Now, go to the terminal and start the Docker container that produces data to the Kafka topic. When you see production data in the Confluent console, start the Mage streaming pipeline as well. After a couple of minutes, stop these two processes, as we have generated enough new data to test things in our dbt development environment.

In dbt, build (run) the model snapshot_streamed_data.sql. If you then go to BigQuery, you should see that the table snapshot_streamed_data only contains the most recent streamed data.

Finally, we can build the model company_house_core.sql. If we query the company_house_core table for the company_number values contained in the snapshot, as we did before, we can see that the data has been updated successfully:

linearly separable data

Right now, we are on the development branch of our GitHub repo. To use our dbt models in production, we need to make a pull request to our main branch, and then merge the dev branch with the main branch, as we did at the beginning of the dbt section:

linearly separable data

Test the Pipeline in Production

In dbt production, a new dataset named prod will be created. However, the entire UK’s Companies House data does not exist in this dataset. So, in the dbt model company_house_core.sql replace the content with this code:

linearly separable data

Save the model and push it to production. Now, from the dbt menu, click on Deploy and then on Jobs:

linearly separable data

Click on Create job and then on Deploy job:

linearly separable data

Give a name to your job, add the command dbt build --select company_house_core, which essentially replicates the entire UK’s Companies House data, and click on Save at the top right:

linearly separable data

Then, go back to that job and click on Run now at the top right:

linearly separable data

Below, in the run history, you can see the current run. Click on it:

linearly separable data

If everything worked correctly, all the steps should be green:

linearly separable data

Also, in BigQuery you should see a new dataset named prod with the company_house_core table:

linearly separable data

Now, given that our production dataset is called prod, in our configuration file we need to replace the dbt_arbenkqiku dataset with prod. Also, I renamed the data source from development to production:

version: 2

sources:
  - name: staging
    database: streaming-pipeline-418713 # BigQuery project id
    schema: company_house # BigQuery data set
    tables:
      - name: company_house_core
      - name: company_house_stream
  - name: production
    database: streaming-pipeline-418713 # BigQuery project id
    schema: prod # BigQuery data set
    tables:
      - name: company_house_core
      - name: get_last_timestamp
      - name: snapshot_streamed_data

The development source should be replaced with production in all the models. Also, in the model company_house_core.sql we can replace the code that replicates the entire UK’s Companies House data with the code that merges the most recent streamed data:

linearly separable data

As before, commit and sync your changes, create a pull request on GitHub and merge the branches. Remember, if your models are not in the main branch they won’t run in production.

Now, go back to Jobs, and create a job for each model:

  • snapshot_streamed_data
  • company_house_core_prod. This was already created.
  • get_last_timestamp

Here is what the job Snapshot streamed data looks like:

linearly separable data

Here is what the job Get last timestamp looks like:

linearly separable data

Now, as before, go to the terminal and start the Docker container that produces data to the Kafka topic. When you see production data in the Confluent console, start the Mage streaming pipeline as well. After a couple of minutes, stop these two processes, as we have generated enough new data to test things in our dbt production environment.

Now, let’s run our jobs in the following order:

  • Snapshot streamed data
  • Core Prod
  • Get last timestamp

Select the job Snapshot streamed data. Click on Run now:

linearly separable data

There should be a new snapshot of data:

linearly separable data

Now, select the job Core Prod. Click on Run now:

linearly separable data

We can see that the data was successfully integrated into the entire UK’s Companies House data:

linearly separable data

If we retrieve the most recent timestamp from the snapshot_streamed_data table, we get this:

linearly separable data

Now, let’s run the Get last timestamp job and see if we get the same result:

linearly separable data

Aaaaaaand it matches:

linearly separable data

Schedule and Run the Entire Pipeline

Previously, we ran our jobs in the following order:

  • Snapshot streamed data
  • Core Prod
  • Get last timestamp

Now, we need to schedule them. The job Snapshot streamed data needs to run separately from the other jobs, because the snapshot data must remain static while the other two jobs read from it.

Go to the Snapshot streamed data job and click on Settings:

linearly separable data

Click on Edit:

linearly separable data

Scroll down, toggle Run on schedule, select Cron schedule, and enter the following custom cron schedule:

0,20,40 * * * *

linearly separable data

This means that this job will always run at minute 0, 20, and 40 of every hour, every day. Save the job.

Create a new job named Core Prod & Get last timestamp with the following commands:

dbt build --select company_house_core
dbt build --select get_last_timestamp

linearly separable data

Add the following cron schedule:

10,30,50 * * * *

linearly separable data

Save the job. On dbt, the maximum frequency for a job is every 10 minutes. Therefore, to test whether everything runs smoothly, we have to be patient.

Now, as before, go to the terminal and start the Docker container that produces data to the Kafka topic. When you see production data in the Confluent console, go to Mage and click on Triggers:

linearly separable data

Click on New trigger:

linearly separable data

Give a name to the trigger, click on Schedule and then on Save:

linearly separable data

Now click on Enable trigger:

linearly separable data

Our pipeline is now running; it consumes data from Kafka and sends it to BigQuery:

linearly separable data

On Confluent, we can see that we’re both producing and consuming 64 bytes per second:

linearly separable data

So, I let the entire pipeline run for a couple of hours and no issues were detected. The newly streamed data was successfully integrated into the entire UK’s Companies House data:

linearly separable data

If you recall, in the model snapshot_streamed_data.sql, the column company_status was defined as concat("test_", current_timestamp()) as company_status. This was done for testing purposes. Now that we know that everything runs smoothly, we can replace it with simply company_status.