1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
|
open StdLabels
module IntSet = ImportContainers.IntSet
module Syntax = ImportConf.Syntax
module Table = ImportDataTypes.Table
module Path = ImportDataTypes.Path
module Expression = ImportExpression.T
(*
Internally, the dependency chain is represented as a graph.
Each file to process (csv or xlsx) is marked as a node, The link between
each node cannot be represented in the graph because each index can imply
multiple tables together (for exemple [join(source.A, other.B)] ). So the
graph is only used in order to get the import process order, and each
information is keeped in a separate table.
*)
type deps = (ImportContainers.Source.t * ImportContainers.Source.t list) list
type key = {
name : string;
expression : Path.column Expression.t;
columns : ImportContainers.IntSet.t Lazy.t;
}
type t = {
table : Table.t;
columns : IntSet.t;
keys : key list;
}
let table t = t.table
let columns t = t.columns
let keys t = t.keys
type build_map = t ImportContainers.Externals.t
(* The expression can be qualified with full path (when we are in the column
definition) or only with a column (in a binding with an externanl table).
This type is here to extract the correct information, according to the type
we are dealing with :
- [to_mapping] allow to pick the right Set in which we need to add the
column pointed by the path (this can be [keys] or [columns]
- [of_path] is need to extract the qualified source from any kind of path.
*)
type 'a expression_extractor = {
to_mapping : t -> Path.column -> t;
of_path : 'a -> string option * Path.column;
}
(** [add_path_in_map f parent path ] Extract the column from element [path] and
process the column in the function [f]
The [path] is abstract, but the function [f.of_path] can extract the needed
elements in order to add it in the mapping.
The function may raise [Unknown_source] if the the path describe an unknown
table. *)
let add_path_in_map :
f:'a expression_extractor -> conf:Syntax.t -> 'a -> build_map -> build_map =
fun ~f ~conf path map ->
let table_source, column = f.of_path path in
let table =
try ImportConf.get_table_for_name conf table_source with
| Not_found -> raise (ImportErrors.Unknown_source (Option.get table_source))
in
ImportContainers.Externals.update map
~key:(ImportContainers.KeyName.from_table table) ~f:(fun v ->
let mapping =
match v with
| None -> raise (ImportErrors.Unknown_source table.name)
| Some mapping -> mapping
in
Some (f.to_mapping mapping column))
let add_expression_in_map :
f:'a expression_extractor ->
conf:Syntax.t ->
'a Expression.t ->
build_map ->
build_map =
fun ~f ~conf expr map ->
Expression.fold_values expr ~init:map ~f:(fun map p ->
add_path_in_map ~f ~conf p map)
let add_columns_in_map :
f:'a expression_extractor ->
conf:Syntax.t ->
'a Expression.t list ->
build_map ->
build_map =
fun ~f ~conf columns map ->
let columns =
List.fold_left columns ~init:map ~f:(fun map expression ->
let new_map = add_expression_in_map ~f ~conf expression map in
new_map)
in
columns
(* [add_dependancies ~conf source map path]
add the dependancy from the table [source] to another one after analysing the
expression and extracting the path contained inside.
This function is called for each path declared inside the expression. *)
let add_dependancies :
conf:Syntax.t -> Syntax.Extern.t -> deps -> Path.t -> deps =
fun ~conf extern graph path ->
let source_table = ImportConf.get_table_for_name conf path.Path.alias in
let source = ImportContainers.Source.from_table source_table in
let target = ImportContainers.Source.from_table extern.target in
match ImportContainers.Source.equal target source with
| true -> graph
| _ -> (target, [ source ]) :: graph
let add_external_in_map :
conf:Syntax.t -> Syntax.Extern.t -> build_map * deps -> build_map * deps =
fun ~conf extern (map, graph) ->
let dest = ImportContainers.KeyName.from_table extern.target in
(* Pre-check that every source is already declared in the configuration. *)
let _ =
Expression.fold_values extern.intern_key ~init:() ~f:(fun () path ->
try
let _ = ImportConf.get_table_for_name conf path.Path.alias in
()
with
| Not_found -> (
match path.alias with
| Some table -> raise (ImportErrors.Unknown_source table)
| None ->
(* This is very unlikely. This error would be raised if we have
no source for this import *)
let root = conf.source in
raise (ImportErrors.Unknown_source root.name)))
in
(* Create the new key with all the expression and all the columns inside it *)
let new_key =
{
name = extern.target.Table.name;
expression = extern.extern_key;
columns =
lazy
(Expression.fold_values extern.extern_key
~f:(fun acc k -> ImportContainers.IntSet.add k acc)
~init:ImportContainers.IntSet.empty);
}
in
let build_map =
ImportContainers.Externals.update map ~key:dest ~f:(function
| None ->
(* Create the entry for the external if it does not exists *)
Some
{
table = extern.target;
columns = IntSet.empty;
keys = [ new_key ];
}
| Some mapping ->
(* Or update the existing one with the key we just created *)
Some { mapping with keys = new_key :: mapping.keys })
in
let graph =
Expression.fold_values extern.intern_key ~init:graph
~f:(add_dependancies ~conf extern)
in
let build_map =
add_expression_in_map extern.intern_key build_map ~conf
~f:
{
of_path =
(fun Path.{ alias; column } ->
let table = ImportConf.get_table_for_name conf alias in
(Some table.name, column));
to_mapping =
(fun mapping column ->
{ mapping with columns = IntSet.add column mapping.columns });
}
in
(build_map, graph)
let mapper =
{
to_mapping =
(fun mapping column ->
{ mapping with columns = IntSet.add column mapping.columns });
of_path = (fun ({ alias; column } : Path.t) -> (alias, column));
}
let get_mapping : Syntax.t -> build_map * deps =
fun conf ->
let root = ImportContainers.Source.from_table (ImportConf.root_table conf)
and root' =
ImportContainers.KeyName.from_table (ImportConf.root_table conf)
in
let graph = [ (root, []) ] in
(* For each external declared in the configuration file, add the columns to
query *)
let init =
( ImportContainers.Externals.singleton root'
{
table = ImportConf.root_table conf;
columns = IntSet.empty;
keys = [];
},
graph )
in
let map, graph =
List.fold_left conf.externals ~init ~f:(fun map extern ->
add_external_in_map ~conf extern map)
in
(* Now we don’t bother anymore with the graph and it’s dependency, we just
collect all the columns in the differents expressions *)
let map =
map
|> add_columns_in_map ~conf ~f:mapper conf.columns
|> add_columns_in_map ~conf ~f:mapper conf.sort
|> add_columns_in_map ~conf ~f:mapper conf.filters
|> add_columns_in_map ~conf ~f:mapper conf.uniq
in
(map, graph)
let get_process_order : Syntax.t -> t list =
fun map ->
let map, graph = get_mapping map in
match Tsort.sort graph with
| Tsort.ErrorCycle l ->
let name_of_key k =
ImportContainers.(Externals.find (Source.name k) map).table.name
in
raise (ImportErrors.Cycle (List.map ~f:name_of_key l))
| Sorted elements ->
(* It’s OK, we know there is no cycles now, we can extract the files to
load from this list.
*)
List.filter_map elements ~f:(fun v ->
ImportContainers.Externals.find_opt
(ImportContainers.Source.name v)
map)
(* This list can still have duplicated values, and we have to remove them
still keeping the order.
*)
|> List.fold_left ~init:[] ~f:(fun acc element ->
(* Prevent the same file to beeing loaded twice *)
match List.mem element ~set:acc with
| true -> acc
| false -> element :: acc)
|