aboutsummaryrefslogtreecommitdiff
path: root/lib/analysers/query.ml
blob: 7a6dd2a985b8dd783b73872844030cd1d2e4501b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
open StdLabels
module Expression = ImportExpression
module Q = Expression.Query
module Syntax = ImportConf.Syntax
module Table = ImportConf.Table
module Path = ImportConf.Path

let truncate buffer n = Buffer.truncate buffer (Buffer.length buffer - n)

(** The module allow to create fragment in the query which keep together the
    binderd parameters and the text of the query.contents.

    This is used a lot in order to create the CTE, where you need the create
    fragment used both in the main request and partially in the CTE itself.

    The content is mutable and all the functions are returning [unit]. *)
module Chunk = struct
  type t = {
    b : Buffer.t;
    parameters : ImportCSV.DataType.t Queue.t;
  }

  let create : unit -> t =
   fun () -> { b = Buffer.create 16; parameters = Queue.create () }

  let create' : Buffer.t -> ImportCSV.DataType.t Queue.t -> t =
   fun b parameters -> { b; parameters }

  (* Append the element from [tail] at the end of [head]

     Tail is destroyed during the operation.
  *)
  let append : head:t -> tail:t -> unit =
   fun ~head ~tail ->
    match Buffer.length tail.b with
    | 0 -> ()
    | _ ->
        Buffer.add_buffer head.b tail.b;
        Queue.transfer tail.parameters head.parameters;
        ()

  (** Add a litteral string in the sequence *)
  let add_string : t -> string -> unit = fun t v -> Buffer.add_string t.b v

  let copy : t -> t =
   fun t ->
    let b = Buffer.create 16 and parameters = Queue.copy t.parameters in
    Buffer.add_buffer b t.b;
    { b; parameters }

  let add_parameters : t -> ImportCSV.DataType.t Seq.t -> unit =
   fun t p -> Queue.add_seq t.parameters p
end

let prepare_key : f:(Format.formatter -> unit) -> Format.formatter -> unit =
 fun ~f formatter -> Format.fprintf formatter "rtrim(upper(%t))" f

(* Collect all the tables pointed by the expression. *)
let pointed_tables : Syntax.t -> 'a Expression.T.t -> (Table.t * string) list =
 fun conf expression ->
  Expression.T.fold_values expression ~init:[] ~f:(fun acc path ->
      let table = ImportConf.get_table_for_name conf path.Path.alias in
      let table_name = Table.name table in
      (table, table_name) :: acc)
  |> List.sort_uniq ~cmp:Stdlib.compare

(** Represent a column in a safe way in a query *)
let print_column : Table.t -> string -> string =
 fun table column ->
  String.concat ~sep:"" [ "'"; table.Table.name; "'.'"; column; "'" ]

let create_table : Dependency.t -> string =
 fun mapping ->
  let b = Buffer.create 64 in

  Buffer.add_string b "CREATE TABLE '";
  Buffer.add_string b (Table.name (Dependency.table mapping));
  Buffer.add_string b "' (id INTEGER PRIMARY KEY";

  List.iter (Dependency.keys mapping) ~f:(fun { Dependency.name; _ } ->
      Buffer.add_string b ",'key_";
      Buffer.add_string b name;
      Buffer.add_string b "'");

  ImportContainers.IntSet.iter (Dependency.columns mapping) ~f:(fun i ->
      Buffer.add_string b ",'col_";
      Buffer.add_string b (string_of_int i);
      Buffer.add_string b "'");
  Buffer.add_string b ")";

  Buffer.contents b

let show_path : conf:Syntax.t -> Format.formatter -> Path.t -> unit =
 fun ~conf buffer { alias; column } ->
  let table = ImportConf.get_table_for_name conf alias in
  let table_name = table.Table.name in
  Format.fprintf buffer "'%s'.col_%d" table_name column

(** Extract the informations from the dependancies. We get two informations here :

    - the join query in order to load the data from the external column 
    - the column corresponding to the key in order to identify the missing
      links later. 
    *)
let query_of_external :
    conf:Syntax.t -> join_buffer:Chunk.t -> Syntax.extern -> unit =
 fun ~conf ~join_buffer external_ ->
  let extern_table = Table.name external_.target in

  let formatter = Format.formatter_of_buffer join_buffer.b in
  Format.fprintf formatter "\nLEFT JOIN '%s' AS '%s' ON %t = %s" extern_table
    external_.target.name
    (prepare_key ~f:(fun f ->
         let q =
           Q.query_of_expression Q.BindParam f (show_path ~conf)
             external_.intern_key
         in

         Chunk.add_parameters join_buffer (Queue.to_seq q)))
    (print_column external_.Syntax.target
       ("key_" ^ external_.Syntax.target.name));

  Format.pp_print_flush formatter ()

(** Create the from part of the query, adding all the reuired externals. 

    SQLite is able to optimize the query and do not load the table not used in
    the select clause. *)
let create_from_chunck : Syntax.t -> Chunk.t -> unit =
 fun conf c ->
  Chunk.add_string c "\nFROM '";
  Chunk.add_string c (Table.name conf.source);
  Chunk.add_string c "' AS '";
  Chunk.add_string c conf.source.name;
  Chunk.add_string c "'";

  (* Add the externals in the query *)
  List.iter conf.externals ~f:(query_of_external ~conf ~join_buffer:c)

(** Build a CTE query in order to use any group function inside the query.
    Return the binded parameters used in the expression. The buffer given in
    argument is also modified during the construction. 

    If filters is not None, the clauses are added to the CTE. *)
let build_cte :
    Syntax.t ->
    expression:'a Expression.T.t ->
    filters:Chunk.t option ->
    Chunk.t =
 fun conf ~expression ~filters ->
  (* The binded parameters queue will be used later in the full query *)
  let cte_chunk = Chunk.create () in

  Chunk.add_string cte_chunk "WITH cte AS (SELECT ";
  Chunk.add_string cte_chunk conf.source.name;
  Chunk.add_string cte_chunk ".id, ";

  let formatter = Format.formatter_of_buffer cte_chunk.b in

  let p =
    Q.query_of_expression Q.BindParam formatter (show_path ~conf) expression
  in
  Format.pp_print_flush formatter ();
  Chunk.add_parameters cte_chunk (Queue.to_seq p);
  (* The name is hardcoded here, and used in [Expression.Filters.window] *)
  Chunk.add_string cte_chunk " AS group0";

  let () = create_from_chunck conf cte_chunk in
  let () =
    match filters with
    | None -> ()
    | Some filters_chunk ->
        Chunk.append ~head:cte_chunk ~tail:(Chunk.copy filters_chunk)
  in

  Chunk.add_string cte_chunk ")\n";
  cte_chunk

type filter_evaluation = {
  content : Buffer.t;
  parameters : ImportCSV.DataType.t Seq.t;
  cte : (string * Chunk.t) option;
}
(** Build the filters to apply in the query. We make the difference here
      between the predicates to apply directly in the query, and the filters
      associated with a group, which are required to be transformed into a CTE
in SQL, and are evaluated before. *)

(** Evaluate the filters on the query *)
let eval_filters : Syntax.t -> filter_evaluation =
 fun conf ->
  match conf.filters with
  | [] ->
      let empty_buffer = Buffer.create 0 in
      { content = empty_buffer; parameters = Seq.empty; cte = None }
  | filters -> (
      (* Create a new queue in order to accumulate all the parameters to bind.
         This filter will be given to both the CTE if any, or reused in the
         main query when there is no CTE.
      *)
      let chunk_filters = Chunk.create () in
      Chunk.add_string chunk_filters "\nWHERE ";

      let group = Chunk.create () in

      let with_cte, with_exr =
        List.fold_left filters ~init:(None, false)
          ~f:(fun (with_cte, with_exr) column ->
            (* The function will return an option in second position which is
               None when no Group function where found, and Some Expression
               otherwise *)
            let b = Buffer.create 16 in

            let formatter = Format.formatter_of_buffer b in
            let queue, group_found =
              Expression.Filters.query_of_expression Q.BindParam formatter
                (show_path ~conf) column
            in
            Format.pp_print_flush formatter ();
            let clause = Chunk.create' b queue in

            match (group_found, with_cte) with
            | None, _ ->
                Chunk.append ~head:chunk_filters ~tail:clause;
                Chunk.add_string chunk_filters "\nAND ";
                (with_cte, true)
            | (Some _ as group'), None ->
                (* We have a group here, we do not add it into the
                   filter_buffer right now.

                   This can occur only once, the second one will raise
                   an error. *)
                Chunk.append ~head:group ~tail:clause;
                (group', with_exr)
            | Some _, Some _ -> raise ImportErrors.MisplacedWindow)
      in

      match with_cte with
      | None ->
          let content = chunk_filters.b in
          truncate content 5;
          {
            (* There is no group clause in the query *)
            content;
            parameters = Queue.to_seq chunk_filters.parameters;
            cte = None;
          }
      | Some expression ->
          let filters =
            if with_exr then (
              (* If we have additionnals filters from the group clause, we
                 have to report them in the CTE instead of the main query. *)
              let c' = Chunk.copy chunk_filters in
              truncate c'.b 5;
              Some c')
            else None
          in

          (* Create the common expression table *)
          let cte_parameters = build_cte conf ~expression ~filters in
          Chunk.append ~head:chunk_filters ~tail:group;

          {
            content = chunk_filters.b;
            parameters = Queue.to_seq chunk_filters.parameters;
            (* The name is hardcoded here, and used in
               [Expression.Filters.window] *)
            cte = Some ("cte", cte_parameters);
          })

type query = {
  q : string;
  parameters : ImportCSV.DataType.t Seq.t;
}

(** Build the query and return also the mapping in order to identify each
    external links between files. 

    The select query will name each column with an alias, and the map allow to
    find which source is pointed by this alias. *)
let select : Syntax.t -> query * Path.t ImportExpression.T.t array =
 fun conf ->
  (* If the filters contains a group expression, we need to transform this into
     a CTE, which have to be evaluated before the main query. That’s why we are
     evaluating the filters right now.*)
  let filters = eval_filters conf in
  let b = Buffer.create 256 in
  let parameters = Queue.create () in

  Option.iter
    (fun (_, (cte : Chunk.t)) ->
      Buffer.add_buffer b cte.b;
      Queue.add_seq parameters (Queue.to_seq cte.parameters))
    filters.cte;

  (* For each column in the configuration file, add the corresponding element
     in the query.

     The Sqlite driver return the elements in an array, we create  an array to
     in order to manage the elements together.
  *)
  let headers = Array.make (List.length conf.columns) (Obj.magic None) in

  let columns = List.to_seq conf.columns |> Seq.mapi (fun i c -> (i, c)) in
  let formatter = Format.formatter_of_buffer b in
  let () =
    Format.fprintf formatter "SELECT %a"
      (Format.pp_print_seq
         ~pp_sep:(fun f () -> Format.fprintf f ",\n")
         (fun formatter (i, column) ->
           Array.set headers i column;
           let p =
             Q.query_of_expression Q.BindParam formatter (show_path ~conf)
               column
           in
           Queue.transfer p parameters;
           Format.fprintf formatter " AS result_%d" i))
      columns
  in
  Format.pp_print_flush formatter ();

  let () = create_from_chunck conf (Chunk.create' b parameters) in

  (* If the query has a CTE, link it as well. We use an INNER JOIN here because
     we want to be sure to get all the rows fetched by the CTE
  *)
  let () =
    match filters.cte with
    | None -> ()
    | Some (name, _) ->
        Buffer.add_string b "\nINNER JOIN '";
        Buffer.add_string b name;
        Buffer.add_string b "' ON ";
        Buffer.add_string b name;
        Buffer.add_string b ".id = ";
        Buffer.add_string b conf.source.name;
        Buffer.add_string b ".id"
  in

  Buffer.add_buffer b filters.content;
  Queue.add_seq parameters filters.parameters;

  let formatter = Format.formatter_of_buffer b in
  (match conf.Syntax.uniq with
  | [] -> ()
  | uniq ->
      Format.fprintf formatter "\nGROUP BY %a"
        (Format.pp_print_list
           ~pp_sep:(fun f () -> Format.fprintf f ", ")
           (fun formatter column ->
             let seq =
               Q.query_of_expression Q.BindParam formatter (show_path ~conf)
                 column
             in
             Queue.transfer seq parameters))
        uniq);
  (match conf.Syntax.sort with
  | [] -> ()
  | sort ->
      Format.fprintf formatter "\nORDER BY %a"
        (Format.pp_print_list
           ~pp_sep:(fun f () -> Format.fprintf f ", ")
           (fun formatter column ->
             let seq =
               Q.query_of_expression Q.BindParam formatter (show_path ~conf)
                 column
             in
             Queue.transfer seq parameters))
        sort);
  Format.pp_print_flush formatter ();

  ({ q = Buffer.contents b; parameters = Queue.to_seq parameters }, headers)

let check_external : Syntax.t -> Syntax.extern -> query =
 fun conf external_ ->
  let extern_table = Table.name external_.target in

  let parameters = Queue.create () in
  let internal_key_buffer = Buffer.create 16 in
  let formatter = Format.formatter_of_buffer internal_key_buffer in
  let internal_key_seq =
    Q.query_of_expression Q.BindParam formatter (show_path ~conf)
      external_.intern_key
  in
  Format.pp_print_flush formatter ();

  let external_key_buffer = Buffer.create 16 in
  let pointed_tables = pointed_tables conf external_.intern_key in
  Buffer.add_string external_key_buffer
    (print_column external_.Syntax.target
       ("key_" ^ external_.Syntax.target.name));

  (* We do a copy before the transfert because the Queue is reused later in the
     query *)
  Queue.transfer (Queue.copy internal_key_seq) parameters;

  let join_content = Buffer.contents external_key_buffer in
  let inner_content = Buffer.contents internal_key_buffer in
  let b = Buffer.create 256 in
  let formatter = Format.formatter_of_buffer b in
  let () =
    Format.fprintf formatter
      "SELECT %a%s FROM%a LEFT JOIN '%s' AS '%s' ON %t = %s WHERE %s IS NULL \
       AND %s IS NOT NULL AND %s <> ''"
      (fun formatter -> function
        | [ (table, _name) ] ->
            Format.fprintf formatter "%s, " (print_column table "id")
        | _ -> Format.fprintf formatter "-1, ")
      pointed_tables (* *)
      inner_content (* *)
      (Format.pp_print_list
         ~pp_sep:(fun f () -> Format.pp_print_text f ", ")
         (fun formatter (table, name) ->
           Format.fprintf formatter "\n'%s' AS '%s'" name table.Table.name))
      pointed_tables (* *)
      extern_table (* *)
      external_.target.name
      (prepare_key ~f:(fun b ->
           Format.pp_print_text b (Buffer.contents internal_key_buffer)))
      join_content (* *)
      join_content (* *)
      inner_content (* *)
      inner_content
  in

  Format.pp_print_flush formatter ();

  { q = Buffer.contents b; parameters = Queue.to_seq parameters }

let build_key_insert : Buffer.t -> Dependency.key -> unit =
 fun buffer { Dependency.expression; _ } ->
  let show_column : Format.formatter -> Path.column -> unit =
   fun formatter column -> Format.fprintf formatter ":col_%d" column
  in

  let formatter = Format.formatter_of_buffer buffer in

  let () =
    prepare_key formatter ~f:(fun formatter ->
        Q.query_of_expression Q.NoParam formatter show_column expression)
  in

  Format.pp_print_flush formatter ();

  ()