Intermediate • 30 minutes

Data Transformation Pipeline

Build a complete data transformation pipeline with real-time processing, filtering, aggregation, and storage.

What You'll Build

In this tutorial, you'll create a data transformation pipeline that:

  • Receives JSON data via webhook
  • Filters records based on conditions
  • Aggregates data by categories
  • Transforms the output to CSV format
  • Stores the transformed data to cloud storage

Prerequisites

Before You Start

  • ✅ A Sematryx account with an active API key
  • ✅ Basic understanding of data transformation concepts
  • ✅ Familiarity with JSON and CSV formats
  • ✅ Access to cloud storage (S3, Azure, GCS) for output

Step 1: Create the Pipeline

Create a new automation with transformation actions:

Create data transformation pipeline
curl -X POST https://api.sematryx.com/v1/automations \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "name": "data-transformation-pipeline",
    "trigger": {
      "type": "webhook",
      "config": {
        "path": "/webhook/data-pipeline"
      }
    },
    "actions": [
      {
        "type": "transform",
        "config": {
          "input_format": "json",
          "output_format": "csv",
          "transformations": [
            {
              "operation": "filter",
              "condition": "price > 100"
            },
            {
              "operation": "aggregate",
              "group_by": "category",
              "aggregations": {
                "total": "sum(price)",
                "count": "count()"
              }
            }
          ]
        }
      },
      {
        "type": "store",
        "config": {
          "destination": "s3://bucket/transformed-data.csv"
        }
      }
    ]
  }'

This pipeline includes:

  • Webhook trigger: Receives data via HTTP POST
  • Transform action: Filters and aggregates the data
  • Store action: Saves the result to cloud storage

Step 2: Create the Pipeline with the Python SDK

You can also create the pipeline using the Python SDK:

Create pipeline with Python SDK
from sematryx import SematryxClient

client = SematryxClient(api_key='your-api-key')

# Create data transformation pipeline
pipeline = client.automations.create(
    name='data-transformation-pipeline',
    trigger={
        'type': 'webhook',
        'config': {'path': '/webhook/data-pipeline'}
    },
    actions=[
        {
            'type': 'transform',
            'config': {
                'input_format': 'json',
                'output_format': 'csv',
                'transformations': [
                    {'operation': 'filter', 'condition': 'price > 100'},
                    {
                        'operation': 'aggregate',
                        'group_by': 'category',
                        'aggregations': {
                            'total': 'sum(price)',
                            'count': 'count()'
                        }
                    }
                ]
            }
        },
        {
            'type': 'store',
            'config': {
                'destination': 's3://bucket/transformed-data.csv'
            }
        }
    ]
)

print(f'Pipeline created: {pipeline.id}')

Step 3: Trigger the Pipeline

Send data to your pipeline via its webhook or by triggering it manually. Replace auto_1234567890 below with the pipeline ID returned in Step 1:

Trigger the pipeline with data
curl -X POST https://api.sematryx.com/v1/automations/auto_1234567890/trigger \
  -H "Authorization: Bearer YOUR_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "input_data": {
      "records": [
        {"id": 1, "name": "Product A", "price": 150, "category": "Electronics"},
        {"id": 2, "name": "Product B", "price": 80, "category": "Electronics"},
        {"id": 3, "name": "Product C", "price": 200, "category": "Clothing"}
      ]
    }
  }'
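As a quick sanity check, you can reproduce the filter and aggregate steps locally in plain Python. This sketch illustrates the transform semantics only, not the SDK: for the sample records above, the condition price > 100 drops Product B, leaving one record per category.

```python
from collections import defaultdict

# The same sample records sent to the webhook above
records = [
    {"id": 1, "name": "Product A", "price": 150, "category": "Electronics"},
    {"id": 2, "name": "Product B", "price": 80, "category": "Electronics"},
    {"id": 3, "name": "Product C", "price": 200, "category": "Clothing"},
]

# Filter step: keep records where price > 100
filtered = [r for r in records if r["price"] > 100]

# Aggregate step: group by category, computing sum(price) and count()
groups = defaultdict(lambda: {"total": 0, "count": 0})
for r in filtered:
    g = groups[r["category"]]
    g["total"] += r["price"]
    g["count"] += 1

# CSV output: one row per category
print("category,total,count")
for category, g in groups.items():
    print(f"{category},{g['total']},{g['count']}")
```

Running this prints one aggregated row for Electronics (total 150, count 1) and one for Clothing (total 200, count 1), which matches what the pipeline writes to the CSV destination.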

Step 4: Monitor Execution

Check the execution status and view results:

Check Execution Status

GET /v1/automations/auto_1234567890/executions

View Transformed Data

Access the transformed CSV file from your configured storage destination.
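Once you have fetched the executions list, you will typically want to scan it for failures. The sketch below parses a response body from the executions endpoint; note that the field names (executions, id, status, started_at) are illustrative assumptions for this example, not the documented Sematryx response schema.

```python
import json

# Illustrative response body from GET /v1/automations/{id}/executions.
# The field names here are assumptions, not the documented schema.
response_body = json.dumps({
    "executions": [
        {"id": "exec_001", "status": "completed", "started_at": "2024-01-01T00:00:00Z"},
        {"id": "exec_002", "status": "failed", "started_at": "2024-01-01T01:00:00Z"},
    ]
})

executions = json.loads(response_body)["executions"]

# Collect failed execution IDs for follow-up
failed = [e["id"] for e in executions if e["status"] == "failed"]

for e in executions:
    print(f"{e['id']}: {e['status']}")
```

In practice you would replace response_body with the actual HTTP response and alert on anything in failed.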

Advanced Transformations

Available Operations

  • Filter: Filter records based on conditions
  • Map: Transform individual fields
  • Aggregate: Group and aggregate data
  • Join: Combine multiple data sources
  • Sort: Order records by fields
  • Deduplicate: Remove duplicate records
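The operations above can be combined in a single transformations list, in the same shape used by the SDK in Step 2. The sketch below is illustrative only: the exact config keys for each operation (fields, source, on, order, keys) are assumptions for this example, not confirmed Sematryx schema.

```python
# Illustrative configs for the advanced operations listed above.
# Config keys beyond "operation" are assumptions, not documented schema.
transformations = [
    {"operation": "map", "fields": {"price_usd": "price * 1.0"}},
    {"operation": "join", "source": "s3://bucket/categories.json", "on": "category"},
    {"operation": "sort", "fields": ["category", "price"], "order": "desc"},
    {"operation": "deduplicate", "keys": ["id"]},
]

# Operations run in list order: map, then join, then sort, then deduplicate
for t in transformations:
    print(t["operation"])
```

You would pass a list like this as the transformations value inside the transform action's config, exactly as in Step 2.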

🎉 Next Steps

You've successfully created a data transformation pipeline! Explore more advanced features: