Download and Upload: Streamlining File Transfers with Airflow
In the world of data pipelines, efficiently moving files between different systems is crucial. Often, you'll need to fetch data from a remote source, like a website or API, and store it securely in a cloud storage solution like Amazon S3. This is where Airflow shines. Its flexible architecture and diverse operators enable us to build robust, reliable workflows for seamless data transfers.
Scenario: Let's imagine you need to download a daily CSV report from a website and upload it to your S3 bucket for further analysis. This article will guide you through building an Airflow DAG to automate this process.
Original Code:
from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.s3 import S3CreateBucketOperator
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
from airflow.providers.http.operators.http import SimpleHttpOperator

LOCAL_PATH = '/tmp/daily_report.csv'
BUCKET_NAME = 'your-s3-bucket-name'


def _save_report(response):
    """Write the HTTP response body to a local temp file for the upload task."""
    with open(LOCAL_PATH, 'wb') as f:
        f.write(response.content)
    return LOCAL_PATH


with DAG(
    dag_id='download_and_upload_to_s3',
    start_date=datetime(2023, 1, 1),
    schedule_interval='@daily',
    catchup=False,
) as dag:
    # Create the target bucket if it does not already exist.
    create_bucket = S3CreateBucketOperator(
        task_id='create_bucket',
        bucket_name=BUCKET_NAME,
        aws_conn_id='aws_default',
    )

    # Fetch the daily report. The http_default connection's host supplies the
    # base URL (e.g. https://www.example.com); endpoint holds only the path.
    download_file = SimpleHttpOperator(
        task_id='download_file',
        method='GET',
        http_conn_id='http_default',
        endpoint='/daily_report.csv',
        data={'param1': 'value1'},            # sent as query parameters for GET
        headers={'Authorization': 'your-token'},
        response_filter=_save_report,         # persists the payload to LOCAL_PATH
    )

    # Upload the downloaded file to S3, keyed by the execution date.
    # NOTE: both tasks must run where LOCAL_PATH is accessible (same worker
    # or a shared filesystem).
    upload_to_s3 = LocalFilesystemToS3Operator(
        task_id='upload_to_s3',
        filename=LOCAL_PATH,
        dest_key='reports/daily_report_{{ ds }}.csv',
        dest_bucket=BUCKET_NAME,
        aws_conn_id='aws_default',
        replace=True,
    )

    create_bucket >> download_file >> upload_to_s3
Explanation:
- DAG Definition: We define the DAG with a descriptive ID, a start date, a daily schedule, and catchup disabled, so Airflow schedules only the current day's run instead of backfilling past dates.
- Create S3 Bucket: The S3CreateBucketOperator creates the bucket if it doesn't already exist. Ensure your AWS credentials are configured in the aws_default connection.
- Download File: The SimpleHttpOperator fetches the report over HTTP. The connection referenced by http_conn_id supplies the base URL, endpoint holds the path, and you can pass custom headers and query parameters. The response_filter callback writes the response body to a temporary local file.
- Upload to S3: The LocalFilesystemToS3Operator takes the downloaded file and uploads it to your S3 bucket. The dest_key parameter specifies the object's path within the bucket, and the ds template variable dynamically adds the execution date to the filename.
- Task Dependencies: The >> operator defines task dependencies, ensuring the bucket is created first, followed by the download and finally the upload; an equivalent formulation is sketched right after this list.
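If you prefer a function call to the bit-shift syntax, the same ordering can be declared with Airflow's chain helper. This is a minimal, equivalent sketch that reuses the task objects defined in the DAG above.

from airflow.models.baseoperator import chain

# Equivalent to: create_bucket >> download_file >> upload_to_s3
chain(create_bucket, download_file, upload_to_s3)

Both forms produce the same dependency graph; chain is mainly convenient when the list of tasks is built dynamically.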
Insights:
- Flexibility: The SimpleHttpOperator supports different HTTP methods, custom headers, and request parameters, making it adaptable to a wide range of APIs and websites.
- Error Handling: Consider adding retries and an on_failure_callback (or on_retry_callback) to the download task so transient failures are retried and hard failures are reported gracefully; see the first sketch after this list.
- Data Validation: After uploading to S3, you might want a validation step that checks the uploaded file's presence, size, or content; the second sketch after this list shows one way to do this with the S3Hook.
- Logging: Airflow already captures each task's output in the task logs; emit additional log lines from your callbacks and callables (as the error-handling sketch does) to monitor progress and pinpoint issues.
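To illustrate the error-handling point, here is a minimal sketch that adds retries and a failure callback to the download task from the DAG above (it reuses the SimpleHttpOperator import and the _save_report helper). notify_failure is a hypothetical callback; the alerting inside it is left to you.

import logging
from datetime import timedelta


def notify_failure(context):
    """Hypothetical callback: runs once the task has exhausted its retries."""
    task_id = context['task_instance'].task_id
    logging.error("Task %s failed for run %s", task_id, context['ds'])
    # Plug in your alerting of choice here (email, Slack, PagerDuty, ...).


download_file = SimpleHttpOperator(
    task_id='download_file',
    method='GET',
    http_conn_id='http_default',
    endpoint='/daily_report.csv',
    response_filter=_save_report,
    retries=3,                           # retry transient network errors
    retry_delay=timedelta(minutes=5),    # wait between attempts
    on_failure_callback=notify_failure,  # fires after the final failed attempt
)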
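For the data-validation point, here is a minimal sketch of a follow-up task placed inside the same with DAG block, assuming a PythonOperator plus the Amazon provider's S3Hook. The bucket, key pattern, and size threshold mirror the example and are placeholders to adapt.

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook


def validate_report(ds, **_):
    """Fail the task if the uploaded report is missing or suspiciously small."""
    hook = S3Hook(aws_conn_id='aws_default')
    key = f'reports/daily_report_{ds}.csv'
    if not hook.check_for_key(key, bucket_name='your-s3-bucket-name'):
        raise ValueError(f'{key} was not found in the bucket')
    if len(hook.read_key(key, bucket_name='your-s3-bucket-name')) < 10:  # arbitrary sanity check
        raise ValueError(f'{key} looks empty or truncated')


validate_upload = PythonOperator(
    task_id='validate_upload',
    python_callable=validate_report,
)

upload_to_s3 >> validate_upload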
Conclusion:
Airflow simplifies file transfers by providing specialized operators for downloading, uploading, and managing files across different systems. This article walked through a basic example, but Airflow's versatility lets you build far more complex workflows tailored to your data needs. Remember to experiment, add error handling, and adapt the code to your particular use case. By leveraging Airflow, you can keep data moving reliably and efficiently through your pipeline, supporting better insights and analysis.