forked from apache/superset
-
Notifications
You must be signed in to change notification settings - Fork 0
/
source_registry.py
68 lines (58 loc) · 2.42 KB
/
source_registry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from sqlalchemy.orm import subqueryload
class SourceRegistry(object):
    """Central registry for all available datasource engines.

    Datasource classes are kept in ``sources``, keyed by each class's
    ``type`` attribute. The classmethods below look classes up in that
    mapping and query them through a caller-supplied session.
    """

    # Maps a datasource class's ``type`` attribute to the class itself.
    # This is class-level state, shared by every user of the registry.
    sources = {}

    @classmethod
    def register_sources(cls, datasource_config):
        """Import and register the datasource classes named in the config.

        :param datasource_config: mapping of module name to an iterable of
            class names defined in that module
        :raises ImportError: if a module cannot be imported
        :raises AttributeError: if a class is missing from its module or
            has no ``type`` attribute
        """
        for module_name, class_names in datasource_config.items():
            # Names are coerced to ``str`` before being used as ``fromlist``
            # entries (presumably to tolerate non-str names coming from
            # configuration -- TODO confirm).
            class_names = [str(s) for s in class_names]
            # A non-empty ``fromlist`` makes ``__import__`` return the named
            # module itself rather than its top-level package.
            module_obj = __import__(module_name, fromlist=class_names)
            for class_name in class_names:
                source_class = getattr(module_obj, class_name)
                cls.sources[source_class.type] = source_class

    @classmethod
    def get_datasource(cls, datasource_type, datasource_id, session):
        """Return the datasource of the given type with the given id.

        :param datasource_type: key of a registered datasource class
        :param datasource_id: value matched against the ``id`` column
        :param session: session used to run the query
        :returns: whatever ``.first()`` yields for the filtered query
        :raises KeyError: if ``datasource_type`` is not registered
        """
        return (
            session.query(cls.sources[datasource_type])
            .filter_by(id=datasource_id)
            .first()
        )

    @classmethod
    def get_all_datasources(cls, session):
        """Return a list of every datasource of every registered type.

        :param session: session used to run one query per registered class
        """
        datasources = []
        for source_class in cls.sources.values():
            datasources.extend(session.query(source_class).all())
        return datasources

    @classmethod
    def get_datasource_by_name(cls, session, datasource_type, datasource_name,
                               schema, database_name):
        """Return the first datasource matching name, schema and database.

        All datasources of the requested type are loaded and filtered in
        Python.

        :param session: session used to run the query
        :param datasource_type: key of a registered datasource class
        :param datasource_name: value matched against ``name``
        :param schema: value matched against ``schema``; datasources that
            expose no ``schema`` attribute are not filtered on it
        :param database_name: value matched against ``database.name``
        :raises KeyError: if ``datasource_type`` is not registered
        :raises IndexError: if no datasource matches
        """
        datasource_class = cls.sources[datasource_type]
        datasources = session.query(datasource_class).all()

        def schema_matches(datasource):
            # Compare against the datasource's own schema, not the argument
            # against itself. Types without a ``schema`` attribute cannot be
            # told apart by schema, so they match on name/database alone.
            if not hasattr(datasource, 'schema'):
                return True
            return datasource.schema == schema

        # Filter out datasources that don't have a database.
        db_ds = [d for d in datasources if d.database and
                 d.database.name == database_name and
                 d.name == datasource_name and schema_matches(d)]
        return db_ds[0]

    @classmethod
    def query_datasources_by_permissions(cls, session, database, permissions):
        """Return the datasources of a database whose perm is permitted.

        :param session: session used to run the query
        :param database: object whose ``type`` selects the registered
            datasource class and whose ``id`` is matched against
            ``database_id``
        :param permissions: collection of allowed ``perm`` values
        :raises KeyError: if ``database.type`` is not registered
        """
        datasource_class = cls.sources[database.type]
        return (
            session.query(datasource_class)
            .filter_by(database_id=database.id)
            .filter(datasource_class.perm.in_(permissions))
            .all()
        )

    @classmethod
    def get_eager_datasource(cls, session, datasource_type, datasource_id):
        """Returns datasource with columns and metrics.

        The ``columns`` and ``metrics`` relationships are requested with
        ``subqueryload`` so they are fetched together with the datasource.

        :param session: session used to run the query
        :param datasource_type: key of a registered datasource class
        :param datasource_id: value matched against the ``id`` column
        :returns: the result of ``.one()`` on the filtered query (which,
            per SQLAlchemy, raises unless exactly one row matches)
        :raises KeyError: if ``datasource_type`` is not registered
        """
        datasource_class = cls.sources[datasource_type]
        return (
            session.query(datasource_class)
            .options(
                subqueryload(datasource_class.columns),
                subqueryload(datasource_class.metrics)
            )
            .filter_by(id=datasource_id)
            .one()
        )