Building Production ML Pipelines with KitOps and Vertex AI: A Practical Guide
Ship ML models like you ship code—packaged, versioned, and ready for production.
If you’ve ever struggled with deploying machine learning models to production, you know the pain points: inconsistent environments, missing dependencies, “it worked on my machine” syndrome, and the eternal question of “which version of the model are we running?”
Today, I’m walking you through a game-changing approach that combines KitOps, an open-source ML packaging tool, with Google Cloud’s Vertex AI Pipelines to create reproducible, production-ready ML workflows. Think of it as “Docker for ML models,” but better.
Why This Matters
Traditional ML deployment is messy. You’ve got:
Model files scattered across different storage locations
Training data that nobody can find when you need to retrain
Code dependencies that break when someone updates a library
Zero traceability of what went into training that model
KitOps solves this by packaging everything—model weights, training code, datasets, configurations—into a single, versioned artifact called a ModelKit. These ModelKits use the OCI (Open Container Initiative) standard, meaning they work with existing container registries and DevOps tooling.
The Core Concept: ModelKits
A ModelKit is like a shipping container for your ML model. Here’s what makes it special:
yaml
# Kitfile - Think of this as your model's blueprint
manifestVersion: v1.0
package:
  name: sentiment-classifier
  version: 1.0.0
  description: BERT-based sentiment classification model
  authors:
    - name: ML Team
      email: ml-team@company.com
model:
  name: bert-sentiment
  path: ./models/model.pkl
  framework: scikit-learn
  version: 1.0.0
code:
  - path: ./src/train.py
  - path: ./src/predict.py
  - path: ./src/requirements.txt
datasets:
  - name: training_data
    path: ./data/train.csv
    description: 50k labeled reviews
config:
  - path: ./config/hyperparameters.yaml
What's happening here? This manifest describes everything needed to reproduce your model. No more hunting for training scripts or wondering which dataset version was used. It's all declared, versioned, and packaged together.
Building Your First ModelKit
Let’s package a real sentiment analysis model. First, organize your project:
bash
ml-project/
├── Kitfile                    # Your manifest
├── models/
│   └── model.pkl              # Trained model
├── src/
│   ├── train.py               # Training logic
│   └── predict.py             # Inference code
├── data/
│   └── train.csv              # Training data
└── config/
    └── hyperparameters.yaml
Now pack it:
bash
# Pack everything into a ModelKit
kit pack . -t us-central1-docker.pkg.dev/my-project/ml-models/sentiment:v1.0.0
# Push to your registry (works with any OCI-compliant registry)
kit push us-central1-docker.pkg.dev/my-project/ml-models/sentiment:v1.0.0
Behind the scenes: KitOps creates immutable layers (just like Docker) for each component. Change your model? Only that layer updates. Change your dataset? Only the dataset layer changes. This makes transfers fast and storage efficient.
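You can see those layers for yourself with the Kit CLI. Something like the following should work once the ModelKit is in your local store (exact output varies by KitOps version):
bash
# List the ModelKits in your local store
kit list
# Print a ModelKit's manifest, including its individual layers
kit inspect us-central1-docker.pkg.dev/my-project/ml-models/sentiment:v1.0.0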
Integrating with Vertex AI Pipelines
Here’s where it gets powerful. Vertex AI Pipelines orchestrate your ML workflows—data preprocessing, training, evaluation, deployment. By integrating KitOps, you get reproducibility at every step.
Step 1: Unpack the ModelKit in Your Pipeline
python
from kfp.v2.dsl import component, Output, Dataset

@component(
    base_image="python:3.9",
    packages_to_install=["pykitops", "google-cloud-storage"]
)
def unpack_modelkit_op(
    modelkit_uri: str,
    output_path: Output[Dataset]
) -> dict:
    """
    Pulls a ModelKit from the registry and unpacks it.
    Returns paths to all components for downstream tasks.
    Note: assumes the Kit CLI is available on the component image's PATH.
    """
    import subprocess
    import os

    # Pull the ModelKit (like docker pull)
    subprocess.run(["kit", "pull", modelkit_uri], check=True)

    # Unpack to a directory
    unpack_dir = output_path.path
    os.makedirs(unpack_dir, exist_ok=True)
    subprocess.run([
        "kit", "unpack", modelkit_uri,
        "-d", unpack_dir
    ], check=True)

    # Return structured paths for downstream components
    return {
        "modelkit_uri": modelkit_uri,
        "data_path": f"{unpack_dir}/data",
        "model_path": f"{unpack_dir}/models",
        "code_path": f"{unpack_dir}/src"
    }
Key insight: This component is reusable. Any pipeline can start by unpacking a ModelKit, ensuring everyone works with the same artifacts.
Step 2: Train Your Model
python
from kfp.v2.dsl import component, Output, Model, Metrics

@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn", "numpy"]
)
def train_model_op(
    data_path: str,
    model_output: Output[Model],
    metrics: Output[Metrics]
):
    """
    Trains a model using data from the unpacked ModelKit.
    Logs metrics for evaluation and stores the trained model.
    """
    import pandas as pd
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score, f1_score
    import pickle

    # Load data from ModelKit
    train_df = pd.read_csv(f"{data_path}/train.csv")
    X = train_df.drop("target", axis=1)
    y = train_df["target"]

    # Split, then train the model
    X_train, X_val, y_train, y_val = train_test_split(
        X, y, test_size=0.2, random_state=42
    )
    model = RandomForestClassifier(n_estimators=100, random_state=42)
    model.fit(X_train, y_train)

    # Evaluate
    y_pred = model.predict(X_val)
    accuracy = accuracy_score(y_val, y_pred)
    f1 = f1_score(y_val, y_pred, average="weighted")

    # Save the trained model
    with open(model_output.path, "wb") as f:
        pickle.dump(model, f)

    # Log metrics (visible in Vertex AI UI)
    metrics.log_metric("accuracy", accuracy)
    metrics.log_metric("f1_score", f1)
    print(f"✓ Training complete: Accuracy={accuracy:.4f}, F1={f1:.4f}")
What's different? The data path comes from the ModelKit, not some hardcoded bucket path. This means your training is reproducible—same ModelKit, same results.
Step 3: Conditional Deployment
python
from kfp.v2 import dsl

@dsl.pipeline(
    name="production-ml-pipeline",
    description="Train and conditionally deploy models"
)
def training_pipeline(
    project_id: str,
    modelkit_uri: str,
    deploy_threshold: float = 0.85
):
    """
    Complete pipeline: unpack → train → evaluate → conditionally deploy
    """
    # Unpack the ModelKit
    unpack_task = unpack_modelkit_op(modelkit_uri=modelkit_uri)

    # Train the model
    train_task = train_model_op(
        data_path=unpack_task.outputs["output_path"]
    )

    # Only deploy if accuracy meets threshold
    with dsl.Condition(
        train_task.outputs["metrics"].metadata["accuracy"] >= deploy_threshold,
        name="accuracy-gate"
    ):
        deploy_task = deploy_model_op(
            project_id=project_id,
            model_path=train_task.outputs["model_output"]
        )

        # Package the new model as a ModelKit for versioning
        pack_task = pack_new_modelkit_op(
            model_path=train_task.outputs["model_output"],
            version="v1.1.0"
        )
The magic: This pipeline has a quality gate. Poor-performing models never reach production. And when a good model is deployed, it's automatically packaged as a new ModelKit version for traceability.
Real-World Pattern: Continuous Retraining
Let’s build something production-grade: a pipeline that checks for new data and re-trains automatically.
python
@component(packages_to_install=["google-cloud-storage"])
def check_for_new_data_op(
    bucket_name: str,
    last_training_date: str
) -> bool:
    """
    Checks if new training data has arrived since last training.
    Returns True if retraining is needed.
    """
    from google.cloud import storage
    from datetime import datetime, timezone

    client = storage.Client()
    bucket = client.bucket(bucket_name)

    # Check for new files in the training-data prefix
    blobs = bucket.list_blobs(prefix="training-data/")
    # blob.time_created is timezone-aware, so make the cutoff timezone-aware too
    last_date = datetime.fromisoformat(last_training_date).replace(tzinfo=timezone.utc)

    for blob in blobs:
        if blob.time_created > last_date:
            print(f"✓ New data found: {blob.name}")
            return True

    print("No new data found")
    return False

@dsl.pipeline(name="continuous-training-pipeline")
def continuous_training_pipeline(
    project_id: str,
    data_bucket: str,
    current_modelkit_uri: str
):
    """
    Scheduled pipeline that retrains when new data arrives.
    Run this daily via Cloud Scheduler.
    """
    # Check for new data
    check_task = check_for_new_data_op(
        bucket_name=data_bucket,
        last_training_date="2025-01-01T00:00:00"
    )

    # Only proceed if new data exists
    with dsl.Condition(check_task.output == True):
        # Unpack current ModelKit
        unpack_task = unpack_modelkit_op(
            modelkit_uri=current_modelkit_uri
        )

        # Load and merge new data
        load_task = load_new_data_op(bucket_name=data_bucket)
        merge_task = merge_datasets_op(
            existing_data=unpack_task.outputs["output_path"],
            new_data=load_task.outputs["data_path"]
        )

        # Retrain with updated data
        train_task = train_model_op(
            data_path=merge_task.outputs["merged_data"]
        )

        # Deploy if model improves
        with dsl.Condition(
            train_task.outputs["metrics"].metadata["accuracy"] > 0.90
        ):
            deploy_task = deploy_model_op(
                project_id=project_id,
                model_path=train_task.outputs["model_output"]
            )

            # Create new ModelKit version
            pack_task = pack_new_modelkit_op(
                model_path=train_task.outputs["model_output"],
                version="auto-increment"  # Bumps version automatically
            )
Why this works: The pipeline is self-managing. Schedule it to run daily, and it automatically detects new data, retrains, evaluates, and deploys—all while maintaining version history through ModelKits.
Advanced Pattern: Model Promotion Pipeline
Production ML needs multiple environments: dev, staging, production. Here’s how to build a promotion pipeline:
python
@component
def validate_modelkit_op(
    modelkit_uri: str,
    validation_tests: list
) -> dict:
    """
    Runs a battery of validation tests on a ModelKit.
    Tests include: schema validation, smoke tests, security scans.
    """
    import subprocess

    results = {"passed": True, "tests": {}}

    # Pull the ModelKit
    subprocess.run(["kit", "pull", modelkit_uri], check=True)

    for test in validation_tests:
        try:
            # Run each test (implementation depends on your needs)
            test_result = run_validation_test(test, modelkit_uri)
            results["tests"][test] = test_result
            if not test_result:
                results["passed"] = False
                print(f"✗ Test failed: {test}")
        except Exception as e:
            results["passed"] = False
            results["tests"][test] = str(e)

    return results

@dsl.pipeline(name="modelkit-promotion-pipeline")
def promotion_pipeline(
    dev_modelkit_uri: str,
    staging_modelkit_uri: str,
    prod_modelkit_uri: str
):
    """
    Three-stage promotion: DEV → STAGING → PRODUCTION
    Each stage has gates that must pass before promotion.
    """
    # Stage 1: Validate in DEV
    dev_validation = validate_modelkit_op(
        modelkit_uri=dev_modelkit_uri,
        validation_tests=[
            "schema_check",
            "smoke_test",
            "security_scan",
            "bias_detection"
        ]
    )

    # Gate 1: Only promote if dev validation passes
    with dsl.Condition(
        dev_validation.outputs["passed"] == True,
        name="dev-gate"
    ):
        # Promote to STAGING
        staging_promote = promote_modelkit_op(
            source_uri=dev_modelkit_uri,
            target_uri=staging_modelkit_uri
        )

        # Stage 2: Run integration tests in STAGING
        staging_tests = run_integration_tests_op(
            modelkit_uri=staging_modelkit_uri,
            test_endpoint="staging-api.company.com"
        )

        # Gate 2: Only promote to production if staging tests pass
        with dsl.Condition(
            staging_tests.outputs["passed"] == True,
            name="staging-gate"
        ):
            # Final promotion to PRODUCTION
            prod_promote = promote_modelkit_op(
                source_uri=staging_modelkit_uri,
                target_uri=prod_modelkit_uri
            )

            # Deploy to production endpoint with blue/green deployment
            prod_deploy = deploy_to_production_op(
                modelkit_uri=prod_modelkit_uri,
                deployment_strategy="blue-green"
            )
Production insight: This pattern enforces quality gates and prevents bad models from reaching production. Each ModelKit version must pass tests at every stage before promotion.
CI/CD Integration: GitHub Actions
Let’s automate everything with GitHub Actions:
yaml
# .github/workflows/ml-pipeline.yaml
name: ML Pipeline with KitOps

on:
  push:
    branches: [main]
    paths:
      - 'models/**'
      - 'data/**'
      - 'Kitfile'

env:
  PROJECT_ID: ${{ secrets.GCP_PROJECT_ID }}
  REGION: us-central1

jobs:
  build-and-deploy:
    runs-on: ubuntu-latest
    permissions:
      contents: read
      id-token: write  # required for Workload Identity Federation auth
    steps:
      - name: Checkout code
        uses: actions/checkout@v4

      - name: Authenticate to Google Cloud
        uses: google-github-actions/auth@v2
        with:
          workload_identity_provider: ${{ secrets.WIF_PROVIDER }}
          service_account: ${{ secrets.WIF_SERVICE_ACCOUNT }}

      - name: Install Kit CLI
        run: |
          curl -L https://github.com/kitops-ml/kitops/releases/latest/download/kitops-linux-x86_64.tar.gz | tar -xz
          sudo mv kit /usr/local/bin/
          kit version

      - name: Build and push ModelKit
        run: |
          VERSION=$(cat version.txt)
          MODELKIT_URI="${REGION}-docker.pkg.dev/${PROJECT_ID}/ml-models/sentiment:${VERSION}"
          # Pack the ModelKit
          kit pack . -t $MODELKIT_URI
          # Push to registry
          gcloud auth configure-docker ${REGION}-docker.pkg.dev
          kit push $MODELKIT_URI
          echo "MODELKIT_URI=$MODELKIT_URI" >> $GITHUB_ENV

      - name: Trigger Vertex AI Pipeline
        run: |
          pip install google-cloud-aiplatform kfp
          python scripts/trigger_pipeline.py \
            --project-id $PROJECT_ID \
            --region $REGION \
            --modelkit-uri $MODELKIT_URI
What happens: Every time you push changes to your model, this workflow automatically:
Packages it as a ModelKit
Pushes to your registry
Triggers the training pipeline on Vertex AI (a sketch of scripts/trigger_pipeline.py follows below)
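The workflow's last step calls scripts/trigger_pipeline.py, which isn't shown above. A minimal sketch of what that script could look like (the display name and the compiled spec path pipeline.json are assumptions; adjust them to your pipeline):
python
# scripts/trigger_pipeline.py - submits the compiled pipeline to Vertex AI
import argparse
from google.cloud import aiplatform

def main() -> None:
    parser = argparse.ArgumentParser()
    parser.add_argument("--project-id", required=True)
    parser.add_argument("--region", required=True)
    parser.add_argument("--modelkit-uri", required=True)
    args = parser.parse_args()

    aiplatform.init(project=args.project_id, location=args.region)

    # Assumes the pipeline has been compiled to pipeline.json (see the compiler snippet earlier)
    job = aiplatform.PipelineJob(
        display_name="production-ml-pipeline",
        template_path="pipeline.json",
        parameter_values={
            "project_id": args.project_id,
            "modelkit_uri": args.modelkit_uri,
        },
    )
    job.submit()

if __name__ == "__main__":
    main()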
Best Practices I’ve Learned
1. Version Everything Semantically
yaml
# Use semantic versioning in your Kitfile
package:
  name: fraud-detector
  version: 2.1.0  # MAJOR.MINOR.PATCH

# 1.0.0 → Initial release
# 1.1.0 → New feature (backward compatible)
# 1.1.1 → Bug fix
# 2.0.0 → Breaking change
2. Document Your Models
yaml
# Include comprehensive metadata
package:
  name: fraud-detector
  description: |
    XGBoost model for real-time fraud detection.
    Trained on 2M transactions from Q4 2024.
  metadata:
    training_date: "2025-01-15"
    dataset_version: "v3.2"
    performance_metrics:
      accuracy: 0.947
      precision: 0.923
      recall: 0.951
      f1_score: 0.937
3. Use Type Hints and Clear Naming
python
from kfp.v2.dsl import component, Input, Output, Dataset

@component(base_image="python:3.9")
def preprocess_data_op(
    input_data: Input[Dataset],
    output_data: Output[Dataset],
    scaling_method: str = "standard",
    handle_missing: str = "mean"
) -> dict:
    """
    Clear docstrings explain what each component does.
    Type hints make inputs/outputs explicit.
    """
    pass
4. Implement Robust Error Handling
python
@component(packages_to_install=["google-cloud-logging"])
def robust_training_op(data_path: str, model_output: Output[Model]):
    """Training with comprehensive logging and error handling"""
    import os
    import logging
    from google.cloud import logging as cloud_logging

    # Setup Cloud Logging
    logging_client = cloud_logging.Client()
    logging_client.setup_logging()
    logger = logging.getLogger(__name__)

    try:
        logger.info("Starting model training")

        # Validate inputs
        if not os.path.exists(data_path):
            raise FileNotFoundError(f"Data not found: {data_path}")

        # Train model (train_model / save_model are your own helpers)
        model = train_model(data_path)
        logger.info("Training completed successfully")

        # Save model
        save_model(model, model_output.path)

    except Exception as e:
        logger.error(f"Training failed: {str(e)}", exc_info=True)
        raise  # Re-raise to fail the pipeline
Troubleshooting Common Issues
Authentication Errors
bash
# If kit pull fails with auth errors:
gcloud auth configure-docker ${REGION}-docker.pkg.dev
# Verify your credentials
gcloud auth list
# Check repository permissions
gcloud artifacts repositories get-iam-policy ml-models --location=${REGION}
Missing Dependencies
python
# Always specify exact versions in component decorators
@component(
    base_image="python:3.9",
    packages_to_install=[
        "pandas==2.0.0",        # Exact versions prevent surprises
        "scikit-learn==1.3.0",
        "numpy==1.24.0"
    ]
)
def reliable_component():
    pass
The Results
After implementing this architecture, here’s what you get:
Reproducibility: Anyone can pull a ModelKit and get exactly the same artifacts you used
Traceability: Full lineage from raw data to deployed model
Collaboration: Data scientists and ML engineers work from the same artifacts
Compliance: Immutable, signed ModelKits satisfy audit requirements (EU AI Act ready)
Speed: Efficient layer caching means faster deployments
Getting Started
Install Kit CLI:
bash
# macOS
brew tap kitops-ml/kitops
brew install kitops
# Linux
curl -L https://github.com/kitops-ml/kitops/releases/latest/download/kitops-linux-x86_64.tar.gz | tar -xz
sudo mv kit /usr/local/bin/
Set up Google Cloud:
bash
gcloud services enable aiplatform.googleapis.com artifactregistry.googleapis.com
gcloud artifacts repositories create ml-models --repository-format=docker --location=us-central1
Create your first ModelKit:
bash
# Create a Kitfile
kit init
# Pack your model
kit pack . -t us-central1-docker.pkg.dev/my-project/ml-models/my-model:v1.0.0
# Push to registry
kit push us-central1-docker.pkg.dev/my-project/ml-models/my-model:v1.0.0
Final Thoughts
The combination of KitOps and Vertex AI solves a critical problem in ML: the gap between experimentation and production. By packaging models as OCI artifacts and orchestrating with Vertex AI Pipelines, you get:
Development velocity: No more “works on my machine”
Production confidence: Reproducible, auditable deployments
Team alignment: Everyone works with versioned, shared artifacts
The best part? This isn’t theoretical. Teams are using this in production today for everything from fraud detection to recommendation systems to computer vision.