This recipe shows how to build an automated content ingestion pipeline using the Realtime Ingestion API. You’ll learn to set up real-time content monitoring, process various content types, and create production-ready ingestion workflows that automatically make content available for search and AI interaction. We’ll build a complete content pipeline that watches for new files, extracts metadata, and uploads content to the Gloo AI platform with proper error handling and monitoring.

Prerequisites

Before starting, make sure you have your Gloo AI API credentials (client ID and client secret) and can authenticate against the platform. The Realtime Ingestion API requires Bearer token authentication; if you haven’t set up authentication yet, follow the Authentication Tutorial to learn how to exchange your credentials for access tokens and manage token expiration.

Step 1: Understanding the Realtime API

The Realtime Ingestion API allows you to upload content that gets processed and made available for search and AI interaction. The primary endpoint is:

POST /ingestion/v1/real_time_upload

Key Features

  • Real-time Processing: Content is processed upon upload
  • Rich Metadata: Support for comprehensive content categorization
  • Flexible Content Types: Articles, documents, media, and structured content
  • Automatic Indexing: Content becomes searchable as soon as processing completes

Required Fields

{
  "content": "Your content text here",
  "publisherId": "Your publisher id here"
}

Optional Metadata Fields

  • Content Details: author, publication_date, item_title, item_subtitle, item_summary
  • Categorization: type, pub_type, denomination, item_tags
  • Media: item_image, item_url, hosted_url
  • Access Control: drm, evergreen
  • Hierarchical Structure: h2_title, h3_title for document sections

Step 2: Basic Content Upload

Let’s start with a simple content upload example. This demonstrates the core API call with proper authentication and error handling; a minimal Python sketch of the request follows the expected response below.

API Request Structure

{
  "content": "Your article content...",
  "publisherId": "b6ada63e-f1ce-4a16-80da-9864f58c5bd7",
  "item_title": "Developer Happiness",
  "item_subtitle": "Beyond the Code", 
  "author": ["Aspiring Developer"],
  "publication_date": "2025-08-26",
  "type": "Article",
  "pub_type": "technical",
  "item_tags": ["documentation", "reference"],
  "evergreen": true,
  "drm": ["aspen", "kallm"]
}

Expected Response

{
  "success": true,
  "message": "Processing started, data will be uploaded shortly.",
  "task_id": null,
  "batch_id": null,
  "processing_details": null
}
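
Putting the request and response structures together, a minimal upload call in Python might look like the sketch below. It assumes you already have a valid access token (see the Authentication Tutorial); upload_article and the placeholder publisherId are illustrative names, not part of the API.

import requests

API_URL = "https://platform.ai.gloo.com/ingestion/v1/real_time_upload"

def upload_article(access_token, payload):
    """Send a single content item to the Realtime Ingestion API."""
    headers = {
        "Authorization": f"Bearer {access_token}",
        "Content-Type": "application/json",
    }
    response = requests.post(API_URL, headers=headers, json=payload)
    response.raise_for_status()  # surface 4xx/5xx errors to the caller
    return response.json()

# Only "content" and "publisherId" are required; everything else is optional metadata.
payload = {
    "content": "Your article content...",
    "publisherId": "YOUR_PUBLISHER_ID",  # placeholder - use your own publisher ID
    "item_title": "Developer Happiness",
    "type": "Article",
}
# result = upload_article("YOUR_ACCESS_TOKEN", payload)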

Step 3: Verifying Content Upload

After uploading content, you can check on progress through Gloo AI Studio.
  1. Log in to Gloo AI Studio.
  2. Navigate to the Data Engine section from the main Studio sidebar.
  3. Click on Your Data.
(Screenshot: the Your Data view in Studio)

Step 4: Setting Up File Monitoring

For automated content ingestion, you’ll want to monitor directories for new files and automatically process them. This creates a real-time content pipeline.

File Watching Strategy

  1. Monitor Target Directory: Watch for new files or changes
  2. Extract Metadata: Parse filename and content for metadata
  3. Validate Content: Ensure required fields are present (a helper is sketched after this list)
  4. Upload with Retry: Handle failures gracefully
  5. Log Results: Track successful and failed uploads
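
Steps 2 and 3 can be covered by a couple of small helpers. The sketch below is illustrative: extract_metadata and validate_payload are hypothetical names, and the filename-based title logic mirrors the complete Python example later in this recipe.

import os
from datetime import datetime

REQUIRED_FIELDS = ("content", "publisherId")

def extract_metadata(file_path):
    """Derive a basic title and publication date from a filename."""
    name, _ = os.path.splitext(os.path.basename(file_path))
    return {
        "item_title": name.replace("_", " ").title(),
        "publication_date": datetime.now().strftime("%Y-%m-%d"),
    }

def validate_payload(payload):
    """Return the names of any missing required fields (empty list means valid)."""
    return [field for field in REQUIRED_FIELDS if not payload.get(field)]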

Step 5: Batch Processing Pipeline

For processing multiple files or handling large volumes of content, batch processing provides better performance and resource management.

Batch Processing Benefits

  • Rate Limiting: Control API call frequency (see the sketch after this list)
  • Progress Tracking: Monitor processing status
  • Error Recovery: Retry failed uploads
  • Resource Management: Efficient memory and network usage
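
A simple way to control call frequency is a client-side throttle like the sketch below. The one-second minimum interval is an assumption for illustration, not a documented platform limit.

import time

class RateLimiter:
    """Enforce a minimum interval between API calls (client-side throttling)."""

    def __init__(self, min_interval_seconds=1.0):  # assumed interval, not an official limit
        self.min_interval = min_interval_seconds
        self._last_call = 0.0

    def wait(self):
        """Sleep just long enough to keep calls at least min_interval apart."""
        elapsed = time.monotonic() - self._last_call
        if elapsed < self.min_interval:
            time.sleep(self.min_interval - elapsed)
        self._last_call = time.monotonic()

# Usage: create one limiter and call limiter.wait() before each upload in a batch loop.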

Step 6: Production Considerations

Error Handling

  • Authentication Failures: Token refresh and retry
  • Rate Limiting: Exponential backoff (see the retry sketch after this list)
  • Network Issues: Connection retry logic
  • Validation Errors: Content preprocessing
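
One way to combine these concerns is a generic retry wrapper with exponential backoff, sketched below. call_with_backoff is a hypothetical helper, and the retry policy (four attempts, doubling delays) is an assumption to tune for your environment; it can wrap the upload_content function from the complete Python example later in this recipe.

import time
import requests

def call_with_backoff(func, *args, max_attempts=4, base_delay=1.0, **kwargs):
    """Call func(*args, **kwargs), retrying transient failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except requests.exceptions.HTTPError as exc:
            status = exc.response.status_code if exc.response is not None else None
            # Retry 401 (stale token), 429 (rate limited), and 5xx; re-raise other client errors.
            if status is not None and status not in (401, 429) and status < 500:
                raise
        except requests.exceptions.ConnectionError:
            pass  # transient network issue; fall through to backoff
        if attempt == max_attempts:
            raise RuntimeError(f"Giving up after {max_attempts} attempts")
        time.sleep(base_delay * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...

# Usage with the complete example below: call_with_backoff(upload_content, content_data)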

Monitoring and Logging

  • Success Metrics: Upload counts and timing (see the logging sketch after this list)
  • Error Tracking: Failed uploads with reasons
  • Performance Monitoring: API response times
  • Health Checks: System status monitoring
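
A minimal sketch of this kind of instrumentation, using Python’s standard logging module and in-memory counters (timed_upload and the metrics dict are hypothetical names; a real deployment would export metrics to your monitoring system):

import logging
import time

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger("ingestion")

metrics = {"uploaded": 0, "failed": 0}  # simple in-memory counters

def timed_upload(upload_func, content_data):
    """Wrap an upload call with timing, counting, and structured logging."""
    start = time.monotonic()
    try:
        result = upload_func(content_data)
        metrics["uploaded"] += 1
        logger.info("upload ok title=%r elapsed=%.2fs",
                    content_data.get("item_title"), time.monotonic() - start)
        return result
    except Exception:
        metrics["failed"] += 1
        logger.exception("upload failed title=%r", content_data.get("item_title"))
        raise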

Complete Examples

The following examples combine token management, file processing, and error handling into complete, production-ready solutions for each language. First, set up your environment variables in a .env file:
GLOO_CLIENT_ID=YOUR_CLIENT_ID
GLOO_CLIENT_SECRET=YOUR_CLIENT_SECRET
Or export them in your shell for Go and Java:
export GLOO_CLIENT_ID="your_actual_client_id_here"
export GLOO_CLIENT_SECRET="your_actual_client_secret_here"

Python

import requests
import time
import json
import os
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
from dotenv import load_dotenv

# Load environment variables from .env file
load_dotenv()

# --- Configuration ---
CLIENT_ID = os.getenv("GLOO_CLIENT_ID", "YOUR_CLIENT_ID")
CLIENT_SECRET = os.getenv("GLOO_CLIENT_SECRET", "YOUR_CLIENT_SECRET")
TOKEN_URL = "https://platform.ai.gloo.com/oauth2/token"
API_URL = "https://platform.ai.gloo.com/ingestion/v1/real_time_upload"
PUBLISHER_ID = "b6ada63e-f1ce-4a16-80da-9864f58c5bd7"  # Replace with your publisher ID

# --- State Management ---
access_token_info = {}

def get_access_token():
    """Retrieves a new access token."""
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    data = {"grant_type": "client_credentials", "scope": "api/access"}
    response = requests.post(TOKEN_URL, headers=headers, data=data, auth=(CLIENT_ID, CLIENT_SECRET))
    response.raise_for_status()
    token_data = response.json()
    token_data['expires_at'] = int(time.time()) + token_data['expires_in']
    return token_data

def is_token_expired(token_info):
    """Checks if the token is expired or close to expiring."""
    if not token_info or 'expires_at' not in token_info:
        return True
    return time.time() > (token_info['expires_at'] - 60)

def upload_content(content_data):
    """Uploads content to the Realtime API."""
    global access_token_info
    if is_token_expired(access_token_info):
        print("Token is expired or missing. Fetching a new one...")
        access_token_info = get_access_token()

    headers = {
        "Authorization": f"Bearer {access_token_info['access_token']}",
        "Content-Type": "application/json"
    }

    response = requests.post(API_URL, headers=headers, json=content_data)
    response.raise_for_status()
    return response.json()

def process_file(file_path):
    """Processes a single file and uploads its content."""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            content = file.read()
        
        # Extract metadata from filename and content
        filename = os.path.basename(file_path)
        title = filename.replace('.txt', '').replace('.md', '').replace('_', ' ').title()
        
        content_data = {
            "content": content,
            "publisherId": PUBLISHER_ID,
            "item_title": title,
            "author": ["Automated Ingestion"],
            "publication_date": datetime.now().strftime("%Y-%m-%d"),
            "type": "Article",
            "pub_type": "technical",
            "item_tags": ["automated", "ingestion"],
            "evergreen": True,
            "drm": ["aspen", "kallm"]
        }
        
        result = upload_content(content_data)
        print(f"✅ Successfully uploaded: {title}")
        print(f"   Response: {result['message']}")
        return True
        
    except Exception as e:
        print(f"❌ Failed to process {file_path}: {e}")
        return False

class ContentHandler(FileSystemEventHandler):
    """Handles file system events for content monitoring."""
    
    def on_created(self, event):
        if not event.is_directory and event.src_path.endswith(('.txt', '.md')):
            print(f"📄 New file detected: {event.src_path}")
            time.sleep(1)  # Allow file write to complete
            process_file(event.src_path)

def start_file_watcher(watch_directory):
    """Starts monitoring a directory for new content files."""
    if not os.path.exists(watch_directory):
        os.makedirs(watch_directory)
        print(f"Created watch directory: {watch_directory}")
    
    event_handler = ContentHandler()
    observer = Observer()
    observer.schedule(event_handler, watch_directory, recursive=True)
    
    print(f"🔍 Monitoring directory: {watch_directory}")
    print("   Supported file types: .txt, .md")
    print("   Press Ctrl+C to stop")
    
    observer.start()
    try:
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        observer.stop()
        print("\n👋 Stopping file monitor...")
    observer.join()

def batch_process_directory(directory_path):
    """Processes all supported files in a directory."""
    if not os.path.exists(directory_path):
        print(f"Directory does not exist: {directory_path}")
        return
    
    processed = 0
    failed = 0
    
    for filename in os.listdir(directory_path):
        if filename.endswith(('.txt', '.md')):
            file_path = os.path.join(directory_path, filename)
            if process_file(file_path):
                processed += 1
            else:
                failed += 1
            
            # Rate limiting - avoid overwhelming the API
            time.sleep(1)
    
    print(f"\n📊 Batch processing complete:")
    print(f"   ✅ Processed: {processed} files")
    print(f"   ❌ Failed: {failed} files")

# --- Example Usage ---
if __name__ == "__main__":
    import sys
    
    if len(sys.argv) < 2:
        print("Usage:")
        print("  python main.py watch <directory>     # Monitor directory for new files")
        print("  python main.py batch <directory>     # Process all files in directory")
        print("  python main.py single <file_path>    # Process single file")
        sys.exit(1)
    
    command = sys.argv[1]
    
    if command == "watch" and len(sys.argv) > 2:
        start_file_watcher(sys.argv[2])
    elif command == "batch" and len(sys.argv) > 2:
        batch_process_directory(sys.argv[2])
    elif command == "single" and len(sys.argv) > 2:
        process_file(sys.argv[2])
    else:
        print("Invalid command or missing arguments")

Testing Your Implementation

To test any of the complete examples:

1. Environment Setup

Create a .env file with your Gloo AI credentials:
GLOO_CLIENT_ID=your_actual_client_id
GLOO_CLIENT_SECRET=your_actual_client_secret

2. Install Dependencies

Each language has specific setup requirements - check the README files in the sandbox examples. The Python example above relies on the requests, watchdog, and python-dotenv packages.

3. Test Single File Upload

# Create a test file
echo "This is a test article about automated content ingestion." > test_content.txt

# Upload the file
python main.py single test_content.txt

4. Test Directory Monitoring

# Start monitoring a directory
python main.py watch ./watch_folder

# In another terminal, add files to the directory
echo "New content article" > ./watch_folder/new_article.txt

5. Test Batch Processing

# Create multiple test files
mkdir batch_test
echo "Article 1 content" > batch_test/article1.txt
echo "Article 2 content" > batch_test/article2.md

# Process all files
python main.py batch ./batch_test
The system will:
  • Automatically handle token retrieval and refresh
  • Extract metadata from filenames and content
  • Upload content with proper error handling
  • Display success/failure status for each operation
  • Provide structured JSON responses from the API

Production Deployment Considerations

Error Handling

  • Authentication: Automatic token refresh with exponential backoff
  • Rate Limiting: Built-in delays between API calls
  • Network Issues: Retry logic with configurable timeouts
  • File Processing: Validation and error recovery

Monitoring

  • Logging: Structured logs for all operations
  • Metrics: Success rates, processing times, error counts
  • Alerting: Failed upload notifications
  • Health Checks: System status monitoring

Scaling

  • Concurrent Processing: Parallel file processing capabilities (see the sketch after this list)
  • Queue Management: Async processing for high-volume scenarios
  • Resource Management: Memory and CPU optimization
  • Load Balancing: Multiple instance coordination
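
For concurrent processing, a thread pool is often enough, as in the sketch below. process_directory_concurrently is a hypothetical helper that expects a callable behaving like process_file from the complete example above; max_workers=4 is an assumption and should stay within whatever rate limits apply to your account.

import os
from concurrent.futures import ThreadPoolExecutor, as_completed

def process_directory_concurrently(directory_path, process_func, max_workers=4):
    """Process supported files in parallel with a thread pool."""
    paths = [
        os.path.join(directory_path, name)
        for name in os.listdir(directory_path)
        if name.endswith((".txt", ".md"))
    ]
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(process_func, path): path for path in paths}
        for future in as_completed(futures):
            results[futures[future]] = future.result()  # True/False per file
    return results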

Next Steps

Now that you have a working content ingestion pipeline, consider exploring:
  1. Search API - Query your ingested content
  2. Chat Integration - Use ingested content in conversations
  3. Content Management - Organize and categorize content
  4. Advanced Metadata - Rich content classification and tagging
  5. Custom Pipelines - Integrate with CMS, RSS feeds, or other content sources
This tutorial provides the foundation for building sophisticated content ingestion workflows that can scale from simple file uploads to enterprise-grade automated pipelines.