To support a new data source type in Redash, you need to implement a Query Runner for it. I’m working on a Snowflake query runner, so I decided to use this opportunity to describe the process of adding a new data source:
We start by implementing a class that extends BaseQueryRunner:
from redash.query_runner import BaseQueryRunner

class Snowflake(BaseQueryRunner):
    def run_query(self, query, user):
        pass
The only method that you must implement is the run_query method, which accepts a query parameter (string) and the user who invoked this query. The user is irrelevant for most query runners and can be ignored.
Configuration
Usually the query runner needs some configuration before it can be used, so we need to implement the configuration_schema class method:
@classmethod
def configuration_schema(cls):
    return {
        "type": "object",
        "properties": {
            "account": {
                "type": "string"
            },
            "user": {
                "type": "string"
            },
            "password": {
                "type": "string"
            },
            "warehouse": {
                "type": "string"
            },
            "database": {
                "type": "string"
            }
        },
        "required": ["user", "password", "account", "database", "warehouse"],
        "secret": ["password"]
    }
This method returns a JSON schema object. The supported types for the properties are string, number and boolean. Also note the required field, which defines the required properties (all of them in this case), and secret, which defines the secret fields (they won’t be sent back to the UI).
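For illustration, here is a configuration object that would validate against the schema above (all the values are made up):

{
    "account": "my_account",
    "user": "redash",
    "password": "secret",
    "warehouse": "COMPUTE_WH",
    "database": "ANALYTICS"
}

Whatever the user enters in the data source setup form becomes available to the query runner as the self.configuration dictionary, which we use below.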
Executing the query
Now that we defined the configuration, we can implement the run_query method:
def run_query(self, query, user):
    connection = snowflake.connector.connect(
        user=self.configuration['user'],
        password=self.configuration['password'],
        account=self.configuration['account'],
    )

    cursor = connection.cursor()

    try:
        cursor.execute("USE WAREHOUSE {}".format(self.configuration['warehouse']))
        cursor.execute("USE {}".format(self.configuration['database']))

        cursor.execute(query)

        columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
        rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]

        data = {'columns': columns, 'rows': rows}
        error = None
        json_data = json_dumps(data)
    finally:
        cursor.close()
        connection.close()

    return json_data, error
This is the minimum required code. What it does is:
- Connect to the Snowflake cluster.
- Run the query.
- Transform the results into the format Redash expects (see the sketch below).
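To make that format concrete, here is a sketch of the data dictionary before it is JSON-encoded. The column keys shown ('name', 'friendly_name', 'type') follow what the fetch_columns helper produces; the table and values are made up:

data = {
    'columns': [
        {'name': 'ID', 'friendly_name': 'ID', 'type': 'integer'},
        {'name': 'NAME', 'friendly_name': 'NAME', 'type': 'string'}
    ],
    'rows': [
        {'ID': 1, 'NAME': 'Alice'},
        {'ID': 2, 'NAME': 'Bob'}
    ]
}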
Mapping Column Types to Redash Types
Note this line:
columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
We use a helper function (fetch_columns) to de-duplicate column names and assign a type (if known) to each column. If no type is assigned, the default is string. The TYPES_MAP dictionary is a custom one we define for each query runner type.
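For Snowflake, such a mapping might look like the sketch below. It maps the integer type codes the Snowflake connector reports in cursor.description to Redash’s type constants; the exact codes are an assumption based on the connector’s type numbering, so verify them against the connector version you use:

from redash.query_runner import (TYPE_BOOLEAN, TYPE_DATE, TYPE_DATETIME,
                                 TYPE_FLOAT, TYPE_INTEGER, TYPE_STRING)

# Sketch: Snowflake connector type codes -> Redash types.
# Codes missing from the map fall back to string.
TYPES_MAP = {
    0: TYPE_INTEGER,   # FIXED
    1: TYPE_FLOAT,     # REAL
    2: TYPE_STRING,    # TEXT
    3: TYPE_DATE,      # DATE
    4: TYPE_DATETIME,  # TIMESTAMP
    13: TYPE_BOOLEAN,  # BOOLEAN
}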
The return value of the run_query method is a tuple of the JSON-encoded results and an error string. The error string is used in case you want to return some kind of custom error message; otherwise you can let exceptions propagate.
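For example, to surface Snowflake’s error message for invalid SQL rather than a raw traceback, you could catch the connector’s ProgrammingError and return it as the error string. A sketch of how the try block above might be extended:

try:
    cursor.execute(query)
    columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
    rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]
    json_data = json_dumps({'columns': columns, 'rows': rows})
    error = None
except snowflake.connector.errors.ProgrammingError as e:
    # Return the connector's message as a custom error instead of raising
    json_data = None
    error = str(e)
finally:
    cursor.close()
    connection.close()

return json_data, error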
Fetching Database Schema
This is the minimum required to run a query. If you want Redash to show the database schema and enable autocomplete, you need to implement the get_schema method:
def get_schema(self, get_stats=False):
    query = """
    SELECT col.table_schema,
           col.table_name,
           col.column_name
    FROM {database}.information_schema.columns col
    WHERE col.table_schema <> 'INFORMATION_SCHEMA'
    """.format(database=self.configuration['database'])

    results, error = self.run_query(query, None)

    if error is not None:
        raise Exception("Failed getting schema.")

    schema = {}
    results = json.loads(results)

    for row in results['rows']:
        table_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])

        if table_name not in schema:
            schema[table_name] = {'name': table_name, 'columns': []}

        schema[table_name]['columns'].append(row['COLUMN_NAME'])

    return schema.values()
The implementation of get_schema is specific to the data source you’re adding support for, but the return value needs to be an array of dictionaries, where each dictionary has a name key (the table name) and a columns key (an array of column names).
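For example, a schema with two tables would look something like this (hypothetical table and column names):

[
    {'name': 'PUBLIC.USERS', 'columns': ['ID', 'NAME', 'CREATED_AT']},
    {'name': 'PUBLIC.ORDERS', 'columns': ['ID', 'USER_ID', 'AMOUNT']}
]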
Test Connection Support
The last thing you need to implement is support for the Test Connection button. You can either supply a NOOP query or implement the test_connection method. In this case we opted for the former:
noop_query = "SELECT 1"
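If a NOOP query isn’t enough, you can implement test_connection yourself instead. A minimal sketch, assuming all you want is to run a trivial query and raise on failure:

def test_connection(self):
    # Run a trivial query and raise if the data source reports an error
    results, error = self.run_query("SELECT 1", None)
    if error is not None:
        raise Exception(error)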
Checking for required dependencies
If the query runner needs some external Python packages, we wrap those imports in a try/except block to prevent crashing deployments where the package is not available:
try:
    import snowflake.connector
    enabled = True
except ImportError:
    enabled = False
The enabled variable is later used in the query runner’s enabled class method:
@classmethod
def enabled(cls):
    return enabled
If it returns False, the query runner won’t be enabled.
Finishing up
Usually the connector will need some additional Python packages; we add those to the requirements_all_ds.txt file. If the required Python packages don’t have any special dependencies (like some system packages), we usually add the query runner to the list of default ones in settings.py.
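As an illustration, assuming the new module lives at redash/query_runner/snowflake.py and your Redash version keeps the default list in a default_query_runners variable in settings.py, the entry would look something like this:

default_query_runners = [
    # ... existing query runners ...
    'redash.query_runner.snowflake',
]

Existing query runner modules also register their class with Redash via a register() call at the bottom of the file (in this case, register(Snowflake)).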
See the full pull request here.