Automatisation Airflow avec n8n : gestion des exécutions de DAG
Ce workflow n8n a pour objectif de gérer les exécutions de DAG dans Airflow, permettant ainsi une automatisation efficace des processus de données. Dans un contexte où la gestion des flux de travail est cruciale pour les entreprises, ce workflow s'avère particulièrement utile pour les équipes techniques et les data engineers qui souhaitent surveiller et contrôler l'état de leurs tâches automatisées. En intégrant des appels HTTP pour interagir avec l'API d'Airflow, ce workflow permet de vérifier l'état des DAG et de réagir en conséquence, garantissant ainsi une gestion fluide des opérations.
- Étape 1 : Le workflow débute par un appel HTTP à l'API d'Airflow pour déclencher un DAG.
- Étape 2 : Ensuite, il vérifie l'état du DAG en cours d'exécution. Si l'état est 'queued', le workflow attend un certain temps avant de vérifier à nouveau.
- Étape 3 : Si le DAG prend trop de temps à s'exécuter, une erreur est générée.
- Étape 4 : En cas de succès, le workflow récupère les résultats de l'exécution. Les noeuds 'if' et 'switch' permettent de gérer les différentes conditions et de prendre des décisions basées sur l'état du DAG. Les bénéfices business de ce workflow incluent une réduction des temps d'attente et une meilleure visibilité sur l'état des processus, ce qui permet aux équipes de réagir rapidement en cas de problème. En automatisant la gestion des DAG, les entreprises peuvent améliorer leur efficacité opérationnelle et réduire les risques d'erreurs humaines.
Workflow n8n Airflow, gestion de données : vue d'ensemble
Schéma des nœuds et connexions de ce workflow n8n, généré à partir du JSON n8n.
Workflow n8n Airflow, gestion de données : détail des nœuds
Inscris-toi pour voir l'intégralité du workflow
Inscription gratuite
S'inscrire gratuitementBesoin d'aide ?{
"id": "Y5URlIlbX4HDzWKA",
"meta": {
"instanceId": "6ae0aa8b6c9099f1f8ed1265281802eab47aaf5b2845f317791e4ac7ee5b7279",
"templateCredsSetupCompleted": true
},
"name": "airflow dag_run",
"tags": [
{
"id": "YSelDQ3zfWB0LeVn",
"name": "airflow",
"createdAt": "2025-02-25T04:17:21.943Z",
"updatedAt": "2025-02-25T04:17:21.943Z"
}
],
"nodes": [
{
"id": "0d4457ef-7a88-4755-8bd2-b0e8148f86f4",
"name": "Airflow: dag_run",
"type": "n8n-nodes-base.httpRequest",
"position": [
380,
-40
],
"parameters": {
"url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns",
"method": "POST",
"options": {},
"jsonBody": "={\n \"conf\": {{ $('in data').item.json.conf }}\n}",
"sendBody": true,
"specifyBody": "json",
"authentication": "genericCredentialType",
"genericAuthType": "httpBasicAuth"
},
"credentials": {
"httpBasicAuth": {
"id": "vTR4WWA7czRn2ULn",
"name": "Airflow"
}
},
"typeVersion": 4.2
},
{
"id": "acf477a0-aad5-402a-a5a0-543f3e277333",
"name": "Airflow: dag_run - state",
"type": "n8n-nodes-base.httpRequest",
"position": [
840,
60
],
"parameters": {
"url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}",
"options": {},
"authentication": "genericCredentialType",
"genericAuthType": "httpBasicAuth"
},
"credentials": {
"httpBasicAuth": {
"id": "vTR4WWA7czRn2ULn",
"name": "Airflow"
}
},
"typeVersion": 4.2
},
{
"id": "26982a6f-6281-4140-a05c-ea6f3f2c0cb2",
"name": "count",
"type": "n8n-nodes-base.code",
"position": [
1180,
40
],
"parameters": {
"jsCode": "try {\n $('count').first().json.count += 1\n return {count:$('count').first().json.count};\n}\ncatch (error) {\n return {count:1};\n}"
},
"typeVersion": 2
},
{
"id": "613718f6-ba7e-415c-8e07-0123224e1ac6",
"name": "dag run fail",
"type": "n8n-nodes-base.stopAndError",
"position": [
1180,
400
],
"parameters": {
"errorMessage": "dag run fail"
},
"typeVersion": 1
},
{
"id": "66ba0e85-4608-418b-b27b-8cbc50f78319",
"name": "if state == queued",
"type": "n8n-nodes-base.if",
"position": [
520,
60
],
"parameters": {
"options": {},
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "0ae67986-09c0-443d-9262-075bfe94ca2e",
"operator": {
"name": "filter.operator.equals",
"type": "string",
"operation": "equals"
},
"leftValue": "={{ $json.state }}",
"rightValue": "queued"
}
]
}
},
"typeVersion": 2.2
},
{
"id": "92176e9a-0ea7-48b0-9ca0-ea4ea8442cee",
"name": "dag run wait too long",
"type": "n8n-nodes-base.stopAndError",
"position": [
1500,
40
],
"parameters": {
"errorMessage": "dag run wait too long"
},
"typeVersion": 1
},
{
"id": "6a05471f-232e-44d6-9dbb-583400537507",
"name": "Airflow: dag_run - get result",
"type": "n8n-nodes-base.httpRequest",
"position": [
1180,
-140
],
"parameters": {
"url": "={{ $('airflow-api').item.json.prefix }}/api/v1/dags/{{ $('in data').item.json.dag_id }}/dagRuns/{{ $('Airflow: dag_run').item.json.dag_run_id }}/taskInstances/{{ $('in data').item.json.task_id }}/xcomEntries/return_value",
"options": {},
"authentication": "genericCredentialType",
"genericAuthType": "httpBasicAuth"
},
"credentials": {
"httpBasicAuth": {
"id": "vTR4WWA7czRn2ULn",
"name": "Airflow"
}
},
"typeVersion": 4.2
},
{
"id": "fb2211d5-cef2-4be2-b281-de315aa07093",
"name": "Switch: state",
"type": "n8n-nodes-base.switch",
"position": [
980,
-40
],
"parameters": {
"rules": {
"values": [
{
"outputKey": "=success",
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "4d4ecf8a-c02b-4722-9528-1919cdf9b83e",
"operator": {
"name": "filter.operator.equals",
"type": "string",
"operation": "equals"
},
"leftValue": "={{ $json.state }}",
"rightValue": "success"
}
]
},
"renameOutput": true
},
{
"outputKey": "queued",
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"operator": {
"type": "string",
"operation": "equals"
},
"leftValue": "={{ $json.state }}",
"rightValue": "queued"
}
]
},
"renameOutput": true
},
{
"outputKey": "running",
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "fa5d8524-334a-4ab1-b543-bba75cafd0ec",
"operator": {
"name": "filter.operator.equals",
"type": "string",
"operation": "equals"
},
"leftValue": "={{ $json.state }}",
"rightValue": "running"
}
]
},
"renameOutput": true
},
{
"outputKey": "failed",
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "dd853677-c51c-4c06-8680-3c9d1829d6bd",
"operator": {
"name": "filter.operator.equals",
"type": "string",
"operation": "equals"
},
"leftValue": "={{ $json.state }}",
"rightValue": "failed"
}
]
},
"renameOutput": true
}
]
},
"options": {
"fallbackOutput": 3
}
},
"typeVersion": 3.2
},
{
"id": "5941496a-a64d-432c-9103-e7bcae4b8d67",
"name": "in data",
"type": "n8n-nodes-base.executeWorkflowTrigger",
"position": [
100,
-40
],
"parameters": {
"workflowInputs": {
"values": [
{
"name": "dag_id"
},
{
"name": "task_id"
},
{
"name": "conf"
},
{
"name": "wait",
"type": "number"
},
{
"name": "wait_time",
"type": "number"
}
]
}
},
"typeVersion": 1.1
},
{
"id": "e77fed4a-b61a-4126-8be3-43ef8a832cfe",
"name": "Wait",
"type": "n8n-nodes-base.wait",
"position": [
700,
-40
],
"webhookId": "3d164954-2926-4174-afc1-261dfdfa9e46",
"parameters": {
"amount": "={{ $('in data').item.json.hasOwnProperty('wait') ? $('in data').item.json.wait : 10 }}"
},
"typeVersion": 1.1
},
{
"id": "8ffae842-4400-4667-85bb-50644ef10fc0",
"name": "If count > wait_time",
"type": "n8n-nodes-base.if",
"position": [
1320,
140
],
"parameters": {
"options": {},
"conditions": {
"options": {
"version": 2,
"leftValue": "",
"caseSensitive": true,
"typeValidation": "strict"
},
"combinator": "and",
"conditions": [
{
"id": "1829d538-5633-4f5c-ad1b-285b084b35ee",
"operator": {
"type": "number",
"operation": "gt"
},
"leftValue": "={{ $json.count }}",
"rightValue": "={{ $('in data').item.json.hasOwnProperty('wait_time') ? $('in data').item.json.wait_time : 12 }}"
}
]
}
},
"typeVersion": 2.2
},
{
"id": "c49d4a1b-6f25-471b-9c21-d3defb709dda",
"name": "airflow-api",
"type": "n8n-nodes-base.set",
"position": [
240,
60
],
"parameters": {
"options": {},
"assignments": {
"assignments": [
{
"id": "40a5ab5b-dee1-44c4-910a-d6b85af75aed",
"name": "prefix",
"type": "string",
"value": "https://airflow.example.com"
}
]
}
},
"typeVersion": 3.4
}
],
"active": false,
"pinData": {
"in data": [
{
"json": {
"conf": "{\n \"image\": \"nginx\",\n \"tag\": \"latest\",\n \"tag_requested\": 1000\n}",
"wait": 10,
"dag_id": "image_tag_related",
"task_id": "image_tag_related",
"wait_time": 18
}
}
]
},
"settings": {
"executionOrder": "v1"
},
"versionId": "57fdbcfc-7950-4aff-9ac7-3c2a99a2c8c8",
"connections": {
"Wait": {
"main": [
[
{
"node": "Airflow: dag_run - state",
"type": "main",
"index": 0
}
]
]
},
"count": {
"main": [
[
{
"node": "If count > wait_time",
"type": "main",
"index": 0
}
]
]
},
"in data": {
"main": [
[
{
"node": "airflow-api",
"type": "main",
"index": 0
}
]
]
},
"airflow-api": {
"main": [
[
{
"node": "Airflow: dag_run",
"type": "main",
"index": 0
}
]
]
},
"Switch: state": {
"main": [
[
{
"node": "Airflow: dag_run - get result",
"type": "main",
"index": 0
}
],
[
{
"node": "count",
"type": "main",
"index": 0
}
],
[
{
"node": "count",
"type": "main",
"index": 0
}
],
[
{
"node": "dag run fail",
"type": "main",
"index": 0
}
]
]
},
"Airflow: dag_run": {
"main": [
[
{
"node": "if state == queued",
"type": "main",
"index": 0
}
]
]
},
"if state == queued": {
"main": [
[
{
"node": "Wait",
"type": "main",
"index": 0
}
],
[
{
"node": "dag run fail",
"type": "main",
"index": 0
}
]
]
},
"If count > wait_time": {
"main": [
[
{
"node": "dag run wait too long",
"type": "main",
"index": 0
}
],
[
{
"node": "Wait",
"type": "main",
"index": 0
}
]
]
},
"Airflow: dag_run - state": {
"main": [
[
{
"node": "Switch: state",
"type": "main",
"index": 0
}
]
]
},
"Airflow: dag_run - get result": {
"main": [
[]
]
}
}
}Workflow n8n Airflow, gestion de données : pour qui est ce workflow ?
Ce workflow s'adresse principalement aux équipes techniques, aux data engineers et aux responsables de la gestion des données au sein des entreprises. Il est conçu pour les organisations qui utilisent Airflow pour orchestrer leurs flux de travail et qui cherchent à automatiser la surveillance et la gestion de leurs DAG. Un niveau technique intermédiaire est recommandé pour une mise en œuvre efficace.
Workflow n8n Airflow, gestion de données : problème résolu
Ce workflow résout le problème de la gestion manuelle des exécutions de DAG dans Airflow, qui peut être source de frustration et de perte de temps. En automatisant le suivi de l'état des DAG, il élimine les risques d'erreurs humaines et permet une réaction rapide en cas de problème. Les utilisateurs bénéficient d'une meilleure visibilité sur leurs processus, ce qui leur permet d'optimiser leur temps et de se concentrer sur des tâches à plus forte valeur ajoutée.
Workflow n8n Airflow, gestion de données : étapes du workflow
Étape 1 : Le workflow commence par un appel HTTP à l'API d'Airflow pour déclencher un DAG.
- Étape 1 : Il vérifie ensuite l'état du DAG en cours d'exécution.
- Étape 2 : Si l'état est 'queued', le workflow attend un certain temps avant de vérifier à nouveau.
- Étape 3 : Si le DAG prend trop de temps, une erreur est générée.
- Étape 4 : En cas de succès, le workflow récupère les résultats de l'exécution. Les noeuds conditionnels permettent de gérer les différentes branches du flux en fonction de l'état du DAG.
Workflow n8n Airflow, gestion de données : guide de personnalisation
Pour personnaliser ce workflow, vous pouvez modifier l'URL de l'API d'Airflow dans les noeuds HTTP. Assurez-vous également d'ajuster les conditions dans les noeuds 'if' et 'switch' selon vos besoins spécifiques. Si vous souhaitez intégrer d'autres outils, vous pouvez ajouter des noeuds supplémentaires pour interagir avec des services tiers. Pensez à sécuriser vos appels API en utilisant des méthodes d'authentification appropriées et à surveiller les performances du workflow pour garantir son bon fonctionnement.