Prefect works with Snowflake by providing dataflow automation for faster, more efficient data pipeline creation, execution, and monitoring.
This results in reduced errors, increased confidence in your data, and ultimately, faster insights.
To set up a table, use the `execute` and `execute_many` methods. Then use the `fetch_many` method to retrieve data in a stream until there's no more data.
By using the `SnowflakeConnector` as a context manager, you can ensure that the Snowflake connection and cursors are closed properly after you're done with them.
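The examples below load a `SnowflakeConnector` block by name. If you haven't saved one yet, a minimal sketch of registering a block might look like the following; the account, credential values, and names used here are placeholders for your own:

```python
from prefect_snowflake import SnowflakeCredentials, SnowflakeConnector

# Placeholder values -- replace with your own account details
credentials = SnowflakeCredentials(
    account="my-account",
    user="my-user",
    password="my-password",
)

connector = SnowflakeConnector(
    credentials=credentials,
    database="MY_DATABASE",
    schema="MY_SCHEMA",
    warehouse="MY_WAREHOUSE",
)

# Save the block so it can be loaded by name, e.g. SnowflakeConnector.load("example")
connector.save("example")
```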
```python
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector


@task
def setup_table(block_name: str) -> None:
    with SnowflakeConnector.load(block_name) as connector:
        connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )


@task
def fetch_data(block_name: str) -> list:
    all_rows = []
    with SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
def snowflake_flow(block_name: str) -> list:
    setup_table(block_name)
    all_rows = fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    snowflake_flow("example")
```
```python
from prefect import flow, task
from prefect_snowflake import SnowflakeConnector
import asyncio


@task
async def setup_table(block_name: str) -> None:
    with await SnowflakeConnector.load(block_name) as connector:
        await connector.execute(
            "CREATE TABLE IF NOT EXISTS customers (name varchar, address varchar);"
        )
        await connector.execute_many(
            "INSERT INTO customers (name, address) VALUES (%(name)s, %(address)s);",
            seq_of_parameters=[
                {"name": "Ford", "address": "Highway 42"},
                {"name": "Unknown", "address": "Space"},
                {"name": "Me", "address": "Myway 88"},
            ],
        )


@task
async def fetch_data(block_name: str) -> list:
    all_rows = []
    with await SnowflakeConnector.load(block_name) as connector:
        while True:
            # Repeated fetch* calls using the same operation will
            # skip re-executing and instead return the next set of results
            new_rows = await connector.fetch_many("SELECT * FROM customers", size=2)
            if len(new_rows) == 0:
                break
            all_rows.append(new_rows)
    return all_rows


@flow
async def snowflake_flow(block_name: str) -> list:
    await setup_table(block_name)
    all_rows = await fetch_data(block_name)
    return all_rows


if __name__ == "__main__":
    asyncio.run(snowflake_flow("example"))
```
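If you don't need to stream results in batches, the connector also exposes `fetch_one` and `fetch_all` for one-shot reads. A minimal sketch, reusing the block name from the examples above (the `read_customers` flow name is just illustrative):

```python
from prefect import flow
from prefect_snowflake import SnowflakeConnector


@flow
def read_customers(block_name: str = "example") -> list:
    with SnowflakeConnector.load(block_name) as connector:
        # fetch_all executes the query and returns all remaining rows at once
        return connector.fetch_all("SELECT * FROM customers")


if __name__ == "__main__":
    rows = read_customers()
    print(rows)
```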
If the block's native methods don't meet your requirements, you can access the underlying Snowflake connection and use its built-in methods instead.
```python
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector
from snowflake.connector.pandas_tools import write_pandas


@flow
def snowflake_write_pandas_flow():
    connector = SnowflakeConnector.load("my-block")

    with connector.get_connection() as connection:
        table_name = "TABLE_NAME"
        ddl = "NAME STRING, NUMBER INT"
        statement = f"CREATE TABLE IF NOT EXISTS {table_name} ({ddl})"
        with connection.cursor() as cursor:
            cursor.execute(statement)

        # case sensitivity matters here!
        df = pd.DataFrame([("Marvin", 42), ("Ford", 88)], columns=["NAME", "NUMBER"])
        success, num_chunks, num_rows, _ = write_pandas(
            conn=connection,
            df=df,
            table_name=table_name,
            database=connector.database,
            schema=connector.schema_,  # note the "_" suffix
        )
```
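Reading results back into a DataFrame works the same way: the Snowflake cursor's `fetch_pandas_all` method returns a query's result set as a pandas DataFrame. Here's a minimal sketch that reuses the "my-block" connector and table name from above (the flow name is just illustrative):

```python
import pandas as pd
from prefect import flow
from prefect_snowflake.database import SnowflakeConnector


@flow
def snowflake_read_pandas_flow() -> pd.DataFrame:
    connector = SnowflakeConnector.load("my-block")

    with connector.get_connection() as connection:
        with connection.cursor() as cursor:
            cursor.execute("SELECT * FROM TABLE_NAME")
            # fetch_pandas_all returns the full result set as a pandas DataFrame
            return cursor.fetch_pandas_all()


if __name__ == "__main__":
    df = snowflake_read_pandas_flow()
    print(df)
```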