Real-time Data Collection and Processing using AI/ML on OpenShift AI

In this learning exercise, we'll explore how to set up a system for processing live image streams using Apache Kafka, OpenShift AI, and PostgreSQL. Combining these components, we develop a computer vision system for pet inventory management in retail settings. We simulate a pet shop environment with camera data and train a model that can detect and track animals within the store. This supports real-time inventory monitoring and could improve animal welfare by reducing the risk of misplaced pets.


Prerequisites:


Lesson 1:

1. Launch a Jupyter notebook with the TensorFlow server image

This lesson delves into image prediction within the OpenShift AI platform. We'll explore how to train a model using a dataset of cat and dog images to achieve this task. To get started, we'll establish a new Data Science project within OpenShift AI that leverages a pre-configured TensorFlow image. This image provides a ready-to-use environment for building and training your machine learning models.

Creating a New Workbench in OpenShift AI

  • Log in and navigate to the OpenShift AI platform to access the OpenShift AI Dashboard.
  • Locate the "Data Science Projects" option in the left-hand menu and click on it.

Choose a Project:

  • Click on the pre-created Data Science project associated with your username.

Create a Workbench:

  • Within the chosen project, navigate to the "Workbenches" tab.
  • Click on "Create Workbench" to initiate the workbench creation process.

Configure the Workbench:

  • Provide a descriptive name for your workbench.
  • Under "Notebook image," select "TensorFlow" and keep the version set to "Recommended".
  • In the "Deployment size" section, choose "Medium" for the container size.
  • Change the "Cluster storage" setting to 10Gi.

Finalize Creation

  • Click on "Create Workbench" to initiate the creation process and launch your new workbench environment, as shown in Figure 1 below.
The review page displays both the workbench and cluster storage.
Figure 1: Workbench is in Running state.

Verify Workbench Status:

  • Once you click "Create Workbench", monitor the workbench status. It should eventually transition to "Running", indicating successful creation.

Access Jupyter Notebook:

  • Locate the "Open" button displayed next to your newly created workbench and click on it.
  • You might encounter a permission prompt. Select the option "Allow selected permissions" to grant the necessary access.

Your Jupyter Notebook environment will come pre-configured with TensorFlow and its dependencies (Figure 2).

The Explore page of Jupyter Notebook provides a plethora of options to begin with.
Figure 2: Jupyter notebook landing page.

The Jupyter notebook offers the ability to fetch or clone existing GitHub repositories, just like any standard IDE. Follow these instructions to clone an existing simple ML/AI project into the notebook.

  • At the top, click on the "Git Clone" icon.
  • In the popup window, enter the following GitHub URL: https://github.com/redhat-developer-demos/openshift-ai.git
  • Click the "Clone" button as shown below.
  • Navigate to the "openshift-ai/2_Cat-dog-prediction" directory.

2. Access to datasets

The image dataset comes from Kaggle, a large online data science community that hosts public datasets. To access Kaggle, sign in or create an account at https://www.kaggle.com.

Obtaining Your Kaggle API Token

To interact with the Kaggle API from your Jupyter Notebook environment, you'll need a personal API token. Here's how to obtain it:

  • Access Your Kaggle Account Settings: Navigate to your Kaggle account settings. You can typically access these settings by clicking on your profile icon in the top right corner and selecting "Account" or a similar option.
  • Generate Your Token: Within the API section, find a button labeled "Create New Token". Clicking this button will initiate the token creation process.
  • Download the kaggle.json File: Upon creating the token, your web browser should automatically download a file named "kaggle.json". This file contains your unique API credentials and is essential for authorizing your Jupyter Notebook to access Kaggle datasets.
To obtain the token for your account, please visit the settings page.
Figure 3: Kaggle settings page.
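
The token file holds a "username" and a "key" field, and the Kaggle CLI warns when the file is readable by other users. The following is a minimal sketch, not part of the original project, of a helper that checks those fields and stores the file with owner-only permissions. The function name install_kaggle_token is hypothetical.

```python
import json
import os
import stat
from pathlib import Path


def install_kaggle_token(source: Path, target_dir: Path) -> Path:
    """Validate a kaggle.json file and copy it into target_dir with mode 0600."""
    credentials = json.loads(source.read_text())
    missing = {"username", "key"} - credentials.keys()
    if missing:
        raise ValueError(f"kaggle.json is missing fields: {sorted(missing)}")

    target_dir.mkdir(parents=True, exist_ok=True)
    target = target_dir / "kaggle.json"
    target.write_text(json.dumps(credentials))
    # Owner read/write only, so other users on the system cannot read the key.
    os.chmod(target, stat.S_IRUSR | stat.S_IWUSR)
    return target
```

In a notebook, a call such as install_kaggle_token(Path("kaggle.json"), Path.home() / ".kaggle") would place the uploaded file where the Kaggle CLI looks for it.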

3. Image prediction Python code

This section delves into image prediction using TensorFlow and leverages a dataset from the Kaggle platform. We'll train a model to classify cat and dog images, assigning "0" to cat images and "1" to dog images. This serves as a foundational example of supervised machine learning and showcases the capabilities of OpenShift AI.

Select the "Notebook Python 3.9" option from the landing page of Jupyter Notebook.

Upload the “kaggle.json” file using the "Upload Files" option in the Notebook.

Run the following code in your Jupyter notebook, one cell at a time.

!pip install kaggle
!mkdir -p ~/.kaggle
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json
!kaggle datasets download -d salader/dogs-vs-cats

Dataset URL: https://www.kaggle.com/datasets/salader/dogs-vs-cats

License(s): unknown

Downloading dogs-vs-cats.zip to /opt/app-root/src

100%|██████████████████████████████████████| 1.06G/1.06G [00:15<00:00, 64.6MB/s]

100%|██████████████████████████████████████| 1.06G/1.06G [00:15<00:00, 73.1MB/s]

import zipfile
# Extract the downloaded archive; the context manager closes the file automatically
with zipfile.ZipFile('dogs-vs-cats.zip', 'r') as zip_ref:
    zip_ref.extractall('/opt/app-root/src/cat-doc-predict')

import tensorflow as tf
from tensorflow import keras
from keras import Sequential
from keras.layers import Dense,Conv2D,MaxPooling2D,Flatten,BatchNormalization,Dropout
# generators
train_ds = keras.utils.image_dataset_from_directory(
   directory='cat-doc-predict/train',
   labels='inferred',
   label_mode='int',
   batch_size=32,
   image_size=(256, 256)
)

validation_ds = keras.utils.image_dataset_from_directory(
   directory='cat-doc-predict/test',
   labels='inferred',
   label_mode='int',
   batch_size=32,
   image_size=(256, 256)
)

Found 20000 files belonging to 2 classes.

Found 5000 files belonging to 2 classes.
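
With labels='inferred', Keras assigns integer labels by sorting the class subdirectory names alphanumerically, which is why cats map to 0 and dogs to 1. The sketch below mimics that rule with the standard library only and does not call Keras. The folder names "cats" and "dogs" are an assumption about the extracted dataset layout, and inferred_class_indices is a hypothetical helper.

```python
import os
import tempfile


def inferred_class_indices(directory: str) -> dict:
    """Map each class subdirectory to an integer index in sorted order."""
    class_names = sorted(
        name for name in os.listdir(directory)
        if os.path.isdir(os.path.join(directory, name))
    )
    return {name: index for index, name in enumerate(class_names)}


# Build a throwaway layout that mirrors train/cats and train/dogs.
with tempfile.TemporaryDirectory() as root:
    for name in ("dogs", "cats"):
        os.makedirs(os.path.join(root, name))
    class_indices = inferred_class_indices(root)

print(class_indices)  # {'cats': 0, 'dogs': 1}
```

On the real dataset, the class_names attribute of the object returned by image_dataset_from_directory should list the same order, provided you read it before the dataset is mapped.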

# Normalize
def process(image, label):
    image = tf.cast(image / 255.0, tf.float32)
    return image, label


train_ds = train_ds.map(process)
validation_ds = validation_ds.map(process)

# Create CNN model
model = Sequential()

model.add(Conv2D(32, kernel_size=(3, 3), padding='valid', activation='relu'
                 , input_shape=(256, 256, 3)))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2), strides=2, padding='valid'))

model.add(Conv2D(64, kernel_size=(3, 3), padding='valid', activation='relu'))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2), strides=2, padding='valid'))

model.add(Conv2D(128, kernel_size=(3, 3), padding='valid', activation='relu'))
model.add(BatchNormalization())
model.add(MaxPooling2D(pool_size=(2, 2), strides=2, padding='valid'))

model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(64, activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(1, activation='sigmoid'))
model.summary()
Expected output:
Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
=================================================================
 conv2d (Conv2D)             (None, 254, 254, 32)      896       
                                                                 
 batch_normalization (Batch  (None, 254, 254, 32)      128       
 Normalization)                                                  
                                                                 
 max_pooling2d (MaxPooling2  (None, 127, 127, 32)      0         
 D)                                                              
                                                                 
 conv2d_1 (Conv2D)           (None, 125, 125, 64)      18496     
                                                                 
 batch_normalization_1 (Bat  (None, 125, 125, 64)      256       
 chNormalization)                                                
                                                                 
 max_pooling2d_1 (MaxPoolin  (None, 62, 62, 64)        0         
 g2D)                                                            
                                                                 
 conv2d_2 (Conv2D)           (None, 60, 60, 128)       73856     
                                                                 
 batch_normalization_2 (Bat  (None, 60, 60, 128)       512       
 chNormalization)                                                
                                                                 
 max_pooling2d_2 (MaxPoolin  (None, 30, 30, 128)       0         
 g2D)                                                            
                                                                 
 flatten (Flatten)           (None, 115200)            0         
                                                                 
 dense (Dense)               (None, 128)               14745728  
                                                                 
 dropout (Dropout)           (None, 128)               0         
                                                                 
 dense_1 (Dense)             (None, 64)                8256      
                                                                 
 dropout_1 (Dropout)         (None, 64)                0         
                                                                 
 dense_2 (Dense)             (None, 1)                 65        
=================================================================
Total params: 14848193 (56.64 MB)
Trainable params: 14847745 (56.64 MB)
Non-trainable params: 448 (1.75 KB)
_________________________________________________________________
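
The shapes and parameter counts in the summary can be checked by hand. The sketch below reproduces the arithmetic for this architecture (3x3 'valid' convolutions with stride 1, 2x2 pooling with stride 2, batch normalization with two trainable and two non-trainable values per channel). The helper names are illustrative and are not Keras functions.

```python
def conv_valid(size: int, kernel: int = 3) -> int:
    """Output width/height of a stride-1 convolution with 'valid' padding."""
    return size - kernel + 1


def pool_valid(size: int, pool: int = 2, stride: int = 2) -> int:
    """Output width/height of max pooling with 'valid' padding."""
    return (size - pool) // stride + 1


def conv_params(kernel: int, in_channels: int, out_channels: int) -> int:
    """Weights plus one bias per output channel."""
    return kernel * kernel * in_channels * out_channels + out_channels


def dense_params(inputs: int, units: int) -> int:
    """Weights plus one bias per unit."""
    return inputs * units + units


size, channels = 256, 3
trainable, non_trainable = 0, 0

for filters in (32, 64, 128):
    size = conv_valid(size)
    trainable += conv_params(3, channels, filters)
    trainable += 2 * filters       # batch norm gamma and beta
    non_trainable += 2 * filters   # batch norm moving mean and variance
    size = pool_valid(size)
    channels = filters

flattened = size * size * channels
inputs = flattened
for units in (128, 64, 1):
    trainable += dense_params(inputs, units)
    inputs = units

print(flattened)                   # 115200
print(trainable)                   # 14847745
print(non_trainable)               # 448
print(trainable + non_trainable)   # 14848193
```

Almost all of the parameters sit in the first Dense layer (115200 x 128 weights plus 128 biases), which is what drives the model's size.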

Note: Based on your CPU/GPU performance, epoch training time will differ.

model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy']) 
history = model.fit(train_ds,epochs=10,validation_data=validation_ds)
Expected output:
Epoch 1/10
625/625 [==============================] - 921s 1s/step - loss: 0.0904 - accuracy: 0.9689 - val_loss: 0.8111 - val_accuracy: 0.8330
Epoch 2/10
625/625 [==============================] - 948s 2s/step - loss: 0.0630 - accuracy: 0.9764 - val_loss: 0.6527 - val_accuracy: 0.8182
Epoch 3/10
625/625 [==============================] - 925s 1s/step - loss: 0.0626 - accuracy: 0.9793 - val_loss: 0.6581 - val_accuracy: 0.8398
Epoch 4/10
625/625 [==============================] - 945s 2s/step - loss: 0.0574 - accuracy: 0.9790 - val_loss: 0.7771 - val_accuracy: 0.8210
Epoch 5/10
625/625 [==============================] - 943s 2s/step - loss: 0.0489 - accuracy: 0.9829 - val_loss: 0.6356 - val_accuracy: 0.8346
Epoch 6/10
625/625 [==============================] - 946s 2s/step - loss: 0.0481 - accuracy: 0.9820 - val_loss: 0.6537 - val_accuracy: 0.8294
Epoch 7/10
625/625 [==============================] - 943s 2s/step - loss: 0.0507 - accuracy: 0.9819 - val_loss: 0.7618 - val_accuracy: 0.8138
Epoch 8/10
625/625 [==============================] - 924s 1s/step - loss: 0.0511 - accuracy: 0.9818 - val_loss: 1.0860 - val_accuracy: 0.8326
Epoch 9/10
625/625 [==============================] - 918s 1s/step - loss: 0.0435 - accuracy: 0.9847 - val_loss: 1.7417 - val_accuracy: 0.7792
Epoch 10/10
625/625 [==============================] - 932s 1s/step - loss: 0.0340 - accuracy: 0.9869 - val_loss: 0.8242 - val_accuracy: 0.8322
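!pip install opencv-python
import cv2
import matplotlib.pyplot as plt
test_img = cv2.imread('openshift-ai/2_Cat-dog-prediction/cat.jpg')
# OpenCV loads images as BGR; convert to RGB so matplotlib shows true colors
plt.imshow(cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB))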

<matplotlib.image.AxesImage at 0x7f025c05dee0>

test_img = cv2.resize(test_img,(256,256))
test_input = test_img.reshape((1,256,256,3))
model.predict(test_input)

1/1 [==============================] - 0s 37ms/step

array([[0.]], dtype=float32)
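
Note that the training pipeline scales pixel values to the range 0 to 1 and Keras loads images in RGB order, while the cell above passes raw 0 to 255 values in OpenCV's BGR order. If you want inference inputs to match the training preprocessing, one option is a helper like the sketch below. It is an assumption of this write-up, not part of the original notebook, and prepare_for_model is a hypothetical name. It expects an image that has already been resized to 256x256.

```python
import numpy as np


def prepare_for_model(bgr_image: np.ndarray) -> np.ndarray:
    """Turn a 256x256 BGR uint8 image into a (1, 256, 256, 3) float32 batch in [0, 1]."""
    if bgr_image.shape != (256, 256, 3):
        raise ValueError(f"expected shape (256, 256, 3), got {bgr_image.shape}")
    rgb_image = bgr_image[:, :, ::-1]               # reverse channel order: BGR -> RGB
    scaled = rgb_image.astype(np.float32) / 255.0   # same scaling as process() in training
    return scaled[np.newaxis, ...]                  # add the batch dimension


# Dummy image that is pure blue in BGR order, standing in for a resized photo.
dummy = np.zeros((256, 256, 3), dtype=np.uint8)
dummy[..., 0] = 255
batch = prepare_for_model(dummy)
print(batch.shape, batch.dtype)  # (1, 256, 256, 3) float32
```

With the tutorial's variables, the call would look like model.predict(prepare_for_model(cv2.resize(test_img, (256, 256)))). The scores you get this way may differ from the outputs shown in this lesson.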

Lesson 2:

To save time on model training, you can use the pre-trained model provided in the GitHub repo. Execute the following code snippet in your Jupyter notebook to predict using the pre-trained model.

# Fetch the archived model
! cd /opt/app-root/src/openshift-ai/2_Cat-dog-prediction/keras_model && cat my_dog_cat_model.zip.001  my_dog_cat_model.zip.002  my_dog_cat_model.zip.003  my_dog_cat_model.zip.004  my_dog_cat_model.zip.005  my_dog_cat_model.zip.006  my_dog_cat_model.zip.007  > my_dog_cat_model.zip && unzip my_dog_cat_model.zip && mv my_model.keras my_dog_cat_model.keras
# Use the pre-trained model

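! pip install opencv-python
import cv2
from tensorflow import keras
from matplotlib import pyplot as plt

# Absolute paths, so the cell works regardless of the notebook's working directory
base_dir = "/opt/app-root/src/openshift-ai/2_Cat-dog-prediction"

test_img = cv2.imread(f"{base_dir}/cat.jpg")
model = keras.models.load_model(f"{base_dir}/keras_model/my_dog_cat_model.keras")

# OpenCV loads images as BGR; convert to RGB so matplotlib shows true colors
plt.imshow(cv2.cvtColor(test_img, cv2.COLOR_BGR2RGB))

print(test_img.shape)

# Resize to the model's input size and add the batch dimension
test_img = cv2.resize(test_img,(256,256))
test_input = test_img.reshape((1,256,256,3))
model.predict(test_input)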

1/1 [==============================] - 0s 189ms/step

Out[4]:
array([[0.]], dtype=float32)
 


The model returns its prediction as an array of shape (1, 1). Because the final layer uses a sigmoid activation, the value lies between 0 and 1: a value close to 0 indicates that the image is classified as a cat, and a value close to 1 indicates a dog.
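
To turn the score into a readable label, you can apply a threshold. The sketch below assumes a cutoff of 0.5, which is a common default for a sigmoid output but is not stated in the original exercise. The name label_from_score is hypothetical.

```python
def label_from_score(score: float, threshold: float = 0.5) -> str:
    """Map a sigmoid output in [0, 1] to 'cat' (label 0) or 'dog' (label 1)."""
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be within [0, 1], got {score}")
    return "dog" if score >= threshold else "cat"


print(label_from_score(0.0))   # cat
print(label_from_score(0.93))  # dog
```

For the prediction array above, the call would be label_from_score(float(prediction[0][0])).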

4. Kafka setup

Apache Kafka is an open-source distributed streaming platform renowned for its application in stream processing, facilitating real-time data pipelines, and supporting large-scale data integration.

Red Hat OpenShift provides Kafka-related operators, enabling us to utilize 'AMQ Streams' for configuring Kafka operations. Here's a guide that walks you through the installation process of Kafka (AMQ Streams): Getting Started with AMQ Streams on OpenShift.

Alternatively, you can set up a containerized environment with Kafka and Zookeeper using Podman.

5. Kafka Producer

Simulate live image streaming with Apache Kafka

This section explores how to simulate a live image stream using Apache Kafka. In this scenario, the producer acts as a source, continuously sending images captured from simulated events. We'll use the kafka-python package to achieve this functionality.

Create a New Jupyter Notebook

Initiate a new notebook within your OpenShift AI environment. Click on the "+" icon typically located beside Notebook, or from "Files", select "New file".

Replace "Kafka_server_endpoint:9092" with your Kafka endpoint and "my-topic" with your Kafka topic name in the code below.

! pip install kafka-python opencv-python
import cv2
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['Kafka_server_endpoint:9092'], api_version=(0,10,1))
image = cv2.imread("/opt/app-root/src/openshift-ai/2_Cat-dog-prediction/cat.jpg")
# Encode the image as JPEG bytes, the payload the consumer will decode
ret, buffer = cv2.imencode('.jpg', image)
producer.send("my-topic", buffer.tobytes())
# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()
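
The cell above sends a single image. To approximate a continuous camera feed, one option is to loop over several image files with a pause between sends. The sketch below is illustrative only: stream_images is a hypothetical helper, and the send function is passed in as a parameter so that the loop can be tried without a Kafka broker.

```python
import time
from pathlib import Path
from typing import Callable, Iterable


def stream_images(
    paths: Iterable[Path],
    send: Callable[[bytes], object],
    delay_seconds: float = 1.0,
) -> int:
    """Send the raw bytes of each image file, pausing between sends."""
    sent = 0
    for path in paths:
        send(Path(path).read_bytes())   # JPEG files are already encoded bytes
        sent += 1
        time.sleep(delay_seconds)       # crude stand-in for a camera frame interval
    return sent
```

With the producer from this section, a call could look like stream_images(sorted(Path("frames").glob("*.jpg")), lambda payload: producer.send("my-topic", payload)), followed by producer.flush(). The "frames" directory is a placeholder for wherever your images live.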

6. Kafka Consumer

The Kafka consumer works by issuing fetch requests to the brokers that lead the partitions it wants to consume. Each request includes the consumer's offset in the log, which marks where retrieval should start. The broker responds with a chunk of log data beginning at that offset. Because the consumer controls its own offset, it can resume where it left off, or rewind and re-read earlier messages when needed.
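
The offset mechanism can be illustrated with a toy model in which a partition is a plain Python list and the offset is an index into it. This is a conceptual sketch only, not the Kafka wire protocol or the kafka-python API, and the fetch function is hypothetical.

```python
from typing import List, Tuple


def fetch(log: List[bytes], offset: int, max_records: int) -> Tuple[List[bytes], int]:
    """Return up to max_records starting at offset, plus the next offset to request."""
    records = log[offset:offset + max_records]
    return records, offset + len(records)


partition_log = [b"frame-0", b"frame-1", b"frame-2", b"frame-3", b"frame-4"]

first_batch, next_offset = fetch(partition_log, offset=0, max_records=2)
second_batch, next_offset = fetch(partition_log, offset=next_offset, max_records=2)

print(first_batch)   # [b'frame-0', b'frame-1']
print(second_batch)  # [b'frame-2', b'frame-3']
print(next_offset)   # 4
```

The consumer, not the broker, decides which offset to ask for next, so requesting an earlier offset simply returns the same records again.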

Replace "Kafka_server_endpoint:9092" with your Kafka endpoint and "my-topic" with your Kafka topic name in the code below.

The following code will retrieve the images of animals from the "my-topic" topic in Kafka for our specific scenario:

from PIL import Image
from io import BytesIO
from kafka import KafkaConsumer
consumer = KafkaConsumer("my-topic",bootstrap_servers=['Kafka_server_endpoint:9092'],
                       api_version=(0,10,1))
for message in consumer:
   stream = BytesIO(message.value)
   image = Image.open(stream).convert("RGBA")
   print(image)
   stream.close()
   image.show()

The Kafka producer and consumer facilitate the transmission and reception of data between endpoints, enabling the emulation of live streaming data activity.
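
Because a topic can carry any bytes, a consumer may want a cheap check before trying to decode a message as an image. The sketch below tests for the JPEG start-of-image and end-of-image markers. It is a heuristic, not full validation, and looks_like_jpeg is a hypothetical helper that is not part of the original code.

```python
JPEG_START = b"\xff\xd8"  # start-of-image marker
JPEG_END = b"\xff\xd9"    # end-of-image marker


def looks_like_jpeg(payload: bytes) -> bool:
    """Heuristic check: payload begins and ends with the JPEG markers."""
    return (
        len(payload) >= 4
        and payload.startswith(JPEG_START)
        and payload.endswith(JPEG_END)
    )


print(looks_like_jpeg(b"\xff\xd8" + b"\x00" * 10 + b"\xff\xd9"))  # True
print(looks_like_jpeg(b"not an image"))                           # False
```

Inside the consumer loop, messages that fail the check could be skipped before they reach Image.open.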

7. PostgreSQL database setup

This step explores using PostgreSQL as a relational database management system within your OpenShift environment. 

  • Log into the OpenShift cluster from the developer perspective.

The Web Terminal enables interaction with the OpenShift cluster via the CLI, simplifying the deployment of the PostgreSQL database pod in the cluster. To get started, open the web terminal from the OpenShift Developer Sandbox.

  • In the top right corner, you will find the Web Terminal icon.
  • Click on it, as illustrated in Figure 4.
Figure 4: Launch Web Terminal.
  • The command line terminal will appear at the bottom of the browser window.

To deploy PostgreSQL in the cluster, copy the following command and paste it into the Web Terminal.

$ oc new-app --image=postgres --name=postgres-db -e POSTGRES_USER=admin -e POSTGRES_PASSWORD=redhat -e POSTGRES_DB=animal -e PGDATA=/var/lib/postgresql/data/pgdata1

After successfully deploying PostgreSQL, you will see the running pod in the Topology view of the Developer perspective, as shown in Figure 5.

Figure 5: Running PostgreSQL pod in Topology view.

To store the image predictions in our project, we'll create a table named "animal" within the PostgreSQL database. This table includes a column named "prediction" that stores the model's numeric output as a float (a value near 0 for a cat, near 1 for a dog).

Here's how to access the PostgreSQL console for interacting with the database.

  • Click on the running pod.
  • Select the logs.
  • Choose the "Terminal" option from the tabs, then run the following command to connect to the database:
$ psql -U admin -d animal

To create the table and its columns, use the following query:

CREATE TABLE animal (
    id SERIAL PRIMARY KEY,
    prediction FLOAT
);

Check the created table using the following query:

SELECT * FROM animal;

After executing the query, you will see the results on your pod terminal, as shown in Figure 6, below.

Figure 6: Terminal access of PostgreSQL.

8. Consumer integration with database code

A consumer alone isn't sufficient for real-time data processing. We also need to integrate prediction and database insertion.

Replace the consumer code used in the previous step with the following code. Replace "Kafka_server_endpoint:9092" with your Kafka endpoint and "my-topic" with your Kafka topic name in the code below.

Replace "<postgresql-server>" with your PostgreSQL endpoint.

The following code will wait for incoming data on the consumer side and subsequently execute image processing (image prediction) and database entry procedures:

! pip install kafka-python opencv-python psycopg2-binary
import numpy as np
from kafka import KafkaConsumer
import cv2
import psycopg2
consumer = KafkaConsumer("my-topic",bootstrap_servers=['Kafka_server_endpoint:9092'],
                       api_version=(0,10,1))
# Function to process the image
def process_image(image):
   # Convert bytes to numpy array
   nparr = np.frombuffer(image, np.uint8)
   # Decode image
   img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)
   # Reshape and resize
   img = cv2.resize(img, (256, 256))
   img = img.reshape((1,256,256,3))
   return img
def save_db(prediction):
   # Connect to PostgreSQL database
   conn = psycopg2.connect(
       dbname="animal",
       user="admin",
       password="redhat",
       host="<postgresql-server>",
       port="5432"
   )
   # Create a cursor
   cur = conn.cursor()
   try:
       # Execute the INSERT statement
       cur.execute("INSERT INTO animal (prediction) VALUES (%s)", (prediction,))
       # Commit the transaction
       conn.commit()
       print("Prediction saved to the database successfully!")
   except Exception as e:
       # Rollback the transaction if any error occurs
       conn.rollback()
       print("Error while saving prediction to the database:", e)
   finally:
       # Close the cursor and connection
       cur.close()
       conn.close()
for message in consumer:
   processed_image = process_image(message.value)
   prediction = model.predict(processed_image)
   save_db(float(prediction[0][0]))

On the consumer side, we handle and store the data in the database.
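
The consumer loop chains three steps: decode, predict, save. A malformed message would raise an exception in the loop above and stop the consumer. The sketch below shows one way to structure the per-message logic so that undecodable payloads are skipped. It is an illustration under assumptions, not the original implementation: handle_message is hypothetical, and the three steps are passed in as functions so the flow can be tried without Kafka, TensorFlow, or PostgreSQL.

```python
from typing import Callable, Optional


def handle_message(
    payload: bytes,
    decode: Callable[[bytes], Optional[object]],
    predict: Callable[[object], float],
    save: Callable[[float], None],
) -> Optional[float]:
    """Decode one message, score it, store the score; skip undecodable payloads."""
    image = decode(payload)
    if image is None:          # decoding failed, so do not call the model
        return None
    score = float(predict(image))
    save(score)
    return score


# Stand-in functions so the flow can be exercised on its own.
stored = []
result = handle_message(
    b"fake-image-bytes",
    decode=lambda data: data if data else None,
    predict=lambda image: 0.9,
    save=stored.append,
)
print(result, stored)  # 0.9 [0.9]
```

In the tutorial's setting, decode would wrap cv2.imdecode (which returns None for data it cannot decode) together with the resize and reshape steps, predict would wrap model.predict, and save would be save_db.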

9. Test the end-to-end activity with the Kafka and prediction code base

Execute the consumer code now, ensuring its cell is positioned below the prediction code so that the model is already loaded. Click on the "Execute" button. The consumer runs in a loop, waiting for messages, as shown in Figure 7.

In the Jupyter Notebook, the consumer cell was executed.
Figure 7: Consumer code executed.

10. Load the image from producer side 

It's time to execute the producer. The producer loads an image of a dog, which simulates the live data stream captured by the cameras.

Replace the producer code used in the previous step with the following code. Replace "Kafka_server_endpoint:9092" with your Kafka endpoint and "my-topic" with your Kafka topic name in the code below.

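! pip install kafka-python opencv-python
import cv2
from kafka import KafkaProducer

producer = KafkaProducer(bootstrap_servers=['Kafka_server_endpoint:9092'], api_version=(0,10,1))
image = cv2.imread("/opt/app-root/src/openshift-ai/2_Cat-dog-prediction/dog.jpg")
# Encode the image as JPEG bytes, the payload the consumer will decode
ret, buffer = cv2.imencode('.jpg', image)
future = producer.send("my-topic", buffer.tobytes())
# send() is asynchronous; flush() blocks until buffered messages are delivered
producer.flush()
future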

<kafka.producer.future.FutureRecordMetadata at 0x7f999d725640>

On the consumer side, you will receive confirmation logs, as shown in Figure 8.

After receiving the image from the producer, the consumer commenced processing.
Figure 8: Output of the consumer after processing.

Lesson 3:

Verifying prediction results in PostgreSQL

Once you've successfully executed the producer and consumer, confirm that the predictions have been inserted into the database. Here's how to verify the results from the Developer perspective:

Accessing the PostgreSQL Console:

  • Navigate to the OpenShift web console.
  • Locate the pod corresponding to your PostgreSQL deployment configuration. This pod should be in a "Running" state.

Viewing Database Logs:

  • Within the identified pod details, locate the section for logs.
  • Select the option labeled "Terminal" to establish a terminal connection to the pod environment.

Executing SQL Queries:

  • Once you have access to the terminal, log into the PostgreSQL database using the following command:
$ psql -U admin -d animal
  • To check the entry from the image prediction, use the following SQL query:
SELECT * FROM animal;

 

After executing the consumer, the predictions will be stored in the database.
Figure 9: Checking column and table of database.
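
Since the table stores raw scores, a CASE expression can translate them into labels at query time. The sketch below tries the query against an in-memory SQLite database, used here purely as a stand-in for PostgreSQL so the example runs without a cluster. The two inserted scores are made-up sample values, and the 0.5 cutoff is an assumption.

```python
import sqlite3

# SQLite stands in for PostgreSQL here; the column types differ slightly.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE animal (id INTEGER PRIMARY KEY, prediction REAL)")
conn.executemany(
    "INSERT INTO animal (prediction) VALUES (?)",
    [(0.02,), (0.97,)],  # illustrative scores, not real model output
)

rows = conn.execute(
    """
    SELECT id,
           prediction,
           CASE WHEN prediction >= 0.5 THEN 'dog' ELSE 'cat' END AS label
    FROM animal
    ORDER BY id
    """
).fetchall()
conn.close()

print(rows)  # [(1, 0.02, 'cat'), (2, 0.97, 'dog')]
```

The same SELECT statement with the CASE expression is standard SQL and should also work in the psql session against the "animal" table.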

The complete code for image prediction is provided in the repository you cloned earlier (https://github.com/redhat-developer-demos/openshift-ai). You can clone this repository in Jupyter Notebook, add your "kaggle.json" file to it, and then execute the code.

Summary

This learning exercise explored real-time data processing using Apache Kafka (represented by AMQ Streams in this case). We learned to build a producer-consumer system that streams animal images, trains a machine learning model for image prediction, and stores the results in a PostgreSQL database. The exercise covered fetching the dataset, training the model, and integrating the prediction with data streaming.

Our goal is to make your data science projects successful, not just as experiments, but as part of the next generation of intelligent applications.
