编写新的查询运行器
快速导航
简介
Redash 已经连接到许多数据库和 REST API。要在 Redash 中添加对新数据源类型的支持,您需要为其实现一个查询运行器。查询运行器是一个 Python 类。此文档页面展示了编写新查询运行器的过程。它使用 Firebolt 查询运行器作为示例。
首先在 /redash/query_runner
目录中创建一个新的 firebolt.py
文件,并实现 BaseQueryRunner
类
from redash.query_runner import BaseQueryRunner, register
class Firebolt(BaseQueryRunner):
def run_query(self, query, user):
pass
您必须实现的唯一方法是 run_query
方法,它接受一个 query
参数(字符串)和调用此查询的 user
。对于大多数查询运行器,用户是不相关的,可以忽略。
配置
通常,查询运行器需要一些配置才能使用,因此我们需要实现 configuration_schema
类方法。这些字段位于 properties
键下
@classmethod
def configuration_schema(cls):
return {
"type": "object",
"properties": {
"api_endpoint": {"type": "string", "default": DEFAULT_API_URL},
"engine_name": {"type": "string"},
"DB": {"type": "string"},
"user": {"type": "string"},
"password": {"type": "string"}
},
"order": ["user", "password", "api_endpoint", "engine_name", "DB"],
"required": ["user", "password", "engine_name", "DB"],
"secret": ["password"],
}
此方法返回一个 JSON schema 对象。
每个属性必须指定一个 type
。属性支持的类型为 string
、number
和 boolean
。对于类似文件的字段,请参见下一标题。
您还可以选择指定一个 default
值和一个将显示在 UI 中的 title
。如果您没有指定 title
,将使用属性名称。没有默认值的属性将为空白。
另请注意 required
字段,它定义了必需的属性(除了本例中的 api_endpoint
之外的所有属性)和 secret
,它定义了秘密字段(不会发送回 UI)。
这些设置的值可以作为字典在查询运行器对象的 self.configuration
字段中访问。
文件上传
当用户创建数据源实例时,Redash 会将其配置存储在其元数据数据库中。某些数据源将要求用户上传文件(例如 SSL 证书或密钥文件)。要处理此问题,请定义一个名称以 File
结尾的类型为 string
的属性。例如
"properties": {
"someFile": {"type": "string"},
}
Redash 前端会将名称以 File
结尾的任何类型为 string
的属性渲染为文件上传选择器组件。保存时,文件内容将被加密并以字节形式保存到元数据数据库中。在您的查询运行器代码中,您可以将 self.configuration['someFile']
的值读取到 Python 内置的 tempfile
库的 fixture 之一中。从那里,您可以像处理存储在磁盘上的任何文件一样处理这些字节。您可以在 PostgreSQL 查询运行器代码中看到一个示例。
执行查询
现在我们定义了配置,我们可以实现 run_query
方法
def run_query(self, query, user):
connection = connect(
api_endpoint=(self.configuration.get("api_endpoint") or DEFAULT_API_URL),
engine_name=(self.configuration.get("engine_name") or None),
username=(self.configuration.get("user") or None),
password=(self.configuration.get("password") or None),
database=(self.configuration.get("DB") or None),
)
cursor = connection.cursor()
try:
cursor.execute(query)
columns = self.fetch_columns(
[(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description]
)
rows = [
dict(zip((column["name"] for column in columns), row)) for row in cursor
]
data = {"columns": columns, "rows": rows}
error = None
json_data = json_dumps(data)
finally:
connection.close()
return json_data, error
这是所需的最小代码。它的作用如下
- 连接到配置的 Firebolt 端点,或使用从官方 Firebolt Python API 客户端导入的
DEFAULT_API_URL
。 - 运行查询。
- 将结果转换为 Redash [期望]({% link _kb/data-sources/querying/json-api %}#Required-Data-Structure)的格式。
将列类型映射到 Redash 类型
请注意这些行
columns = self.fetch_columns(
[(i[0], TYPES_MAP.get(i[1], None)) for i in cursor.description]
)
BaseQueryRunner
包含一个帮助函数(fetch_columns
),它可以删除重复的列名并为列分配类型(如果已知)。如果未分配类型,则默认值为字符串。TYPES_MAP
字典是我们在文件顶部定义的自定义字典。它在不同的查询运行器之间会有所不同。
run_query
方法的返回值是一个 JSON 编码的结果和错误字符串的元组。如果您想返回某种自定义错误消息,则可以使用错误字符串,否则您可以让异常传播(这在首次开发查询运行器时很有用)。
获取数据库 Schema
到目前为止,我们展示了运行查询所需的最低限度。如果您还希望 Redash 显示数据库 schema 并启用自动完成,则需要实现 get_schema
方法
def get_schema(self, get_stats=False):
query = """
SELECT TABLE_SCHEMA,
TABLE_NAME,
COLUMN_NAME
FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA <> 'INFORMATION_SCHEMA'
"""
results, error = self.run_query(query, None)
if error is not None:
raise Exception("Failed getting schema.")
schema = {}
results = json_loads(results)
for row in results["rows"]:
table_name = "{}.{}".format(row["table_schema"], row["table_name"])
if table_name not in schema:
schema[table_name] = {"name": table_name, "columns": []}
schema[table_name]["columns"].append(row["column_name"])
return list(schema.values())
get_schema
的实现特定于您正在添加支持的数据源,但返回值需要是字典数组,其中每个字典都有一个 name
键(表名)和一个 columns
键(列名称字符串数组)。
在 Schema 浏览器中包含列类型
如果您希望 Redash schema 浏览器也显示列类型,则可以调整 get_schema
方法,以便 columns
键包含一个字典数组,其中包含键 name
和 type
。
这是一个不包含列类型的示例
[
{
"name": "Table1",
"columns": ["field1", "field2", "field3"]
}
]
这是一个包含列类型的示例
[
{
"name": "Table1",
"columns": [
{
"name": "field1",
"type": "VARCHAR"
},
{
"name": "field2",
"type": "BIGINT"
},
{
"name": "field3",
"type": "DATE"
}
]
}
]
请注意,列类型字符串仅用于协助查询作者。如果它出现在 get_schema
的输出中,Redash 会信任它,并且不会将其与 run_query
返回的类型信息进行比较。因此,schema 浏览器中显示的类型可能与数据库中的列类型不同。我们建议手动针对已知 schema 进行测试,以确保正确的类型显示在 schema 浏览器中。
添加测试连接支持
您还可以实现“测试连接”按钮支持。“测试连接”按钮出现在数据源设置和配置屏幕上。您可以为查询运行器提供一个 noop_query
属性,或者自己实现 test_connection
方法。在此示例中,我们选择了第一个
class Firebolt(BaseQueryRunner):
noop_query = "SELECT 1"
支持 SQL 数据库的自动限制
Redash 前端包含一个勾选框来自动限制查询结果。这有助于避免使用大型结果集来重载 Redash Web 应用程序。对于大多数 SQL 样式的数据库,您可以通过继承 BaseSQLQueryRunner
而不是 BaseQueryRunner
来自动添加自动限制支持。
from redash.query_runner import BaseSQLQueryRunner, register
class Firebolt(BaseSQLQueryRunner):
def run_query(self, query, user):
pass
BaseSQLQueryRunner
使用 sqplarse
在执行之前智能地将 LIMIT 1000
附加到查询,只要选中查询编辑器中的勾选框即可。对于使用不同语法的数据库(特别是 Microsoft SQL Server 或任何 NoSQL 数据库),您可以继续继承 BaseQueryRunner
并实现以下内容
@property
def supports_auto_limit(self):
return True
def apply_auto_limit(self, query_text: str, should_apply_auto_limit: bool):
...
对于 BaseQueryRunner
,supports_auto_limit
属性默认为 false,并且 apply_auto_limit
返回未修改的查询文本。
检查所需的依赖项
如果查询运行器需要一些外部 Python 包,我们会用 try/except 块包裹这些导入,以防止在无法使用该包的部署中崩溃
try:
from firebolt.db import connect
from firebolt.client import DEFAULT_API_URL
enabled = True
except ImportError:
enabled = False
enabled 变量稍后在查询运行器的 enabled 类方法中使用
@classmethod
def enabled(cls):
return enabled
如果它返回 False,则查询运行器将不会被启用。
完成
在您的文件顶部,导入 register
函数并在 firebolt.py
的底部调用它
# top of file
try:
from firebolt.db import connect
from firebolt.client import DEFAULT_API_URL
enabled = True
except ImportError:
enabled = False
from redash.query_runner import BaseQueryRunner, register
from redash.query_runner import TYPE_STRING, TYPE_INTEGER, TYPE_BOOLEAN
from redash.utils import json_dumps, json_loads
TYPES_MAP = {1: TYPE_STRING, 2: TYPE_INTEGER, 3: TYPE_BOOLEAN}
# ... implementation
# bottom of file
register(Firebolt)
通常,连接器将需要一些额外的 Python 包,我们将这些包添加到 requirements_all_ds.txt
文件中。如果所需的 Python 包没有任何特殊的依赖项(例如某些系统包),我们通常会将查询运行器添加到 redash/settings/__init__.py
中的 default_query_runners
。
您可以在 此处 查看 Firebolt 查询运行器的完整拉取请求。
总结
Redash 查询运行器是一个 Python 类,它至少实现了一个 run_query
方法,该方法返回 Redash 期望格式的结果。可配置的数据源设置由 configuration_schema
类方法定义,该方法返回 JSON schema。您可以选择实现连接测试、模式获取和自动限制。您可以通过将数据源添加到设置中的 default_query_runners
列表,或通过设置 ADDITIONAL_QUERY_RUNNERS
环境变量来启用数据源。