1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
|
open StdLabels
module Expression = ImportExpression
module Q = Expression.Query
module Syntax = ImportConf.Syntax
module Table = ImportDataTypes.Table
module Path = ImportDataTypes.Path
(** Collect all the tables pointed by [expression], each paired with its
    name. The result contains no duplicates and is sorted with the
    polymorphic comparison. *)
let pointed_tables : Syntax.t -> 'a Expression.T.t -> (Table.t * string) list =
 fun conf expression ->
  let add_table found path =
    let table = ImportConf.get_table_for_name conf path.Path.alias in
    (table, Table.name table) :: found
  in
  List.sort_uniq ~cmp:Stdlib.compare
    (Expression.T.fold_values expression ~init:[] ~f:add_table)
(** Build the [CREATE TABLE] statement for the table described by [mapping].

    The table gets an [id INTEGER PRIMARY KEY] column, then one ['key_<name>']
    column per key of the mapping, then one ['col_<i>'] column per column
    index of the mapping. Identifiers are wrapped in single quotes and are
    not escaped. *)
let create_table : Dependency.t -> string =
 fun mapping ->
  let buffer = Buffer.create 64 in
  Printf.bprintf buffer "CREATE TABLE '%s' (id INTEGER PRIMARY KEY"
    (Table.name (Dependency.table mapping));
  List.iter (Dependency.keys mapping) ~f:(fun { Dependency.name; _ } ->
      Printf.bprintf buffer ",'key_%s'" name);
  ImportContainers.IntSet.iter (Dependency.columns mapping) ~f:(fun index ->
      Printf.bprintf buffer ",'col_%d'" index);
  Buffer.add_char buffer ')';
  Buffer.contents buffer
(** A SQL statement together with the values bound to its parameters. *)
type query = {
  q : string;  (** SQL text of the statement. *)
  parameters : ImportCSV.DataType.t Seq.t;
      (** Bound values, in the order in which their placeholders were
          printed into [q]. *)
}
(** [take_elements ~prefix ~eq group] drops from [group] the leading
    elements matching [prefix] (compared with [eq]) and returns the rest.

    When [group] is shorter than [prefix] and matches it as far as it goes,
    the result is [[]].

    @raise Not_found if [group] and [prefix] differ at a position present in
    both lists. *)
let rec take_elements :
    prefix:'a list -> eq:('a -> 'a -> bool) -> 'a list -> 'a list =
 fun ~prefix ~eq group ->
  match prefix with
  | [] -> group
  | expected :: remaining_prefix -> (
      match group with
      | [] -> []
      | actual :: remaining_group ->
          if eq expected actual then
            take_elements ~prefix:remaining_prefix ~eq remaining_group
          else raise Not_found)
(** Remove, from every window function inside [expression], the leading
    partition expressions already listed in [prefix].

    The window functions shall be grouped in the same way as the uniq
    property (a partition cannot be made over a group not kept in the final
    result). The SQL query must not repeat, inside the window function, the
    elements already present in the GROUP BY statement, so they are filtered
    out of the configuration before the query is built. Every other node is
    rebuilt unchanged, and sub-expressions are cleaned recursively.

    @raise Not_found if a window partition and [prefix] differ at a position
    present in both lists. *)
let clean_window :
    prefix:Path.t ImportExpression.T.t list ->
    Path.t ImportExpression.T.t ->
    Path.t ImportExpression.T.t =
 fun ~prefix expression ->
  let open ImportExpression.T in
  let rec clean = function
    (* Leaves: rebuilt as they are. *)
    | Empty -> Empty
    | Literal s -> Literal s
    | Integer i -> Integer i
    | Path p -> Path p
    (* Composite nodes: clean every child. *)
    | Expr e -> Expr (clean e)
    | Concat pp -> Concat (List.map ~f:clean pp)
    | Nvl pp -> Nvl (List.map ~f:clean pp)
    | Join (sep, pp) -> Join (sep, List.map ~f:clean pp)
    | Function (name, pp) -> Function (name, List.map ~f:clean pp)
    | Function' (name, pp) -> Function' (name, List.map ~f:clean pp)
    | BOperator (n, arg1, arg2) -> BOperator (n, clean arg1, clean arg2)
    | GEquality (n, arg1, args) ->
        GEquality (n, clean arg1, List.map ~f:clean args)
    (* Window: drop the partition part already given by [prefix]. *)
    | Window (window_f, group, order) ->
        let cleaned_function = map_window ~f:clean window_f in
        let remaining_group =
          take_elements ~eq:(ImportExpression.T.equal Path.equal) ~prefix group
        in
        Window
          ( cleaned_function,
            List.map ~f:clean remaining_group,
            List.map ~f:clean order )
  in
  clean expression
(** Build the SELECT query, and also return the expression behind each
    result column in order to identify each external link between files.

    Column [i] of the configuration is selected with the alias [result_i].
    Index [i] of the returned array holds the (window-cleaned) expression
    printed for that alias, so the caller can find which source is pointed
    by each alias.

    @raise Not_found if a window partition disagrees with [conf.uniq] (see
    [clean_window]). *)
let select : Syntax.t -> query * Path.t ImportExpression.T.t array =
 fun conf ->
  let filter = ImportConf.CTE.of_filters conf.filters in
  (* Transform the columns to extract from the query: clean the window
     functions by removing the part already defined in the [uniq] parameter.
     The Sqlite driver returns the elements in an array, so the expressions
     are kept in an array too: index [i] matches the alias [result_i].
     Building the array from the list means no placeholder value is needed. *)
  let headers =
    Array.of_list (List.map ~f:(clean_window ~prefix:conf.uniq) conf.columns)
  in
  let filters = Chunk.create () in
  let request_header = Filters.generate_sql ~conf filter filters in
  let b = request_header.Chunk.b
  and parameters = request_header.Chunk.parameters in
  (* Print one expression and move its bound values into the query
     parameters, so they stay in the same order as their placeholders. *)
  let print_expression formatter expression =
    let expression_parameters =
      Q.query_of_expression Q.BindParam formatter (Printers.path ~conf)
        expression
    in
    Queue.transfer expression_parameters parameters
  in
  (* Comma-separated list used by both the GROUP BY and ORDER BY clauses. *)
  let print_expressions formatter expressions =
    Format.pp_print_list
      ~pp_sep:(fun f () -> Format.fprintf f ", ")
      print_expression formatter expressions
  in
  let formatter = Format.formatter_of_buffer b in
  Format.fprintf formatter "SELECT %a"
    (Format.pp_print_seq
       ~pp_sep:(fun f () -> Format.fprintf f ",\n")
       (fun formatter (i, column) ->
         print_expression formatter column;
         Format.fprintf formatter " AS result_%d" i))
    (Array.to_seqi headers);
  Format.pp_print_flush formatter ();
  let () = Chunk.create_from_statement_of_chunck conf request_header in
  Chunk.append ~head:request_header ~tail:filters;
  (* A fresh formatter on the same buffer for the trailing clauses, written
     after the FROM statement and the filters appended above. *)
  let formatter = Format.formatter_of_buffer b in
  (match conf.Syntax.uniq with
  | [] -> ()
  | uniq -> Format.fprintf formatter "\nGROUP BY %a" print_expressions uniq);
  (match conf.Syntax.sort with
  | [] -> ()
  | sort -> Format.fprintf formatter "\nORDER BY %a" print_expressions sort);
  Format.pp_print_flush formatter ();
  ({ q = Buffer.contents b; parameters = Queue.to_seq parameters }, headers)
(** Build the query checking the external link [external_].

    The query selects the rows where the [key_<target name>] column of the
    target table is NULL while the internal key is neither NULL nor the
    empty string. These are presumably the rows the join built by
    [Chunk.create_from_statement_of_chunck] could not match (TODO confirm the
    join kind).

    Each selected row holds the [id] of the first table pointed by the key,
    or [-1] when the key points no table, followed by the key value. *)
let check_external : Syntax.t -> Syntax.Extern.t -> query =
 fun conf external_ ->
  let internal_chunk = Chunk.create () in
  Chunk.add_expression ~conf internal_chunk external_.Syntax.Extern.intern_key;
  let join_content =
    Table.print_column external_.Syntax.Extern.target
      ("key_" ^ external_.Syntax.Extern.target.name)
  in
  let pointed_tables = pointed_tables conf external_.intern_key in
  (* We have to link all the tables referenced by the external: no table may
     stay unlinked with the source in the request (this would cause a
     cartesian product request).
     This is not the usual way to proceed (usually we start from the source
     and link the externals).
     An external already collected (same physical value) is returned as-is
     instead of being traversed again; otherwise a self-referencing or
     cyclic dependency would recurse forever. *)
  let rec collect_links :
      Syntax.Extern.t -> Syntax.Extern.t list -> Syntax.Extern.t list =
   fun table init ->
    if List.memq table ~set:init then init
    else
      Expression.T.fold_values ~init:(table :: init)
        table.Syntax.Extern.intern_key ~f:(fun acc expr ->
          match expr.Path.alias with
          | None -> acc
          | Some _ as path -> (
              let table = ImportConf.get_table_for_name conf path in
              (* Look for this table in the externals *)
              match
                List.find_opt conf.Syntax.externals ~f:(fun t ->
                    t.Syntax.Extern.target == table)
              with
              | None -> acc
              | Some ext -> collect_links ext acc))
  in
  let dependencies = collect_links external_ [] in
  let request = Chunk.create () in
  Chunk.add_string request "SELECT ";
  (match pointed_tables with
  | [] ->
      (* The key points no table: there is no row number to report. *)
      Chunk.add_string request "-1"
  | (table, _name) :: _ ->
      (* Extract the row number of the first pointed table. *)
      Chunk.add_string request (Table.print_column table "id"));
  Chunk.add_string request ", ";
  (* Each use of the internal key appends a copy, so the chunk and its
     parameters can be reused for the next occurrence. *)
  Chunk.append ~head:request ~tail:(Chunk.copy internal_chunk);
  Chunk.create_from_statement_of_chunck ~externals:dependencies conf request;
  Chunk.add_string request " WHERE ";
  Chunk.add_string request join_content;
  Chunk.add_string request " IS NULL AND ";
  Chunk.append ~head:request ~tail:(Chunk.copy internal_chunk);
  Chunk.add_string request " IS NOT NULL AND ";
  Chunk.append ~head:request ~tail:(Chunk.copy internal_chunk);
  Chunk.add_string request " <> ''";
  let q = Buffer.contents request.b in
  { q; parameters = Queue.to_seq request.parameters }
(** Print into [buffer] the SQL expression of the dependency key [key].

    The expression is printed with [Q.NoParam] through
    [Printers.prepare_key]. Every column it references is rendered as the
    named placeholder [:col_<index>]. The formatter is flushed before
    returning, so [buffer] holds the whole text. *)
let build_key_insert : Buffer.t -> Dependency.key -> unit =
 fun buffer key ->
  let placeholder : Format.formatter -> Path.column -> unit =
   fun ppf index -> Format.fprintf ppf ":col_%d" index
  in
  let ppf = Format.formatter_of_buffer buffer in
  let () =
    Printers.prepare_key ppf ~f:(fun ppf ->
        Q.query_of_expression Q.NoParam ppf placeholder
          key.Dependency.expression)
  in
  Format.pp_print_flush ppf ()
|