Hi, do you know when support for InfluxDB 2 will be available?
Thanks
This site is in read-only mode. Please continue to browse, but replying, likes, and other actions are disabled for now.
Most data sources are made by users in the open source community and contributed. I don’t think there’s a “schedule” for them. It’s just whenever someone makes and shares it.
I have written a data source for InfluxDB 2:
import csv
import io
import logging

import requests
from dateutil import parser
from pytz import timezone

from redash.query_runner import *
from redash.utils import json_dumps
logger = logging.getLogger(__name__)

# Value returned by InfluxDBv2.enabled().
enabled = True

# Example of the annotated CSV returned by the query API when the "group",
# "datatype" and "default" annotations are requested:
#
#   #group,false,false,true,true,false,false,true,true
#   #datatype,string,long,dateTime:RFC3339,dateTime:RFC3339,dateTime:RFC3339,double,string,string
#   #default,mean,,,,,,,
#   ,result,table,_start,_stop,_time,_value,_field,_measurement
#   ,,0,2021-02-09T06:16:54.252458367Z,2021-02-09T09:16:54.252458367Z,2021-02-09T06:20:00Z,,_3V3,88
#   ,,0,2021-02-09T06:16:54.252458367Z,2021-02-09T09:16:54.252458367Z,2021-02-09T06:30:00Z,3.3112,_3V3,88

# Maps the datatype names found in the "#datatype" annotation row to the cast
# names used as keys of type_cast_map.
influx_type_map = {
    "boolean": "bool",
    "unsignedLong": "int",
    "long": "int",
    "double": "float",
    "string": "string",
    "base64Binary": "string",
    "duration": "string",
    "dateTime:RFC3339": "date",
    "dateTime:RFC3339Nano": "date",
}

# Maps a cast name to a function converting one raw CSV cell (a string) to a
# Python value. An empty cell means "no value" and becomes None for every
# non-string type.
type_cast_map = {
    "bool": lambda v: True if v == "true" else False if v == "false" else None,
    "int": lambda v: None if v == "" else int(v),
    "float": lambda v: None if v == "" else float(v),
    "string": lambda v: v,
    # NOTE(review): timestamps are converted to a hard-coded Asia/Shanghai zone
    # and emitted without an offset; confirm this is intended for all users.
    "date": lambda v: None if v == "" else parser.parse(v).astimezone(
        timezone("Asia/Shanghai")
    ).strftime("%Y-%m-%dT%H:%M:%S.%f"),
}
def _type_cast(v, t):
    """Convert one raw CSV cell to a Python value.

    v is the cell text and t is the datatype name taken from the "#datatype"
    annotation row. Datatypes that are not listed in influx_type_map are
    returned unchanged as strings, so one unfamiliar column does not make the
    whole query fail with a KeyError.
    """
    cast_name = influx_type_map.get(t, "string")
    return type_cast_map[cast_name](v)
def _transform_result(csv_str):
    """Convert an annotated CSV response into a Redash result JSON string.

    The text is expected to start with three annotation rows (the datatype
    row being the second one) followed by a header row and the data rows.
    The first two CSV columns of every row are dropped. Reading stops at the
    first blank row, so anything after it is ignored.

    Returns a JSON string shaped like
    {"columns": [{"name": ...}, ...], "rows": [{column: value, ...}, ...]}.
    A missing, empty or whitespace-only response yields no columns and no rows.

    Raises ValueError when the response has content but is too short to
    contain the annotation and header rows.
    """
    if csv_str is None or not csv_str.strip():
        return json_dumps(
            {"columns": [], "rows": []}
        )

    # A real CSV parser is needed: quoted cells may contain commas or line
    # breaks, which naive splitting on "," and newlines would tear apart.
    records = list(csv.reader(io.StringIO(csv_str, newline="")))
    logger.debug("csv all lines: %d", len(records))
    logger.debug("data: \n%s", csv_str)

    if len(records) < 4:
        raise ValueError(
            "unexpected InfluxDB response: expected 3 annotation rows and a "
            "header row, got %d row(s)" % len(records)
        )

    types = records[1][2:]
    result_columns = records[3][2:]
    logger.debug("csv column types: %s", ", ".join(types))

    result_rows = []
    for record in records[4:]:
        if not record:
            # A blank row ends the block of data rows.
            break
        result_rows.append(
            {
                name: _type_cast(value, datatype)
                for name, value, datatype in zip(result_columns, record[2:], types)
            }
        )

    return json_dumps(
        {"columns": [{"name": c} for c in result_columns], "rows": result_rows}
    )
class InfluxDBv2(BaseQueryRunner):
    """Query runner that posts queries to an InfluxDB 2 server over HTTP.

    The configuration supplies the server URL, the organization and an API
    token. Responses are requested as annotated CSV and converted with
    _transform_result.
    """

    should_annotate_query = False
    # Sentinel query text: run_query treats it as a connectivity probe
    # instead of sending it to the query API.
    noop_query = "metrics"
    # Seconds to wait for a connection before giving up. No read timeout is
    # set for queries, because they may legitimately run for a long time.
    connect_timeout = 10

    @classmethod
    def configuration_schema(cls):
        """Return the JSON schema describing the data source settings."""
        return {
            "type": "object",
            "properties": {
                "url": {"type": "string"},
                "org": {"type": "string"},
                "token": {"type": "string"},
            },
            "order": ["url", "org", "token"],
            "required": ["url", "org", "token"],
            "secret": ["token"],
        }

    @classmethod
    def enabled(cls):
        """Return the module-level ``enabled`` flag."""
        return enabled

    @classmethod
    def type(cls):
        """Return the identifier of this query runner type."""
        return "influxdb2"

    @staticmethod
    def _error_message(response):
        """Return the most descriptive error text available for a failed response.

        Prefers the "message" field of a JSON error body and falls back to the
        HTTP reason phrase, then to the bare status code.
        """
        try:
            message = response.json().get("message")
        except (ValueError, AttributeError):
            # Body is not JSON, or is JSON but not an object.
            message = None
        return message or response.reason or "HTTP status %s" % response.status_code

    def run_query(self, query, user):
        """Run ``query`` and return a ``(json_data, error)`` pair.

        On success ``json_data`` is the JSON string built by _transform_result
        and ``error`` is None. On failure ``json_data`` is None and ``error``
        is a message string. When ``query`` equals ``noop_query`` only a GET to
        the server's /metrics path is attempted and ``(None, None)`` is
        returned if the server answers at all. ``user`` is not used.
        """
        # Tolerate a trailing slash in the configured URL so paths such as
        # "//metrics" are never produced.
        base_url = self.configuration['url'].rstrip("/")
        org = self.configuration['org']
        token = self.configuration['token']

        if query == self.noop_query:
            try:
                requests.get("%s/metrics" % base_url, timeout=self.connect_timeout)
            except Exception as ex:
                return None, str(ex)
            return None, None

        logger.debug("influxdb2 url: %s", base_url)
        logger.debug("influxdb2 got query: %s", query)
        try:
            api = "%s/api/v2/query" % base_url
            body = {
                "dialect": {"annotations": ["group", "datatype", "default"]},
                "query": query
            }
            headers = {
                "Authorization": "Token %s" % token,
                "Accept": "application/csv",
            }
            # The headers carry the API token, so they are deliberately kept
            # out of the log.
            logger.debug("influxdb2 api: %s org: %s body: %s", api, org, body)
            # Passing org through ``params`` lets requests URL-encode it.
            r = requests.post(
                api,
                params={"org": org},
                json=body,
                headers=headers,
                timeout=(self.connect_timeout, None),
            )
            logger.debug("resp status code: %s", r.status_code)
            if r.status_code != 200:
                return None, self._error_message(r)
            json_data = _transform_result(r.text)
            error = None
        except Exception as ex:
            # Boundary of the runner: report any failure as an error string
            # rather than letting it propagate.
            json_data = None
            error = str(ex)
        return json_data, error


register(InfluxDBv2)
Interesting. Did you submit a pull request with this change?



