Airflow vs n8n for API-driven data pipelines
What You’ll Need
- n8n Cloud or self-hosted n8n instance
- A Hetzner or Contabo VPS (if self-hosting)
- DigitalOcean as an alternative hosting option
- Python 3.8+ (for Airflow)
- PostgreSQL or MySQL database
- Basic understanding of REST APIs and workflow automation concepts
Table of Contents
- Understanding Airflow and n8n
- Architecture and Deployment Models
- Building Your First API-Driven Pipeline
- Scalability, Performance, and Real-World Considerations
- Which One Should You Actually Choose?
- Getting Started
Understanding Airflow and n8n
I’ve spent the last few years building data pipelines, and the Airflow vs n8n question keeps coming up. Both tools solve the same fundamental problem—orchestrating tasks and workflows—but they approach it from completely different angles.
Apache Airflow is a workflow orchestration platform originally developed at Airbnb and now an Apache Software Foundation project. It’s written in Python, code-first, and treats workflows as directed acyclic graphs (DAGs). Think of it as a framework for engineers who want maximum control and don’t mind writing Python code to define their pipelines.
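Concretely, a DAG is a set of tasks plus "runs after" edges, and an orchestrator only starts a task once everything upstream of it has finished. The toy model below is a hypothetical `execution_waves` helper, not anything from Airflow's codebase. It shows how those edges turn into an execution order and why a cycle makes a pipeline invalid.

```python
from typing import Dict, List, Set


def execution_waves(upstream: Dict[str, Set[str]]) -> List[List[str]]:
    """Group tasks into waves; each wave depends only on tasks in earlier waves.

    `upstream` maps every task id to the set of task ids it must wait for.
    Raises ValueError if some tasks can never become ready (a cycle, or a
    dependency on a task missing from the mapping).
    """
    remaining = {task: set(deps) for task, deps in upstream.items()}
    done: Set[str] = set()
    waves: List[List[str]] = []
    while remaining:
        # A task is ready once every one of its upstream tasks has finished.
        ready = sorted(task for task, deps in remaining.items() if deps <= done)
        if not ready:
            raise ValueError(f"tasks can never run: {sorted(remaining)}")
        waves.append(ready)
        done.update(ready)
        for task in ready:
            del remaining[task]
    return waves


# The same chain of task ids that the Airflow DAG in this article wires up with >>.
pipeline = {
    "fetch_api_data": set(),
    "transform_data": {"fetch_api_data"},
    "load_to_postgres": {"transform_data"},
    "cleanup_temp_files": {"load_to_postgres"},
}
print(execution_waves(pipeline))
# [['fetch_api_data'], ['transform_data'], ['load_to_postgres'], ['cleanup_temp_files']]
```

Tasks that share a wave have no dependency on each other, so an orchestrator is free to run them in parallel. In Airflow, whether they actually do depends on the executor and concurrency settings.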
n8n, on the other hand, is a visual workflow automation platform that’s been gaining serious traction. It’s node-based, runs in a browser-based editor, and integrates with hundreds of APIs out of the box. You can build complex workflows without touching code, and you can drop into Code nodes when you need custom logic.
For API-driven data pipelines specifically, this distinction matters. A lot.
Architecture and Deployment Models
Here’s where the philosophies diverge.
Airflow’s Architecture:
Airflow uses a distributed architecture built around a scheduler, an executor, and a metadata database. The scheduler parses your DAGs and decides which tasks are ready to run. The executor determines how and where those tasks actually run: the LocalExecutor for development or single-machine setups, the CeleryExecutor for distributed task queues, or the KubernetesExecutor for running each task in its own pod.
This complexity gives you power. You get fine-grained control over task dependencies, retries, backoffs, and custom operators. But it also means you’re responsible for maintaining multiple components.
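In Airflow, retries and backoff are declared per task (for example `retries`, `retry_delay`, `retry_exponential_backoff`, and `max_retry_delay`), so you never write the retry loop yourself. The plain-Python sketch below only illustrates the general idea of retrying with capped exponential backoff. The `retry_with_backoff` and `backoff_delays` helpers and their doubling formula are assumptions for illustration, not Airflow's internal retry logic.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def backoff_delays(retries: int, base_delay: float, max_delay: float) -> List[float]:
    """Wait before each retry: base, 2x base, 4x base, ..., never above max_delay."""
    return [min(base_delay * 2 ** attempt, max_delay) for attempt in range(retries)]


def retry_with_backoff(
    func: Callable[[], T],
    retries: int = 2,
    base_delay: float = 300.0,
    max_delay: float = 3600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call func, retrying up to `retries` extra times with growing waits."""
    delays = backoff_delays(retries, base_delay, max_delay)
    for attempt in range(retries + 1):
        try:
            return func()
        except Exception:
            if attempt == retries:
                raise  # Out of retries: surface the last error.
            sleep(delays[attempt])
    raise AssertionError("unreachable")  # The loop always returns or raises.


print(backoff_delays(retries=5, base_delay=300, max_delay=3600))
# [300, 600, 1200, 2400, 3600]
```

Injecting `sleep` keeps the helper testable without real waiting; the cap stops later retries from drifting into multi-hour gaps.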
n8n’s Architecture:
n8n runs as a single application, though it can scale out in queue mode, where a main instance hands executions to separate worker processes through Redis. It stores workflow definitions and execution history in a database. There’s no separate scheduler/executor split to manage: the application handles orchestration internally. With n8n Cloud, you get fully managed infrastructure. If you self-host on a Hetzner VPS or DigitalOcean, it’s a single Docker container or Node.js process.
That difference shows up fast in API-driven pipelines. n8n’s simplicity means you can have a workflow hitting 50 different APIs up and running in an afternoon. Airflow takes longer to set up, but at the scale of 50,000 API calls spread across thousands of tasks, its per-task retries and distributed executors handle the load more gracefully.
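To make the scale argument concrete: the core pattern is fanning many calls out over a bounded pool of workers and recording failures per call instead of failing the whole batch. The sketch below shows that pattern on one machine with Python's standard-library thread pool. It is an illustration, not how either tool is implemented; Airflow spreads equivalent work across Celery or Kubernetes workers, for example via dynamic task mapping. The `fan_out` helper, the `fetch` callable, and the URLs are placeholders.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List, Tuple


def fan_out(
    urls: List[str],
    fetch: Callable[[str], Any],
    max_workers: int = 8,
) -> Tuple[Dict[str, Any], Dict[str, Exception]]:
    """Call fetch(url) for every URL with at most max_workers calls in flight.

    Returns (results, errors) keyed by URL, so one failing endpoint
    does not discard the results of the others.
    """
    results: Dict[str, Any] = {}
    errors: Dict[str, Exception] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(fetch, url): url for url in urls}
        for future in as_completed(futures):
            url = futures[future]
            try:
                results[url] = future.result()
            except Exception as exc:  # Record the failure and keep going.
                errors[url] = exc
    return results, errors


# With a real HTTP client (requires `pip install requests`), fetch might be:
#   def fetch(url):
#       response = requests.get(url, timeout=30)
#       response.raise_for_status()
#       return response.json()
```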
Building Your First API-Driven Pipeline
Let me show you how both tools handle a common scenario: fetch data from an API, transform it, and save it to a database.
Airflow Approach
In Airflow, you define a DAG in Python:
from datetime import datetime, timedelta
import json

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
import psycopg2
from psycopg2.extras import execute_values
import requests

# Note: the tasks hand data to each other through files in /tmp, which only
# works when every task runs on the same machine (e.g. LocalExecutor). With
# Celery or Kubernetes workers, pass small payloads via XCom or use shared storage.

default_args = {
    'owner': 'data_team',
    'retries': 2,                         # retry each failed task twice...
    'retry_delay': timedelta(minutes=5),  # ...waiting 5 minutes between attempts
    'start_date': datetime(2024, 1, 1),
}

dag = DAG(
    'fetch_user_data_pipeline',
    default_args=default_args,
    description='Fetch user data from API and store in PostgreSQL',
    schedule='0 */6 * * *',  # every 6 hours (use schedule_interval before Airflow 2.4)
    catchup=False,           # don't backfill runs missed since start_date
)


def fetch_from_api():
    """Download the raw user list and stage it on local disk."""
    url = 'https://jsonplaceholder.typicode.com/users'
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
        data = response.json()
        with open('/tmp/user_data.json', 'w') as f:
            json.dump(data, f)
        print(f"Successfully fetched {len(data)} users")
        return len(data)
    except requests.exceptions.RequestException as e:
        print(f"API request failed: {e}")
        raise  # re-raise so Airflow marks the task failed and retries it


def transform_and_validate():
    """Keep only records with the required fields and flatten them into rows."""
    with open('/tmp/user_data.json', 'r') as f:
        users = json.load(f)
    validated_users = []
    for user in users:
        if all(key in user for key in ['id', 'name', 'email', 'phone']):
            validated_users.append({
                'user_id': user['id'],
                'name': user['name'],
                'email': user['email'],
                'phone': user['phone'],
                'company': user.get('company', {}).get('name', 'Unknown'),
                'fetched_at': datetime.now().isoformat(),
            })
    with open('/tmp/validated_users.json', 'w') as f:
        json.dump(validated_users, f)
    print(f"Validated {len(validated_users)} out of {len(users)} users")
    return len(validated_users)


def load_to_database():
    """Upsert validated rows into PostgreSQL so retries don't create duplicates."""
    with open('/tmp/validated_users.json', 'r') as f:
        users = json.load(f)
    insert_query = """
        INSERT INTO users (user_id, name, email, phone, company, fetched_at)
        VALUES %s
        ON CONFLICT (user_id) DO UPDATE SET
            name = EXCLUDED.name,
            email = EXCLUDED.email,
            phone = EXCLUDED.phone,
            company = EXCLUDED.company,
            fetched_at = EXCLUDED.fetched_at
    """
    values = [
        (u['user_id'], u['name'], u['email'], u['phone'], u['company'], u['fetched_at'])
        for u in users
    ]
    conn = None
    try:
        # Hardcoded credentials are for illustration only; in production, keep
        # them in an Airflow Connection or a secrets backend.
        conn = psycopg2.connect(
            host='localhost',
            database='pipeline_db',
            user='pipeline_user',
            password='secure_password',
            port=5432,
        )
        with conn.cursor() as cursor:
            execute_values(cursor, insert_query, values)
        conn.commit()
        print(f"Loaded {len(users)} users into database")
        return len(users)
    except psycopg2.Error as e:
        print(f"Database error: {e}")
        raise
    finally:
        # Closing without a commit discards any partial transaction.
        if conn is not None:
            conn.close()


task_fetch = PythonOperator(
    task_id='fetch_api_data',
    python_callable=fetch_from_api,
    dag=dag,
)
task_transform = PythonOperator(
    task_id='transform_data',
    python_callable=transform_and_validate,
    dag=dag,
)
task_load = PythonOperator(
    task_id='load_to_postgres',
    python_callable=load_to_database,
    dag=dag,
)
task_cleanup = BashOperator(
    task_id='cleanup_temp_files',
    bash_command='rm -f /tmp/user_data.json /tmp/validated_users.json',
    dag=dag,
)

# fetch -> transform -> load -> cleanup
task_fetch >> task_transform >> task_load >> task_cleanup
This DAG runs every 6 hours, fetches user data from an API, validates it, loads it into PostgreSQL, and cleans up temporary files. If any task fails, Airflow retries it twice with 5-minute delays.
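Retries are only safe because the load step is idempotent: the `ON CONFLICT (user_id) DO UPDATE` clause in `load_to_database` turns a re-run into an update rather than a duplicate insert. The sketch below demonstrates that property using Python's built-in `sqlite3` as a stand-in for PostgreSQL (SQLite has supported the same upsert syntax since version 3.24). The table layout mirrors the DAG; the `load` helper and the sample rows are made up for illustration.

```python
import sqlite3
from typing import Iterable, Tuple

Row = Tuple[int, str, str, str, str, str]

UPSERT_SQL = """
INSERT INTO users (user_id, name, email, phone, company, fetched_at)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT (user_id) DO UPDATE SET
    name = excluded.name,
    email = excluded.email,
    phone = excluded.phone,
    company = excluded.company,
    fetched_at = excluded.fetched_at
"""


def load(conn: sqlite3.Connection, rows: Iterable[Row]) -> None:
    """Upsert rows in one transaction; loading the same rows twice leaves the table unchanged."""
    with conn:  # Commits on success, rolls back on exception.
        conn.executemany(UPSERT_SQL, rows)


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (user_id INTEGER PRIMARY KEY, name TEXT, email TEXT,"
    " phone TEXT, company TEXT, fetched_at TEXT)"
)
batch = [
    (1, "Ada", "ada@example.com", "555-0100", "Acme", "2024-01-01T00:00:00"),
    (2, "Lin", "lin@example.com", "555-0101", "Unknown", "2024-01-01T00:00:00"),
]
load(conn, batch)
load(conn, batch)  # Simulated retry: same rows, no duplicates.
print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 2
```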
n8n Approach
With n8n, you’d build the same workflow visually, but you can also export and manage it as JSON (the format n8n stores internally):
{
"name": "Fetch User Data Pipeline",
"nodes": [
{
"parameters": {
"rule": {
"interval": [{ "field": "hours", "hoursInterval": 6 }]
}
},
"id": "schedule-trigger",
"name": "Schedule",
"type": "n8n-nodes-base.scheduleTrigger",
"typeVersion": 1,
"position": [250, 300]
},
{
"parameters": {
"url": "https://jsonplaceholder.typicode.com/users",
"method": "GET",
"authentication": "none",
"responseFormat": "json"
},
"id": "http-request-api",
"name": "Fetch User API",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 4,
"position": [450, 300]
},
{
"parameters": {
"jsCode": "return $input.all().map(item => {\n const user = item.json;\n if (!user.id || !user.name || !user.email || !user.phone) {\n return null;\n }\n return {\n json: {\n user_id: user.id,\n name: user.name,\n email: user.email,\n phone: user.phone,\n company: user.company?.name || 'Unknown',\n fetched_at: new Date().toISOString()\n }\n };\n}).filter(item => item !== null);"
},
"id": "code-transform",
"name": "Transform & Validate",
"type": "n8n-nodes-base.code",
"typeVersion": 2,
"position": [650, 300]
},
{
"parameters": {
"operation": "executeQuery",
"query": "INSERT INTO users (user_id, name, email, phone, company, fetched_at) VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT (user_id) DO UPDATE SET name=EXCLUDED.name, email=EXCLUDED.email, phone=EXCLUDED.phone, company=EXCLUDED.company, fetched_at=EXCLUDED.fetched_at",
"options": {
"queryReplacement": "={{ $json.user_id }},{{ $json.name }},{{ $json.email }},{{ $json.phone }},{{ $json.company }},{{ $json.fetched_at }}"
}
},
"credentials": {
"postgres": { "id": "YOUR_POSTGRES_CREDENTIAL_ID", "name": "Pipeline DB" }
},
"id": "postgres-insert",
"name": "Load to PostgreSQL",
"type": "n8n-nodes-base.postgres",
"typeVersion": 2,
"position": [850, 300]
}
],
"connections": {
"Schedule": {
"main": [[{ "node": "Fetch User API", "type": "main", "index": 0 }]]
},
"Fetch User API": {
"main": [[{ "node": "Transform & Validate", "type": "main", "index": 0 }]]
},
"Transform & Validate": {
"main": [[{ "node": "Load to PostgreSQL", "type": "main", "index": 0 }]]
}
},
"settings": {
"saveDataErrorExecution": "all"
}
}
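When hand-editing exported workflow JSON, it's easy to end up with a connection that points at a node that doesn't exist. n8n keys its `connections` object by node name, not by `id`. The checker below is a hypothetical helper, not part of n8n. It loads a workflow export and lists any connection whose source or target name is missing from `nodes`.

```python
import json
from typing import Any, Dict, List


def dangling_connections(workflow: Dict[str, Any]) -> List[str]:
    """List connections whose source or target is not a node name in the workflow."""
    names = {node["name"] for node in workflow.get("nodes", [])}
    problems: List[str] = []
    for source, outputs in workflow.get("connections", {}).items():
        if source not in names:
            problems.append(f"unknown source node: {source!r}")
        # outputs looks like {"main": [[{"node": "...", "type": "main", "index": 0}]]}:
        # one list per output port, each holding the links leaving that port.
        for output_type, ports in outputs.items():
            for links in ports or []:
                for link in links or []:
                    if link["node"] not in names:
                        problems.append(
                            f"{source!r} -> unknown target {link['node']!r} ({output_type})"
                        )
    return problems


def check_file(path: str) -> List[str]:
    """Load an exported workflow JSON file and return any dangling connections."""
    with open(path, encoding="utf-8") as f:
        return dangling_connections(json.load(f))
```

For example, `check_file("fetch_user_data_pipeline.json")` (a hypothetical filename) returns an empty list when every connection resolves. It flags connections keyed by node `id`, such as `"schedule-trigger"`, instead of by node name.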