How to Upload CSV Data to Snowflake Using Python: A Step-by-Step Guide
By - Ankush Rathour
Uploading CSV data into Snowflake can be streamlined using Python's powerful libraries. In this guide, we'll walk through the process of installing the necessary packages, setting up the connection, and executing the upload. This step-by-step tutorial is designed to help both beginners and experienced developers efficiently manage data within Snowflake.
Table of Contents
- Prerequisites
- Installing Required Packages
- Connecting to Snowflake
- Configuring the Database
- Uploading Data to Snowflake
- Putting It All Together
- Running the Script
- Conclusion
Prerequisites
Before we begin, ensure you have the following:
- Python 3.x installed on your machine.
- A Snowflake account with appropriate permissions.
- Basic knowledge of Python and SQL.
Installing Required Packages
First, we need to install the Snowflake Connector for Python together with Pandas. Installing the connector with its pandas extra also pulls in PyArrow, which write_pandas relies on:
pip install "snowflake-connector-python[pandas]" pandas
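If you want to confirm that both packages are available before going further, a quick version check works (a minimal sketch; the package names are simply the ones installed above):

from importlib.metadata import version

# Print installed versions to confirm both packages are present.
print("snowflake-connector-python:", version("snowflake-connector-python"))
print("pandas:", version("pandas"))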
Connecting to Snowflake
We use the snowflake.connector module to establish a connection:
import snowflake.connector as snowflake

def connect(user, password, account, warehouse):
    conn = snowflake.connect(
        user=user,
        password=password,
        account=account,
        warehouse=warehouse
    )
    return conn
Parameters:
- user: Your Snowflake username.
- password: Your Snowflake password.
- account: Your Snowflake account identifier (e.g., company_name).
- warehouse: The name of the warehouse to use.
Configuring the Database
Next, we'll set up the database, schema, and table where the data will be uploaded:
import os
import pandas as pd

def database_config(conn, database, schema, table, file):
    cs = conn.cursor()
    # Create and switch to the target database and schema.
    cs.execute(f"CREATE DATABASE IF NOT EXISTS {database}")
    cs.execute(f"USE DATABASE {database}")
    cs.execute(f"CREATE SCHEMA IF NOT EXISTS {schema}")
    cs.execute(f"USE SCHEMA {schema}")
    # Fall back to a table name derived from the CSV filename if none was given.
    filename = os.path.splitext(os.path.basename(file))[0].upper()
    table = table or f"{filename}_table"
    df = pd.read_csv(file)
    df.columns = df.columns.str.upper()
    # Build a CREATE TABLE statement by mapping Pandas dtypes to Snowflake types.
    create_table = f"CREATE TABLE IF NOT EXISTS {table} (\n"
    for col in df.columns:
        dtype = df[col].dtype
        if dtype == "int64":
            col_type = "INT"
        elif dtype == "float64":
            col_type = "FLOAT"
        elif dtype == "bool":
            col_type = "BOOLEAN"
        elif dtype == "datetime64[ns]":
            col_type = "DATETIME"
        else:
            col_type = "VARCHAR(16777216)"
        create_table += f"{col} {col_type},\n"
    create_table = create_table.rstrip(",\n") + ")"
    cs.execute(create_table)
    # Clear any existing rows so the upload starts from an empty table.
    cs.execute(f"TRUNCATE TABLE IF EXISTS {table}")
    return cs, df
- Database and Schema Creation: Ensures the specified database and schema exist.
- Table Creation: Dynamically creates a table based on the CSV's column names and data types.
- DataFrame Preparation: Reads the CSV file into a Pandas DataFrame and converts column names to uppercase.
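To make the type mapping concrete, here is a small sketch using a hypothetical in-memory DataFrame in place of a CSV file; the printed DDL mirrors what database_config builds:

import pandas as pd

# Hypothetical sample data standing in for a CSV file.
df = pd.DataFrame({"id": [1, 2], "price": [9.99, 5.25], "name": ["a", "b"]})
df.columns = df.columns.str.upper()

type_map = {"int64": "INT", "float64": "FLOAT", "bool": "BOOLEAN",
            "datetime64[ns]": "DATETIME"}
columns_sql = ", ".join(
    f"{col} {type_map.get(str(df[col].dtype), 'VARCHAR(16777216)')}"
    for col in df.columns
)
print(f"CREATE TABLE IF NOT EXISTS SAMPLE_TABLE ({columns_sql})")
# CREATE TABLE IF NOT EXISTS SAMPLE_TABLE (ID INT, PRICE FLOAT, NAME VARCHAR(16777216))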
Uploading Data to Snowflake
We use the write_pandas function from snowflake.connector.pandas_tools to upload the DataFrame:
from snowflake.connector.pandas_tools import write_pandas

def upload_snowflake(user, password, account, warehouse, database, schema, table, file):
    try:
        # Open the connection, prepare the target table, and load the DataFrame.
        conn = connect(user, password, account, warehouse)
        cs, df = database_config(conn, database, schema, table, file)
        write_pandas(conn, df, table_name=table.upper(), database=database, schema=schema)
        cs.close()
        conn.close()
    except Exception as e:
        print(f"An error occurred: {e}")
- Error Handling: Catches exceptions to prevent the script from crashing and provides informative messages.
- Connection Management: Ensures that the cursor and connection are properly closed after operations.
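If you want to guarantee the connection is released even when the upload itself raises, one variation (not the article's exact function, just a sketch built from the same pieces) closes it in a finally block:

def upload_snowflake_safe(user, password, account, warehouse,
                          database, schema, table, file):
    conn = connect(user, password, account, warehouse)
    try:
        cs, df = database_config(conn, database, schema, table, file)
        write_pandas(conn, df, table_name=table.upper(), database=database, schema=schema)
        cs.close()
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        conn.close()  # runs even if the upload fails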
Putting It All Together
Below is the complete script combining all the functions:
import os
import argparse
import pandas as pd
import snowflake.connector as snowflake
from snowflake.connector.pandas_tools import write_pandas

# [Include the connect, database_config, and upload_snowflake functions here]

if __name__ == '__main__':
    parser = argparse.ArgumentParser(description='Load data into a Snowflake database.')
    parser.add_argument('user', type=str, help='Snowflake username')
    parser.add_argument('password', type=str, help='Snowflake password')
    parser.add_argument('account', type=str, help='Snowflake account identifier')
    parser.add_argument('warehouse', type=str, help='Snowflake warehouse name')
    parser.add_argument('database', type=str, help='Database name')
    parser.add_argument('schema', type=str, help='Schema name')
    parser.add_argument('table', type=str, help='Table name')
    parser.add_argument('file', type=str, help='Path to the CSV file')
    args = parser.parse_args()

    upload_snowflake(
        args.user,
        args.password,
        args.account,
        args.warehouse,
        args.database,
        args.schema,
        args.table,
        args.file
    )
Running the Script
Save the script as snowflake_upload.py (avoid naming it snowflake.py, which would shadow the snowflake package and break the import) and run it with your connection details:
python snowflake_upload.py [username] [password] [account] [warehouse] [database] [schema] [table] [file]
For example:
python snowflake_upload.py my_user my_pass my_account my_warehouse my_db public my_table data.csv
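As an optional sanity check after the script finishes, you can query the row count of the freshly loaded table; the connection values below simply reuse the example arguments from the command above:

import snowflake.connector as snowflake

conn = snowflake.connect(user="my_user", password="my_pass", account="my_account",
                         warehouse="my_warehouse", database="my_db", schema="public")
cs = conn.cursor()
print(cs.execute("SELECT COUNT(*) FROM my_table").fetchone()[0])  # rows loaded from the CSV
cs.close()
conn.close()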
Conclusion
By following this guide, you can automate the process of uploading CSV data into Snowflake using Python. This script handles database configuration, table creation, and data insertion, making it a valuable tool for data engineers and analysts.