To support new data source type in Redash, you need to implement a Query Runner for this data source type. I’m working on a Snowflake query runner, so decided to use this opportunity to describe the process of adding a new data source:

We start with implementing the BaseQueryRunner class:

from redash.query_runner import BaseQueryRunner


class Snowflake(BaseQueryRunner):
    def run_query(self, query, user):
        pass

The only method that you must implement is the run_query method, which accepts a query parameter (string) and the user who invoked this query. The user is irrelevant for most query runners and can be ignored.

Configuration

Usually the query runner needs some configuration to be used, so for this we need to implement the configuration_schema class method:

    @classmethod
    def configuration_schema(cls):
        return {
            "type": "object",
            "properties": {
                "account": {
                    "type": "string"
                },
                "user": {
                    "type": "string"
                },
                "password": {
                    "type": "string"
                },
                "warehouse": {
                    "type": "string"
                },
                "database": {
                    "type": "string"
                }
            },
            "required": ["user", "password", "account", "database", "warehouse"],
            "secret": ["password"]
        }

This method returns a JSON schema object. The supported types for the properties are string, number and boolean. Also note the required field which defines the required properties (all of them in this case) and secret, which defines the secret fields (won’t be sent back to the UI).

Executing the query

Now that we defined the configuration we can implement the run_query method:


    def run_query(self, query, user):
        connection = snowflake.connector.connect(
            user=self.configuration['user'],
            password=self.configuration['password'],
            account=self.configuration['account'],
        )

        cursor = connection.cursor()

        try:
            cursor.execute("USE WAREHOUSE {}".format(self.configuration['warehouse']))
            cursor.execute("USE {}".format(self.configuration['database']))

            cursor.execute(query)

            columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])
            rows = [dict(zip((c['name'] for c in columns), row)) for row in cursor]

            data = {'columns': columns, 'rows': rows}
            error = None
            json_data = json_dumps(data)
        finally:
            cursor.close()
            connection.close()

        return json_data, error

This is the minimum required code and what it does is:

  1. Connect to the Snowflake cluster.
  2. Run the query.
  3. Transform the results into the format Redash expects.

Mapping Column Types to Redash Types

Note this line:

columns = self.fetch_columns([(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description])

We use a helper function (fetch_columns) to de-duplicate column names and assign the type (if known) to the column. If no type is assigned, the default is string. The TYPES_MAP dictionary is a custom one we define for each query runner type.

The return value of the run_query method is a tuple of the JSON encoded results and error string. The error string is used in case you want to return some kind of custom error message, otherwise you can let the exceptions to propagate.

Fetching Database Schema

This is the minimum required to run a query. If you want Redash to show the database schema and enable autocomplete, you need to implement the get_schema method:

    def get_schema(self, get_stats=False):
        query = """
        SELECT col.table_schema,
               col.table_name,
               col.column_name
        FROM {database}.information_schema.columns col
        WHERE col.table_schema <> 'INFORMATION_SCHEMA'
        """.format(database=self.configuration['database'])

        results, error = self.run_query(query, None)

        if error is not None:
            raise Exception("Failed getting schema.")

        schema = {}
        results = json.loads(results)

        for row in results['rows']:
            table_name = '{}.{}'.format(row['TABLE_SCHEMA'], row['TABLE_NAME'])

            if table_name not in schema:
                schema[table_name] = {'name': table_name, 'columns': []}

            schema[table_name]['columns'].append(row['COLUMN_NAME'])

        return schema.values()

The implementation of get_schema is specific to the data source you’re adding support to but the return value needs to be an array of dictionaries, where each dictionary has a name key (table name) and columns key (array of column names).

Test Connection Support

The last thing that you need to implement is the Test Connection button support. You can either supply a NOOP query or implement the test_connection method. In this case we opted for the first:

    noop_query = "SELECT 1"

Checking for required dependencies

If the query runner needs some external Python packages, we wrap those imports with a try/except block, to prevent crashing deployments where this package is not available:

try:
    import snowflake.connector
    enabled = True
except ImportError:
    enabled = False

The enabled variable is later used in the query runner’s enabled class method:

    @classmethod
    def enabled(cls):
        return enabled

If it returns False the query runner won’t be enabled.

Finishing up

Usually the connector will need to have some additional Python packages, we add those to the requirements_all_ds.txt file. If the required Python packages don’t have any special dependencies (like some system packages), we usually add the query runner to the list of default ones in settings.py.

See the full pull request here.

6 Likes

@arikfr

If I am adding query runner in self hosted production version (I have used setup script with some modification), where should I place the query runner, in which folder?

It goes in app/query_runner.

1 Like

I noticed you’ve implemented cloudwatch.py and cloudwatch_insights.py in version 9.
I copied those files in the app/query_runner folder. But still can’t see cloudwatch as one of the sources in UI.
I hope at some location I need to specify this expected data source. what are the changes I need to make?

where is the settings.py file you mentioned above.
is that one of the files I must edit?

Thank you :slight_smile:

If any of the dependencies are missing you will need to install them. Otherwise the query runner won’t appear.