← Back to Tutorials
Intermediate• 30 minutes
Data Transformation Pipeline
Build a complete data transformation pipeline with real-time processing, filtering, aggregation, and storage.
What You'll Build
In this tutorial, you'll create a data transformation pipeline that:
- Receives JSON data via webhook
- Filters records based on conditions
- Aggregates data by categories
- Transforms the output to CSV format
- Stores the transformed data to cloud storage
Prerequisites
Before You Start
- ✅ A Sematryx account with an active API key
- ✅ Basic understanding of data transformation concepts
- ✅ Familiarity with JSON and CSV formats
- ✅ Access to cloud storage (S3, Azure, GCS) for output
Step 1: Create the Pipeline
Create a new automation with transformation actions:
Create data transformation pipeline
curl -X POST https://api.sematryx.com/v1/automations \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"name": "data-transformation-pipeline",
"trigger": {
"type": "webhook",
"config": {
"path": "/webhook/data-pipeline"
}
},
"actions": [
{
"type": "transform",
"config": {
"input_format": "json",
"output_format": "csv",
"transformations": [
{
"operation": "filter",
"condition": "price > 100"
},
{
"operation": "aggregate",
"group_by": "category",
"aggregations": {
"total": "sum(price)",
"count": "count()"
}
}
]
}
},
{
"type": "store",
"config": {
"destination": "s3://bucket/transformed-data.csv"
}
}
]
}'This pipeline includes:
- Webhook trigger: Receives data via HTTP POST
- Transform action: Filters and aggregates the data
- Store action: Saves the result to cloud storage
Step 2: Using the Python SDK
You can also create the pipeline using the Python SDK:
Create pipeline with Python SDK
from sematryx import SematryxClient
client = SematryxClient(api_key='your-api-key')
# Create data transformation pipeline
pipeline = client.automations.create(
name='data-transformation-pipeline',
trigger={
'type': 'webhook',
'config': {'path': '/webhook/data-pipeline'}
},
actions=[
{
'type': 'transform',
'config': {
'input_format': 'json',
'output_format': 'csv',
'transformations': [
{'operation': 'filter', 'condition': 'price > 100'},
{
'operation': 'aggregate',
'group_by': 'category',
'aggregations': {
'total': 'sum(price)',
'count': 'count()'
}
}
]
}
},
{
'type': 'store',
'config': {
'destination': 's3://bucket/transformed-data.csv'
}
}
]
)
print(f'Pipeline created: {pipeline.id}')Step 3: Trigger the Pipeline
Send data to your pipeline via webhook or manual trigger:
Trigger the pipeline with data
curl -X POST https://api.sematryx.com/v1/automations/auto_1234567890/trigger \
-H "Authorization: Bearer YOUR_API_KEY" \
-H "Content-Type: application/json" \
-d '{
"input_data": {
"records": [
{"id": 1, "name": "Product A", "price": 150, "category": "Electronics"},
{"id": 2, "name": "Product B", "price": 80, "category": "Electronics"},
{"id": 3, "name": "Product C", "price": 200, "category": "Clothing"}
]
}
}'Step 4: Monitor Execution
Check the execution status and view results:
Check Execution Status
GET /v1/automations/auto_1234567890/executionsView Transformed Data
Access the transformed CSV file from your configured storage destination.
Advanced Transformations
Available Operations
- Filter: Filter records based on conditions
- Map: Transform individual fields
- Aggregate: Group and aggregate data
- Join: Combine multiple data sources
- Sort: Order records by fields
- Deduplicate: Remove duplicate records
🎉 Next Steps
You've successfully created a data transformation pipeline! Explore more advanced features: