Data Engineering : Extract live Data Collection from images and logs

In this learning exercise, we are going to explore the complete MLOps pipeline, utilizing OpenShift AI. The MLOps pipeline includes storing models in object storage like S3, fetching models in OpenShift AI, and model serving.

Start your OpenShift AI

Prerequisites:


Step-by-step guide:

1. Create a new workbench and save the trained model

  • Select the TensorFlow image for the workbench Notebook.
  • Set the container Deployment size to Medium.
  • Click on the "Create workbench" button.

Please refer to the following figure 1 for guidance.

To complete this learning exercise, create a new workbench.
Figure 1: Creating Workbench.

When the workbench is in the running state, click on the "Open" button (↗) to access the Jupyter Notebook.

You will be directed to the Jupyter Notebook hosted on OpenShift AI. For this learning exercise, you can utilize a pre-trained model accessible here.

Clone the GitHub repository into your Jupyter Notebook.

From the top menu, click on the Git icon. In the popup window, specify the following URL: https://github.com/redhat-developer-demos/openshift-ai.git. After the clone completes, open the kubeflow_mlops directory.

2. Push the trained model to the AWS S3 bucket using the boto3 package

import boto3

# Replace with your AWS access key ID and secret access key
AWS_ACCESS_KEY_ID = 'Access_Key'
AWS_SECRET_ACCESS_KEY = 'Secret-Key'
# Optional: Specify the AWS region where your S3 bucket resides
AWS_REGION = 'ap-south-1'  # Example region

# Create an S3 client object
s3_client = boto3.client('s3',
                         aws_access_key_id=AWS_ACCESS_KEY_ID,
                         aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
                         region_name=AWS_REGION)
# Alternative: let boto3 read the credentials from the environment or ~/.aws
# s3_client = boto3.client('s3')

# Upload the local ONNX file to the bucket under the given key
s3_client.upload_file(
    Filename='image_classification_model.onnx',
    Bucket='intel-landscape',
    Key='converted_model/image_classification_model.onnx'
)

Check the S3 bucket in your AWS account to verify that it lists the model, as shown in figure 2.

The model pushed to the S3 bucket using Python.
Figure 2: Model listed in AWS S3 bucket.
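
The same check can also be done from the notebook. The following is a minimal sketch, not part of the original exercise: the helper name model_is_uploaded is hypothetical, and it assumes an S3 client like the s3_client created above. It relies on the boto3 list_objects_v2 call and looks for an exact key match in the listing.

```python
def model_is_uploaded(s3_client, bucket, key):
    """Return True if an object with exactly this key is listed in the bucket."""
    # Using the full key as the prefix keeps the listing small.
    response = s3_client.list_objects_v2(Bucket=bucket, Prefix=key)
    # "Contents" is absent from the response when nothing matches the prefix.
    return any(obj["Key"] == key for obj in response.get("Contents", []))


# Usage with the client created above (requires valid AWS credentials):
# model_is_uploaded(
#     s3_client,
#     "intel-landscape",
#     "converted_model/image_classification_model.onnx",
# )
```

The function takes the client as an argument, so it can be tried with any object that provides list_objects_v2, including a stub.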

3. Fetch the model in OpenShift AI

Open the OpenShift AI dashboard in your cluster to prepare for fetching the model.

3.1 Data connections

Incorporating a data connection into your project enables you to establish links between data inputs and your workbenches.

For handling extensive datasets, consider storing data in an S3-compatible object storage bucket to prevent local storage depletion. Additionally, you can link the data connection with an existing workbench lacking a connection. This facilitates seamless data access and management within your project workflow. The following steps will help you to create the data connections.

  • Select "Data Science Projects" from the left-hand menu.
  • Scroll down and click on "Components".
  • Under "Data Connection" select "Add Data Connections".
  • Enter the details for the data connection name.
  • Enter the access key and secret key of your AWS account.
  • Enter the endpoint of the AWS S3 bucket.
    [Based on your region, add the endpoint using this list.]
  • Specify the region where the S3 bucket is located.
  • Specify the name of the bucket.
  • Click on "Add Data Connections" to save the connection.
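
When a data connection is attached to a workbench, its fields are commonly made available to the notebook as environment variables. The sketch below assumes the variable names AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_S3_ENDPOINT, AWS_DEFAULT_REGION, and AWS_S3_BUCKET; these names are an assumption, not something this exercise states, so verify them in your own workbench. The helper load_s3_settings is hypothetical and only collects and validates the values.

```python
import os

# Assumed variable names; confirm them in your own workbench before relying on them.
REQUIRED_VARS = (
    "AWS_ACCESS_KEY_ID",
    "AWS_SECRET_ACCESS_KEY",
    "AWS_S3_ENDPOINT",
    "AWS_DEFAULT_REGION",
    "AWS_S3_BUCKET",
)


def load_s3_settings(env=None):
    """Collect the data connection fields from environment variables."""
    env = os.environ if env is None else env
    # Treat unset and empty variables the same way.
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise KeyError("Missing data connection variables: " + ", ".join(missing))
    return {name: env[name] for name in REQUIRED_VARS}


# Usage inside a workbench that has the data connection attached:
# settings = load_s3_settings()
# print(settings["AWS_S3_BUCKET"])
```

Reading the values this way avoids hard-coding the access key and secret key in the notebook.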

Please refer to the following figure 3 for guidance.

Figure 3: Create data connections.

3.2 Models and model server

Deploy a trained data science model for serving intelligent applications. The model is deployed with an endpoint, enabling applications to send requests for processing.

  • From the left menu, select "Data Science Projects".
  • Under "Data Connections" select "Models and Model Servers".
  • Click on "Add Model Server".
  • Specify the name of the model server.
  • Select "OpenVINO Model Server" as the serving runtime from the dropdown menu.
  • Set the "Model Server Replicas" to "1". Depending on your requirements, you can adjust the number of replicas.
    [Define the resource allocation per replica. The model server size will be set to "small" for this learning exercise.]
  • Check the box for "Model Route". This provides an external access domain URL to reach the model server. Depending on your security compliance requirements, select whether to use Token authentication. For this learning exercise, we will not use tokens, so uncheck the box for "Require Token Authentication".
  • Click on "Add" to finalize the configuration.

You can refer to the following figure 4 for configuring the model server.

Create a model server based on the sizing and resource requirements of the model.
Figure 4: Create Model server.

3.3 Deploy the Model server

Within the "Models and Model Server" section, you will find the recently created model server listed, as shown in the accompanying figure 5.

Figure 5: Listed Model server.

Click on the "Deploy Model" button located in the row of the newly created model server.

After clicking, you will be presented with a popup form that you need to fill out according to the requirements of this learning exercise.

  • Specify the name of the model.
  • Choose the "ONNX-1" framework from the dropdown menu of "Model Framework".
  • In the "Model Location" section, select the existing data connection created above.
  • Choose the name of the data connection from the dropdown menu, which we created earlier.
  • Add the path of the model under the AWS S3 bucket.
  • Click on the "Deploy" button to initiate the deployment process.

Please refer to the following figure 6 for guidance.

Configure the model location so that the model server can fetch the model from S3.
Figure 6: Model deployment.

Upon deployment, ensure that the status of the deployed model server displays a green tick mark, as shown in the following figure 7.

The green tick indicates that it has loaded successfully.
Figure 7: Model successfully deployed.

4. Process the prediction with the deployed model

The input image must be pre-processed into the format the model expects before it is sent in a request. The following code handles this.

Image pre-processing

import cv2
import numpy as np
import requests
import json

# Function to load an image from disk
# Note: cv2.imread returns None if the file cannot be read
def load_image(image_path):
    return cv2.imread(image_path)

# Function to preprocess image as per model requirements
# More details at: https://github.com/onnx/models/tree/main/validated/vision/classification/resnet
# Note: cv2 stores channels in BGR order, while the mean and std below are listed
# in RGB order in the reference above. If your model expects RGB input, convert
# first with cv2.cvtColor(img, cv2.COLOR_BGR2RGB).
def preprocess_image(img):
    # Resize so that the shorter side is 256 pixels while maintaining aspect ratio
    min_size = min(img.shape[:2])
    scale_factor = 256 / min_size
    new_size = (int(img.shape[1] * scale_factor), int(img.shape[0] * scale_factor))
    img_resized = cv2.resize(img, new_size)
    # Crop 224x224 from the center
    center_x = new_size[0] // 2
    center_y = new_size[1] // 2
    half_crop = 112
    img_cropped = img_resized[center_y - half_crop:center_y + half_crop, center_x - half_crop:center_x + half_crop]
    # Normalize pixel values with the ImageNet mean and standard deviation (scaled to 0-255)
    mean = np.array([0.485, 0.456, 0.406]) * 255
    std = np.array([0.229, 0.224, 0.225]) * 255
    img_normalized = (img_cropped - mean) / std
    # Transpose image from HWC to CHW layout
    img_transposed = np.transpose(img_normalized, (2, 0, 1))
    return img_transposed

# Function to convert image data to flat array
def image_to_flat_array(image_data):
    return image_data.flatten().tolist()

# Function to convert image data to JSON format
def image_to_json(image_data):
    return json.dumps({"inputs": [{"name": "data", "shape": [1, 3, 224, 224], "datatype": "FP32", "data": image_data}]})
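
The request body above declares a shape of [1, 3, 224, 224], so the flat data list must hold exactly 1 x 3 x 224 x 224 = 150,528 values. The following sketch shows one way to catch a mismatch before the request is sent. The helper name build_infer_request is hypothetical and not part of the original exercise; the field names mirror the JSON structure used above.

```python
import json
import math


def build_infer_request(name, shape, data, datatype="FP32"):
    """Build an inference request body, checking that data matches shape."""
    expected = math.prod(shape)
    if len(data) != expected:
        raise ValueError(
            "shape {} needs {} values, got {}".format(list(shape), expected, len(data))
        )
    tensor = {"name": name, "shape": list(shape), "datatype": datatype, "data": data}
    return json.dumps({"inputs": [tensor]})


# Usage with a small dummy tensor instead of a real image:
body = build_infer_request("data", [1, 3, 2, 2], [0.0] * 12)
print(len(json.loads(body)["inputs"][0]["data"]))
```

A length mismatch usually means the image was cropped or transposed differently from what the model expects, so failing early makes that easier to spot than a server-side error.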

For image prediction using the deployed ONNX model on OpenShift AI, copy the "Inference Endpoint" from the listed deployed model, as shown in the following figure 8.

Copy the URL of Inference endpoint.
Figure 8: Copy the inference endpoint.

In the following code, replace the value of the url variable with your inference endpoint.

# Path to the image file
image_path = 'bee.jpg'
# Load image
image = load_image(image_path)
# Preprocess image
image_processed = preprocess_image(image)
# Convert image to flat array and JSON format
image_flat = image_to_flat_array(image_processed)
image_json = image_to_json(image_flat)
# Send request to OpenVINO server
url = 'https://testing-mlops.apps.cluster-5djnl.dynamic.redhatworkshops.io/v2/models/testing/infer'
headers = {'Content-Type': 'application/json'}
try:
    response = requests.post(url, data=image_json, headers=headers)
    if response.status_code == 200:
        # Parse response
        results = response.json()
        # Load the class labels
        with open('imagenet_classes.txt', 'r') as f:
            class_labels = f.read().splitlines()
        # Get the top-1 prediction
        predictions = np.array(results['outputs'][0]['data'])
        top_prediction_idx = np.argmax(predictions)
        top_prediction_label = class_labels[top_prediction_idx]
        print("Top prediction:", top_prediction_label)
    else:
        # Report failed requests instead of ending silently
        print("Request failed with status", response.status_code, response.text)
except Exception as e:
    print("Error:", e)

Sample result: Top prediction: n02206856 bee
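
To see more than the single best class, the scores can be ranked and the first few entries reported. The sketch below is an illustration only: the helper name top_k is hypothetical, and it assumes that the position of each score lines up with the corresponding line of the class label file, as in the code above.

```python
import heapq


def top_k(scores, labels, k=5):
    """Return the k highest-scoring (label, score) pairs, best first."""
    if len(scores) != len(labels):
        raise ValueError("scores and labels must have the same length")
    # Rank the indices by their score and keep only the k best.
    best = heapq.nlargest(k, range(len(scores)), key=scores.__getitem__)
    return [(labels[i], scores[i]) for i in best]


# Usage with made-up scores and labels:
for label, score in top_k([0.1, 2.5, -1.0, 0.7], ["ant", "bee", "cat", "dog"], k=2):
    print(label, score)
```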

We have successfully obtained a prediction for the image. Now it's time to integrate the model with a Flask application that handles GET and POST requests on port 5000.

!pip install flask
import cv2
import json
import numpy as np
import requests
from flask import Flask, request, render_template

app = Flask(__name__)

# preprocess_image, image_to_flat_array, and image_to_json are the functions
# defined in the image pre-processing code above; run that code first
# (or replace them with your own preprocessing functions).

# Function to decode an uploaded file into an image array
def decode_upload(file):
    buffer = np.frombuffer(file.read(), dtype=np.uint8)
    return cv2.imdecode(buffer, cv2.IMREAD_COLOR)

# Function to load class labels
def load_class_labels():
    with open('imagenet_classes.txt', 'r') as f:
        class_labels = f.read().splitlines()
    return class_labels

# Function to perform inference
def perform_inference(file):
    # Decode and preprocess image
    image = decode_upload(file)
    if image is None:
        return "Error: could not decode the uploaded image"
    image_processed = preprocess_image(image)
    # Convert image to flat array and JSON format
    image_flat = image_to_flat_array(image_processed)
    image_json = image_to_json(image_flat)
    # Send request to OpenVINO server
    url = 'https://testing-mlops.apps.cluster-5djnl.dynamic.redhatworkshops.io/v2/models/testing/infer'
    headers = {'Content-Type': 'application/json'}
    try:
        response = requests.post(url, data=image_json, headers=headers)
        if response.status_code == 200:
            # Parse response
            results = response.json()
            # Get class labels
            class_labels = load_class_labels()
            # Get the top-1 prediction
            predictions = np.array(results['outputs'][0]['data'])
            top_prediction_idx = np.argmax(predictions)
            top_prediction_label = class_labels[top_prediction_idx]
            return top_prediction_label
        return "Error: inference request returned status {}".format(response.status_code)
    except Exception as e:
        return "Error: {}".format(e)

@app.route('/', methods=['GET', 'POST'])
def index():
    if request.method == 'POST':
        # Check if the post request has the file part
        if 'file' not in request.files:
            return render_template('index.html', message='No file part')
        file = request.files['file']
        # If the user does not select a file, the browser
        # submits an empty part without a filename
        if file.filename == '':
            return render_template('index.html', message='No selected file')
        if file:
            # Perform inference
            result = perform_inference(file)
            return render_template('index.html', message='Prediction: {}'.format(result))
    return render_template('index.html')

if __name__ == "__main__":
    app.run(debug=False, host="0.0.0.0", port=5000)

[notice] A new release of pip available: 22.2.2 -> 24.0
[notice] To update, run: pip install --upgrade pip
* Serving Flask app '__main__'
* Debug mode: off
WARNING: This is a development server. Do not use it in a production deployment. Use a production WSGI server instead.
* Running on all addresses (0.0.0.0)
* Running on http://127.0.0.1:5000
* Running on http://10.132.2.11:5000
Press CTRL+C to quit
127.0.0.1 - - [26/Apr/2024 16:08:23] "GET / HTTP/1.1" 200 -
127.0.0.1 - - [26/Apr/2024 16:08:27] "POST / HTTP/1.1" 200 -

To access the Flask application in your browser, copy the notebook URL and replace everything from "lab" onward with "proxy/5000". This works because the JupyterLab server includes a built-in proxy server.

Example:

URL of Jupyter notebook:

https://kubeflow-mlops.apps.cluster-5djnl.dynamic.redhatworkshops.io/notebook/mlops/kubeflow/lab/tree/model%20serving.ipynb 

Flask application endpoint:

https://kubeflow-mlops.apps.cluster-5djnl.dynamic.redhatworkshops.io/notebook/mlops/kubeflow/proxy/5000

Replace the "lab" section of the URL, and everything after it, with "proxy" followed by the port number on which the application is running. The resulting page is shown in figure 9.

The upload page allows you to submit the input for prediction.
Figure 9: Flask server hosted on port 5000.
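
The URL rewrite described above can be expressed as a small helper. This is a sketch under the assumption that the notebook URL contains a "/lab" path segment, as in the example; the function name flask_proxy_url is hypothetical.

```python
def flask_proxy_url(notebook_url, port=5000):
    """Replace the '/lab' segment of a notebook URL, and what follows, with '/proxy/<port>'."""
    head, marker, _ = notebook_url.partition("/lab/")
    if not marker:
        # Also accept a URL that ends exactly at the "/lab" segment.
        if not notebook_url.endswith("/lab"):
            raise ValueError("URL does not contain a '/lab' segment: " + notebook_url)
        head = notebook_url[: -len("/lab")]
    return "{}/proxy/{}".format(head, port)


# Usage with the example notebook URL from this exercise:
print(flask_proxy_url(
    "https://kubeflow-mlops.apps.cluster-5djnl.dynamic.redhatworkshops.io"
    "/notebook/mlops/kubeflow/lab/tree/model%20serving.ipynb"
))
```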

Click on the "Image" button and choose the "Bee.jpeg" image. Then, select the "Upload" button. The result page will display the prediction for that image, as shown in figure 10.

This page provides you with a prediction of the image you uploaded.
Figure 10: Result window.

Summary 

This exercise covered deploying an ML model for image prediction. We learned to store the model in object storage (like AWS S3) and deploy it on an OpenShift AI model server. Finally, we used Python code within a Flask application to send image prediction requests to the deployed model on OpenShift AI.
