We have provided you with several tutorials on Snowflake. In this example, we consider the scenario where we have to connect Snowflake with Python, with an EC2 server, and finally with an S3 bucket. The goal is to get the data from Snowflake and load it into an S3 bucket and/or onto the EC2 server. From there, we run the machine learning models and load the output of the models to an S3 bucket. Finally, we load the data from the S3 bucket back into Snowflake. All these tasks should be done by running a Python script (.py), which we can schedule to run periodically using Airflow or a cron job. Let’s start building the .py script.
Import the libraries
For this task, we will need the following libraries, which we import into the script.
import snowflake.connector
import pandas as pd
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine
import boto3
Connect Snowflake with EC2 using Python
Assuming that we are working on the EC2 server, we can connect to Snowflake using the Snowflake SQLAlchemy engine, which is built on top of the Python connector.
url = URL(
    user='xxx',
    password='xxx',
    account='xxx',
    warehouse='COMPUTE_WH',
    database='GPIPIS_DB',
    schema='PUBLIC',
    role='SYSADMIN'
)

engine = create_engine(url)
connection = engine.connect()
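Although we use SQLAlchemy above, the snowflake.connector package that we imported can also open a connection directly. A minimal sketch, with the same placeholder credentials:

# a sketch using the raw Python connector instead of SQLAlchemy;
# credentials and object names are placeholders
ctx = snowflake.connector.connect(
    user='xxx',
    password='xxx',
    account='xxx',
    warehouse='COMPUTE_WH',
    database='GPIPIS_DB',
    schema='PUBLIC',
    role='SYSADMIN'
)
cur = ctx.cursor()
cur.execute("select current_version()")
print(cur.fetchone())
cur.close()
ctx.close()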
Get the data from Snowflake to EC2
Now, we can run a SQL statement to bring the data from Snowflake to EC2 and load it into a pandas DataFrame.
# get a sample of the data
query = '''
select col1, col2, col3
from my_table
'''

df = pd.read_sql(query, connection)
df
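For large tables, pandas can stream the result in chunks instead of loading everything at once. A small sketch, assuming the same query and connection:

# read the result in chunks of 100,000 rows to keep memory usage low
chunks = pd.read_sql(query, connection, chunksize=100000)
df = pd.concat(chunks, ignore_index=True)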
Send Data from Snowflake to S3
Assuming that we have created the stage and the storage integration, we can send files from Snowflake to S3. In this example, we send a CSV file to the S3 bucket that we have set in the stage. Notice that we can specify the key within the bucket plus the file name (/daily_file/mydaily_export.csv).
# Sends the output of the query to S3
daily_file_query = """
copy into @my_s3_test_snowflake_stage/daily_file/mydaily_export.csv
from (select * from my_other_table)
single=TRUE
header=TRUE
OVERWRITE=TRUE
file_format = (type = CSV COMPRESSION = NONE)
"""

unload_query = connection.execute(daily_file_query)
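The COPY command above assumes the external stage and storage integration already exist. If not, they could be created once from the same connection (this usually requires elevated privileges). A rough sketch, where the integration name, IAM role ARN, and bucket URL are placeholders:

# one-off setup: storage integration and stage (placeholder names and ARN)
connection.execute("""
create storage integration if not exists my_s3_integration
  type = external_stage
  storage_provider = 'S3'
  enabled = true
  storage_aws_role_arn = 'arn:aws:iam::123456789012:role/my-snowflake-role'
  storage_allowed_locations = ('s3://my-bucket/')
""")

connection.execute("""
create stage if not exists my_s3_test_snowflake_stage
  url = 's3://my-bucket/'
  storage_integration = my_s3_integration
  file_format = (type = csv)
""")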
Get the Data from S3 to EC2
Using boto3, we can move data between S3 and EC2 in both directions.
# get the daily export using boto3
s3 = boto3.client('s3')

# if you want to save it to disk
# s3.download_file(Filename='tmp.csv', Bucket='my-bucket', Key='daily_file/mydaily_export.csv')

# read it in memory
obj = s3.get_object(Bucket='my-bucket', Key='daily_file/mydaily_export.csv')
obj = pd.read_csv(obj['Body'])

### assume that we are doing some other things with the file and then we have to send it to S3
### here we build the Machine Learning models and we save the output as "my_export.csv"
obj.to_csv("my_export.csv", index=False)

# set bucket, filename, and key for the upload
s3.upload_file(Bucket='my-bucket',
               Filename='my_export.csv',
               Key='exports/my_export.csv')
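To double-check that the upload landed where expected, we can list the objects under the prefix. A small sketch, assuming the same bucket and key names:

# list the objects under the exports/ prefix to confirm the upload
response = s3.list_objects_v2(Bucket='my-bucket', Prefix='exports/')
for item in response.get('Contents', []):
    print(item['Key'], item['Size'])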
Copy the Data from S3 to Snowflake
Finally, we want to copy the data from S3 to Snowflake. First, we truncate the table to make sure that it does not contain any old records, and then we run the COPY command.
# copy the data from the S3 bucket to Snowflake
truncate_query = """
truncate TEST_TABLE
"""

copy_query = """
copy into TEST_TABLE
from @my_s3_test_snowflake_stage/exports/
file_format = (type = csv field_delimiter=',' skip_header=1)
"""

trunc_query = connection.execute(truncate_query)
cp_query = connection.execute(copy_query)
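As a sanity check, we can confirm that the COPY command actually loaded rows. A brief sketch using the same connection:

# sanity check: count the rows that landed in the target table
row_count = pd.read_sql("select count(*) as n from TEST_TABLE", connection)
print(row_count)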
Finally, we close the connection.
connection.close()
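In a scheduled script, it is safer to make sure the connection is released even when one of the steps fails. A minimal sketch using a try/finally block (the intermediate steps are elided):

# ensure the connection is released even when a step raises an error
connection = engine.connect()
try:
    df = pd.read_sql(query, connection)
    # ... remaining steps ...
finally:
    connection.close()
    engine.dispose()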
The Takeaway
All the above commands can be run from a single .py script, or split across different .py scripts and orchestrated as a DAG using Airflow. In general, it is good practice to be able to connect different tools together, as we did here by connecting AWS S3, Snowflake, AWS EC2, and Python.
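For instance, the script could be triggered daily either by a cron entry or by a minimal Airflow DAG. The sketch below assumes the script is saved as /home/ec2-user/snowflake_pipeline.py (a placeholder path) and uses Airflow's BashOperator:

# cron alternative (run every day at 06:00):
# 0 6 * * * python /home/ec2-user/snowflake_pipeline.py

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="snowflake_s3_pipeline",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    run_pipeline = BashOperator(
        task_id="run_pipeline",
        bash_command="python /home/ec2-user/snowflake_pipeline.py",
    )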