Snowflake integration for Haystack
Module haystack_integrations.components.retrievers.snowflake.snowflake_table_retriever
MAX_SYS_ROWS
Max rows to fetch from a table
SnowflakeTableRetriever
Connects to a Snowflake database to execute a SQL query. For more information, see Snowflake documentation.
Usage example:
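from haystack.utils import Secret
from haystack_integrations.components.retrievers.snowflake import SnowflakeTableRetriever

executor = SnowflakeTableRetriever(
    user="<ACCOUNT-USER>",
    account="<ACCOUNT-IDENTIFIER>",
    api_key=Secret.from_env_var("SNOWFLAKE_API_KEY"),
    database="<DATABASE-NAME>",
    db_schema="<SCHEMA-NAME>",
    warehouse="<WAREHOUSE-NAME>",
)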
# When database and schema are provided during component initialization.
query = "SELECT * FROM table_name"
# or
# When database and schema are NOT provided during component initialization.
query = "SELECT * FROM database_name.schema_name.table_name"
results = executor.run(query=query)
print(results["dataframe"].head(2))  # Pandas dataframe
#   Column 1  Column 2
# 0   Value1    Value2
# 1   Value1    Value2
print(results["table"])  # Markdown
# | Column 1  | Column 2  |
# |:----------|:----------|
# | Value1    | Value2    |
# | Value1    | Value2    |
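The choice between the two query forms in the usage example depends only on whether `database` and `db_schema` were given at initialization. A minimal sketch of a helper that builds either form follows. `qualified_table_name` is a hypothetical name, not part of the integration, and it does no identifier quoting or validation.

```python
from typing import Optional


def qualified_table_name(
    table: str,
    database: Optional[str] = None,
    db_schema: Optional[str] = None,
) -> str:
    """Return `table`, prefixed with database and schema when both are given.

    Hypothetical helper for illustration; not part of the Snowflake integration.
    """
    if database and db_schema:
        return f"{database}.{db_schema}.{table}"
    if database or db_schema:
        # A partial qualifier is ambiguous in this sketch, so refuse it.
        raise ValueError("Provide both database and db_schema, or neither.")
    return table


# Database and schema already set on the component: a bare table name suffices.
query_short = f"SELECT * FROM {qualified_table_name('table_name')}"

# Database and schema not set on the component: qualify the table in the query.
query_full = "SELECT * FROM " + qualified_table_name(
    "table_name", database="database_name", db_schema="schema_name"
)
```

Either string can then be passed as the `query` argument of `run`.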
SnowflakeTableRetriever.__init__
def __init__(user: str,
             account: str,
             api_key: Secret = Secret.from_env_var("SNOWFLAKE_API_KEY"),
             database: Optional[str] = None,
             db_schema: Optional[str] = None,
             warehouse: Optional[str] = None,
             login_timeout: Optional[int] = None) -> None
Arguments:
user
: User's login.
account
: Snowflake account identifier.
api_key
: Snowflake account password.
database
: Name of the database to use.
db_schema
: Name of the schema to use.
warehouse
: Name of the warehouse to use.
login_timeout
: Timeout in seconds for login. By default, 60 seconds.
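Since only `user` and `account` are required, the remaining arguments can be collected from configuration and passed as keyword arguments. The sketch below reads them from environment variables. The variable names other than `SNOWFLAKE_API_KEY` and the helper `retriever_kwargs_from_env` are assumptions made for illustration. `api_key` is deliberately left out so that the default `Secret.from_env_var("SNOWFLAKE_API_KEY")` from the signature applies.

```python
import os
from typing import Any, Dict, Mapping


def retriever_kwargs_from_env(env: Mapping[str, str] = os.environ) -> Dict[str, Any]:
    """Collect SnowflakeTableRetriever keyword arguments from environment variables.

    The SNOWFLAKE_* names read here are hypothetical, chosen for this sketch.
    """
    kwargs: Dict[str, Any] = {
        "user": env["SNOWFLAKE_USER"],        # required
        "account": env["SNOWFLAKE_ACCOUNT"],  # required
    }
    # Optional settings are passed only when present, so the component's
    # own defaults (None) apply otherwise.
    optional = {
        "database": "SNOWFLAKE_DATABASE",
        "db_schema": "SNOWFLAKE_SCHEMA",
        "warehouse": "SNOWFLAKE_WAREHOUSE",
    }
    for param, var in optional.items():
        if env.get(var):
            kwargs[param] = env[var]
    if env.get("SNOWFLAKE_LOGIN_TIMEOUT"):
        kwargs["login_timeout"] = int(env["SNOWFLAKE_LOGIN_TIMEOUT"])
    return kwargs


example_env = {
    "SNOWFLAKE_USER": "<ACCOUNT-USER>",
    "SNOWFLAKE_ACCOUNT": "<ACCOUNT-IDENTIFIER>",
    "SNOWFLAKE_WAREHOUSE": "<WAREHOUSE-NAME>",
    "SNOWFLAKE_LOGIN_TIMEOUT": "30",
}
kwargs = retriever_kwargs_from_env(example_env)
# With the integration installed: executor = SnowflakeTableRetriever(**kwargs)
```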
SnowflakeTableRetriever.to_dict
def to_dict() -> Dict[str, Any]
Serializes the component to a dictionary.
Returns:
Dictionary with serialized data.
SnowflakeTableRetriever.from_dict
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "SnowflakeTableRetriever"
Deserializes the component from a dictionary.
Arguments:
data
: Dictionary to deserialize from.
Returns:
Deserialized component.
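Together, `to_dict` and `from_dict` are meant to round-trip a component through a plain dictionary, for example when a pipeline is saved and reloaded. The sketch below illustrates that contract with a hypothetical stand-in class, `StandInRetriever`, so that it runs without the integration or a Snowflake account. The dictionary layout it uses is an assumption and may differ from what the real component emits, and `api_key` is omitted for brevity.

```python
import json
from dataclasses import asdict, dataclass
from typing import Any, Dict, Optional


@dataclass
class StandInRetriever:
    """Hypothetical stand-in with some of the documented init parameters."""

    user: str
    account: str
    database: Optional[str] = None
    db_schema: Optional[str] = None
    warehouse: Optional[str] = None
    login_timeout: Optional[int] = None

    def to_dict(self) -> Dict[str, Any]:
        # Assumed layout: a type tag plus the constructor arguments.
        return {"type": type(self).__name__, "init_parameters": asdict(self)}

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "StandInRetriever":
        return cls(**data["init_parameters"])


original = StandInRetriever(user="<ACCOUNT-USER>", account="<ACCOUNT-IDENTIFIER>")
# The dictionary holds only plain values, so it survives a JSON round trip.
stored = json.dumps(original.to_dict())
restored = StandInRetriever.from_dict(json.loads(stored))
```

In this sketch, `restored` compares equal to `original`, which is the property the real methods are expected to preserve for the component's configuration.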
SnowflakeTableRetriever.run
@component.output_types(dataframe=pd.DataFrame, table=str)
def run(query: str) -> Dict[str, Any]
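Executes a SQL query against a Snowflake database and returns the result both as a Pandas dataframe (`dataframe`) and as a Markdown table string (`table`).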
Arguments:
query
: A SQL query to execute.