influxDB 2 support

Hi, do you know when the support for influxDB 2 will be available?
Thanks

Most data sources are made by users in the open source community and contributed. I don’t think there’s a “schedule” for them. It’s just whenever someone makes and shares it.

I have written a data source (query runner) for InfluxDB 2:

import logging

from redash.query_runner import *
from redash.utils import json_dumps
import requests
from dateutil import parser
from pytz import timezone

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Value returned by InfluxDBv2.enabled().
enabled = True

# Example of the annotated CSV text that _transform_result() parses:
#
#   #group,false,false,true,true,false,false,true,true
#   #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string
#   #default,mean,,,,,,,
#   ,result,table,_start,_stop,_time,_value,_field,_measurement
#   ,,0,2021-02-09T06:16:54.252458367Z,2021-02-09T09:16:54.252458367Z,2021-02-09T06:20:00Z,,_3V3,88
#   ,,0,2021-02-09T06:16:54.252458367Z,2021-02-09T09:16:54.252458367Z,2021-02-09T06:30:00Z,3.3112,_3V3,88

# Maps the datatype names found in the "#datatype" annotation row of an
# annotated CSV response to the converter keys used by type_cast_map.
influx_type_map = {
    "boolean": "bool",
    "unsignedLong": "int",
    "long": "int",
    "double": "float",
    "string": "string",
    "base64Binary": "string",
    "duration": "string",
    "dateTime:RFC3339": "date",
    # Nanosecond-precision spelling of the RFC 3339 timestamp annotation.
    "dateTime:RFC3339Nano": "date",
}

# Converters from a raw CSV cell (always a string) to a Python value, keyed by
# the simplified type names used as values of influx_type_map.  An empty cell
# means "no value" and becomes None for every type except "string", which is
# passed through unchanged.
type_cast_map = {
    "bool": lambda v: True if v == "true" else False if v == "false" else None,
    "int": lambda v: None if v == "" else int(v),
    "float": lambda v: None if v == "" else float(v),
    "string": lambda v: v,
    # An empty timestamp cell must not reach parser.parse(), which raises on "".
    # NOTE(review): timestamps are always rendered in Asia/Shanghai local time;
    # confirm whether the zone should be configurable instead of hard-coded.
    "date": lambda v: (
        None
        if v == ""
        else parser.parse(v)
        .astimezone(timezone("Asia/Shanghai"))
        .strftime("%Y-%m-%dT%H:%M:%S.%f")
    ),
}


def _type_cast(v, t):
    """Convert one raw CSV cell to a Python value.

    Args:
        v: The cell text exactly as it appears in the CSV response.
        t: The column's datatype annotation, normally a key of
            ``influx_type_map``.

    Returns:
        The value produced by the matching converter in ``type_cast_map``.
        A datatype that is not listed in ``influx_type_map`` is handled by the
        "string" converter (the text is returned unchanged) instead of raising
        ``KeyError`` and failing the whole query.
    """
    cast = type_cast_map[influx_type_map.get(t, "string")]
    return cast(v)


def _transform_result(csv_str):
    """Convert an annotated CSV query response into a serialized result.

    The text is read with the ``csv`` module so that quoted fields containing
    commas or line breaks are split correctly.  Records are interpreted as
    follows:

    * a blank line ends the current table; the next non-annotation record is
      treated as a new header row;
    * a record whose first field starts with ``#`` is an annotation; only
      ``#datatype`` is used, to pick the converter for each column;
    * the first non-annotation record of a table is its header row;
    * every other record is a data row.

    The first two fields of every record (the annotation field and the
    ``result`` field) are dropped.  Columns from all tables are merged in
    order of first appearance, so a response made of several tables is no
    longer cut off at the first blank line.

    Args:
        csv_str: Response body text; may be empty or ``None``.

    Returns:
        The ``json_dumps`` serialization of a dict with two keys:
        ``"columns"`` (a list of ``{"name": <column>}``) and ``"rows"`` (a
        list of ``{<column>: <converted value>}`` dicts).  Both lists are
        empty when the response holds no data.
    """
    # Local imports keep this function self-contained; both are stdlib.
    import csv
    import io

    if not csv_str:
        return json_dumps({"columns": [], "rows": []})

    logger.debug("data: \n%s", csv_str)

    result_columns = []  # union of column names over all tables, in order
    result_rows = []
    types = []  # datatype annotations of the table currently being read
    table_columns = []  # header of the table currently being read
    expect_header = True

    # newline="" lets the csv module do its own line-ending handling, which is
    # required for quoted fields that span several lines.
    for record in csv.reader(io.StringIO(csv_str, newline="")):
        if not record:
            expect_header = True
            continue

        if record[0].startswith("#"):
            if record[0] == "#datatype":
                types = record[2:]
                logger.debug("csv column types: %s", ", ".join(types))
            expect_header = True
            continue

        if expect_header:
            table_columns = record[2:]
            for name in table_columns:
                if name not in result_columns:
                    result_columns.append(name)
            expect_header = False
            continue

        result_row = {}
        for idx, (name, value) in enumerate(zip(table_columns, record[2:])):
            # Columns without a datatype annotation are kept as plain text.
            datatype = types[idx] if idx < len(types) else "string"
            result_row[name] = _type_cast(value, datatype)
        result_rows.append(result_row)

    logger.debug("csv rows parsed: %d", len(result_rows))

    return json_dumps(
        {"columns": [{"name": c} for c in result_columns], "rows": result_rows}
    )


class InfluxDBv2(BaseQueryRunner):
    """Query runner that posts queries to an InfluxDB 2 HTTP API.

    Queries are sent to ``<url>/api/v2/query`` with token authentication and
    the CSV response is converted by ``_transform_result``.
    """

    should_annotate_query = False
    # Pseudo-query that run_query() treats as a connectivity check instead of
    # sending it to the query endpoint.
    noop_query = "metrics"

    @classmethod
    def configuration_schema(cls):
        """Return the JSON schema describing this runner's settings.

        All three settings (``url``, ``org``, ``token``) are required strings;
        ``token`` is marked as secret.
        """
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string"},
                "org": {"type": "string"},
                "token": {"type": "string"},
            },
            "order": ["url", "org", "token"],
            "required": ["url", "org", "token"],
            "secret": ["token"],
        }

    @classmethod
    def enabled(cls):
        """Return the module-level ``enabled`` flag."""
        return enabled

    @classmethod
    def type(cls):
        """Return the identifier of this runner type, ``"influxdb2"``."""
        return "influxdb2"

    def run_query(self, query, user):
        """Run ``query`` against the configured server.

        Args:
            query: Query text, or the ``noop_query`` value to only check that
                the server can be reached.
            user: Not used by this runner.

        Returns:
            A ``(json_data, error)`` tuple.  On success ``json_data`` is the
            string produced by ``_transform_result`` (``None`` for the
            connectivity check) and ``error`` is ``None``.  On failure
            ``json_data`` is ``None`` and ``error`` is a message string.
        """
        # A trailing slash in the configured URL would produce "//api/...".
        url = self.configuration["url"].rstrip("/")
        org = self.configuration["org"]
        tk = self.configuration["token"]

        if query == self.noop_query:
            # Only verifies that the HTTP request completes; the response
            # status is not inspected.
            try:
                requests.get("%s/metrics" % url)
                return None, None
            except Exception as ex:
                return None, str(ex)

        logger.debug("influxdb2 url: %s", url)
        logger.debug("influxdb2 got query: %s", query)

        try:
            api = "%s/api/v2/query" % url
            body = {
                "dialect": {"annotations": ["group", "datatype", "default"]},
                "query": query,
            }
            headers = {"Authorization": "Token %s" % tk}
            # The headers are deliberately not logged: they carry the API token.
            logger.debug("influxdb2 api: %s org: %s body: %s", api, org, body)
            # Passing org through params lets requests URL-encode it, so
            # organisation names with spaces or reserved characters work.
            r = requests.post(api, params={"org": org}, json=body, headers=headers)
            logger.debug("resp status code: %s", r.status_code)
            if r.status_code != 200:
                # Include the response body: it holds the server's explanation
                # (e.g. a query compilation error), not just "Bad Request".
                detail = r.text.strip()
                if detail:
                    return None, "%s: %s" % (r.reason, detail)
                return None, r.reason
            json_data = _transform_result(r.text)
            error = None
        except Exception as ex:
            json_data = None
            error = str(ex)

        return json_data, error


# Hand the runner class to register(), which is presumably provided by the
# redash.query_runner star import at the top of the file.
register(InfluxDBv2)

Interesting! Did you open a pull request with this change?