+open StdLabels
+module Expression = ImportExpression
+module Q = Expression.Query
+module Syntax = ImportConf.Syntax
+module Table = ImportConf.Table
+module Path = ImportConf.Path
+let truncate buffer n = Buffer.truncate buffer (Buffer.length buffer - n)
+(** The module allow to create fragment in the query which keep together the
+ binderd parameters and the text of the query.contents.
+ This is used a lot in order to create the CTE, where you need the create
+ fragment used both in the main request and partially in the CTE itself.
+ The content is mutable and all the functions are returning [unit]. *)
+module Chunk = struct
+ type t = {
+ b : Buffer.t;
+ parameters : ImportCSV.DataType.t Queue.t;
+ }
+ let create : unit -> t =
+ fun () -> { b = Buffer.create 16; parameters = Queue.create () }
+ let create' : Buffer.t -> ImportCSV.DataType.t Queue.t -> t =
+ fun b parameters -> { b; parameters }
+ (* Append the element from [tail] at the end of [head]
+ Tail is destroyed during the operation.
+ *)
+ let append : head:t -> tail:t -> unit =
+ fun ~head ~tail ->
+ match Buffer.length tail.b with
+ | 0 -> ()
+ | _ ->
+ Buffer.add_buffer head.b tail.b;
+ Queue.transfer tail.parameters head.parameters;
+ ()
+ (** Add a litteral string in the sequence *)
+ let add_string : t -> string -> unit = fun t v -> Buffer.add_string t.b v
+ let copy : t -> t =
+ fun t ->
+ let b = Buffer.create 16 and parameters = Queue.copy t.parameters in
+ Buffer.add_buffer b t.b;
+ { b; parameters }
+ let add_parameters : t -> ImportCSV.DataType.t Seq.t -> unit =
+ fun t p -> Queue.add_seq t.parameters p
+let prepare_key : f:(Format.formatter -> unit) -> Format.formatter -> unit =
+ fun ~f formatter -> Format.fprintf formatter "rtrim(upper(%t))" f
+(* Collect all the tables pointed by the expression. *)
+let pointed_tables : Syntax.t -> 'a Expression.T.t -> (Table.t * string) list =
+ fun conf expression ->
+ Expression.T.fold_values expression ~init:[] ~f:(fun acc path ->
+ let table = ImportConf.get_table_for_name conf path.Path.alias in
+ let table_name = Table.name table in
+ (table, table_name) :: acc)
+ |> List.sort_uniq ~cmp:Stdlib.compare
+(** Represent a column in a safe way in a query *)
+let print_column : Table.t -> string -> string =
+ fun table column ->
+ String.concat ~sep:"" [ "'"; table.Table.name; "'.'"; column; "'" ]
+let create_table : Dependency.t -> string =
+ fun mapping ->
+ let b = Buffer.create 64 in
+ Buffer.add_string b "CREATE TABLE '";
+ Buffer.add_string b (Table.name (Dependency.table mapping));
+ Buffer.add_string b "' (id INTEGER PRIMARY KEY";
+ List.iter (Dependency.keys mapping) ~f:(fun { Dependency.name; _ } ->
+ Buffer.add_string b ",'key_";
+ Buffer.add_string b name;
+ Buffer.add_string b "'");
+ ImportContainers.IntSet.iter (Dependency.columns mapping) ~f:(fun i ->
+ Buffer.add_string b ",'col_";
+ Buffer.add_string b (string_of_int i);
+ Buffer.add_string b "'");
+ Buffer.add_string b ")";
+ Buffer.contents b
+let show_path : conf:Syntax.t -> Format.formatter -> Path.t -> unit =
+ fun ~conf buffer { alias; column } ->
+ let table = ImportConf.get_table_for_name conf alias in
+ let table_name = table.Table.name in
+ Format.fprintf buffer "'%s'.col_%d" table_name column
+(** Extract the informations from the dependancies. We get two informations here :
+ - the join query in order to load the data from the external column
+ - the column corresponding to the key in order to identify the missing
+ links later.
+ *)
+let query_of_external :
+ conf:Syntax.t -> join_buffer:Chunk.t -> Syntax.extern -> unit =
+ fun ~conf ~join_buffer external_ ->
+ let extern_table = Table.name external_.target in
+ let formatter = Format.formatter_of_buffer join_buffer.b in
+ Format.fprintf formatter "\nLEFT JOIN '%s' AS '%s' ON %t = %s" extern_table
+ external_.target.name
+ (prepare_key ~f:(fun f ->
+ let q =
+ Q.query_of_expression Q.BindParam f (show_path ~conf)
+ external_.intern_key
+ in
+ Chunk.add_parameters join_buffer (Queue.to_seq q)))
+ (print_column external_.Syntax.target
+ ("key_" ^ external_.Syntax.target.name));
+ Format.pp_print_flush formatter ()
+(** Create the from part of the query, adding all the reuired externals.
+ SQLite is able to optimize the query and do not load the table not used in
+ the select clause. *)
+let create_from_chunck : Syntax.t -> Chunk.t -> unit =
+ fun conf c ->
+ Chunk.add_string c "\nFROM '";
+ Chunk.add_string c (Table.name conf.source);
+ Chunk.add_string c "' AS '";
+ Chunk.add_string c conf.source.name;
+ Chunk.add_string c "'";
+ (* Add the externals in the query *)
+ List.iter conf.externals ~f:(query_of_external ~conf ~join_buffer:c)
+(** Build a CTE query in order to use any group function inside the query.
+ Return the binded parameters used in the expression. The buffer given in
+ argument is also modified during the construction.
+ If filters is not None, the clauses are added to the CTE. *)
+let build_cte :
+ Syntax.t ->
+ expression:'a Expression.T.t ->
+ filters:Chunk.t option ->
+ Chunk.t =
+ fun conf ~expression ~filters ->
+ (* The binded parameters queue will be used later in the full query *)
+ let cte_chunk = Chunk.create () in
+ Chunk.add_string cte_chunk "WITH cte AS (SELECT ";
+ Chunk.add_string cte_chunk conf.source.name;
+ Chunk.add_string cte_chunk ".id, ";
+ let formatter = Format.formatter_of_buffer cte_chunk.b in
+ let p =
+ Q.query_of_expression Q.BindParam formatter (show_path ~conf) expression
+ in
+ Format.pp_print_flush formatter ();
+ Chunk.add_parameters cte_chunk (Queue.to_seq p);
+ (* The name is hardcoded here, and used in [Expression.Filters.window] *)
+ Chunk.add_string cte_chunk " AS group0";
+ let () = create_from_chunck conf cte_chunk in
+ let () =
+ match filters with
+ | None -> ()
+ | Some filters_chunk ->
+ Chunk.append ~head:cte_chunk ~tail:(Chunk.copy filters_chunk)
+ in
+ Chunk.add_string cte_chunk ")\n";
+ cte_chunk
+type filter_evaluation = {
+ content : Buffer.t;
+ parameters : ImportCSV.DataType.t Seq.t;
+ cte : (string * Chunk.t) option;
+(** Build the filters to apply in the query. We make the difference here
+ between the predicates to apply directly in the query, and the filters
+ associated with a group, which are required to be transformed into a CTE
+in SQL, and are evaluated before. *)
+(** Evaluate the filters on the query *)
+let eval_filters : Syntax.t -> filter_evaluation =
+ fun conf ->
+ match conf.filters with
+ | [] ->
+ let empty_buffer = Buffer.create 0 in
+ { content = empty_buffer; parameters = Seq.empty; cte = None }
+ | filters -> (
+ (* Create a new queue in order to accumulate all the parameters to bind.
+ This filter will be given to both the CTE if any, or reused in the
+ main query when there is no CTE.
+ *)
+ let chunk_filters = Chunk.create () in
+ Chunk.add_string chunk_filters "\nWHERE ";
+ let group = Chunk.create () in
+ let with_cte, with_exr =
+ List.fold_left filters ~init:(None, false)
+ ~f:(fun (with_cte, with_exr) column ->
+ (* The function will return an option in second position which is
+ None when no Group function where found, and Some Expression
+ otherwise *)
+ let b = Buffer.create 16 in
+ let formatter = Format.formatter_of_buffer b in
+ let queue, group_found =
+ Expression.Filters.query_of_expression Q.BindParam formatter
+ (show_path ~conf) column
+ in
+ Format.pp_print_flush formatter ();
+ let clause = Chunk.create' b queue in
+ match (group_found, with_cte) with
+ | None, _ ->
+ Chunk.append ~head:chunk_filters ~tail:clause;
+ Chunk.add_string chunk_filters "\nAND ";
+ (with_cte, true)
+ | (Some _ as group'), None ->
+ (* We have a group here, we do not add it into the
+ filter_buffer right now.
+ This can occur only once, the second one will raise
+ an error. *)
+ Chunk.append ~head:group ~tail:clause;
+ (group', with_exr)
+ | Some _, Some _ -> raise ImportErrors.MisplacedWindow)
+ in
+ match with_cte with
+ | None ->
+ let content = chunk_filters.b in
+ truncate content 5;
+ {
+ (* There is no group clause in the query *)
+ content;
+ parameters = Queue.to_seq chunk_filters.parameters;
+ cte = None;
+ }
+ | Some expression ->
+ let filters =
+ if with_exr then (
+ (* If we have additionnals filters from the group clause, we
+ have to report them in the CTE instead of the main query. *)
+ let c' = Chunk.copy chunk_filters in
+ truncate c'.b 5;
+ Some c')
+ else None
+ in
+ (* Create the common expression table *)
+ let cte_parameters = build_cte conf ~expression ~filters in
+ Chunk.append ~head:chunk_filters ~tail:group;
+ {
+ content = chunk_filters.b;
+ parameters = Queue.to_seq chunk_filters.parameters;
+ (* The name is hardcoded here, and used in
+ [Expression.Filters.window] *)
+ cte = Some ("cte", cte_parameters);
+ })
+type query = {
+ q : string;
+ parameters : ImportCSV.DataType.t Seq.t;
+(** Build the query and return also the mapping in order to identify each
+ external links between files.
+ The select query will name each column with an alias, and the map allow to
+ find which source is pointed by this alias. *)
+let select : Syntax.t -> query * Path.t ImportExpression.T.t array =
+ fun conf ->
+ (* If the filters contains a group expression, we need to transform this into
+ a CTE, which have to be evaluated before the main query. That’s why we are
+ evaluating the filters right now.*)
+ let filters = eval_filters conf in
+ let b = Buffer.create 256 in
+ let parameters = Queue.create () in
+ Option.iter
+ (fun (_, (cte : Chunk.t)) ->
+ Buffer.add_buffer b cte.b;
+ Queue.add_seq parameters (Queue.to_seq cte.parameters))
+ filters.cte;
+ (* For each column in the configuration file, add the corresponding element
+ in the query.
+ The Sqlite driver return the elements in an array, we create an array to
+ in order to manage the elements together.
+ *)
+ let headers = Array.make (List.length conf.columns) (Obj.magic None) in
+ let columns = List.to_seq conf.columns |> Seq.mapi (fun i c -> (i, c)) in
+ let formatter = Format.formatter_of_buffer b in
+ let () =
+ Format.fprintf formatter "SELECT %a"
+ (Format.pp_print_seq
+ ~pp_sep:(fun f () -> Format.fprintf f ",\n")
+ (fun formatter (i, column) ->
+ Array.set headers i column;
+ let p =
+ Q.query_of_expression Q.BindParam formatter (show_path ~conf)
+ column
+ in
+ Queue.transfer p parameters;
+ Format.fprintf formatter " AS result_%d" i))
+ columns
+ in
+ Format.pp_print_flush formatter ();
+ let () = create_from_chunck conf (Chunk.create' b parameters) in
+ (* If the query has a CTE, link it as well. We use an INNER JOIN here because
+ we want to be sure to get all the rows fetched by the CTE
+ *)
+ let () =
+ match filters.cte with
+ | None -> ()
+ | Some (name, _) ->
+ Buffer.add_string b "\nINNER JOIN '";
+ Buffer.add_string b name;
+ Buffer.add_string b "' ON ";
+ Buffer.add_string b name;
+ Buffer.add_string b ".id = ";
+ Buffer.add_string b conf.source.name;
+ Buffer.add_string b ".id"
+ in
+ Buffer.add_buffer b filters.content;
+ Queue.add_seq parameters filters.parameters;
+ let formatter = Format.formatter_of_buffer b in
+ (match conf.Syntax.uniq with
+ | [] -> ()
+ | uniq ->
+ Format.fprintf formatter "\nGROUP BY %a"
+ (Format.pp_print_list
+ ~pp_sep:(fun f () -> Format.fprintf f ", ")
+ (fun formatter column ->
+ let seq =
+ Q.query_of_expression Q.BindParam formatter (show_path ~conf)
+ column
+ in
+ Queue.transfer seq parameters))
+ uniq);
+ (match conf.Syntax.sort with
+ | [] -> ()
+ | sort ->
+ Format.fprintf formatter "\nORDER BY %a"
+ (Format.pp_print_list
+ ~pp_sep:(fun f () -> Format.fprintf f ", ")
+ (fun formatter column ->
+ let seq =
+ Q.query_of_expression Q.BindParam formatter (show_path ~conf)
+ column
+ in
+ Queue.transfer seq parameters))
+ sort);
+ Format.pp_print_flush formatter ();
+ ({ q = Buffer.contents b; parameters = Queue.to_seq parameters }, headers)
+let check_external : Syntax.t -> Syntax.extern -> query =
+ fun conf external_ ->
+ let extern_table = Table.name external_.target in
+ let parameters = Queue.create () in
+ let internal_key_buffer = Buffer.create 16 in
+ let formatter = Format.formatter_of_buffer internal_key_buffer in
+ let internal_key_seq =
+ Q.query_of_expression Q.BindParam formatter (show_path ~conf)
+ external_.intern_key
+ in
+ Format.pp_print_flush formatter ();
+ let external_key_buffer = Buffer.create 16 in
+ let pointed_tables = pointed_tables conf external_.intern_key in
+ Buffer.add_string external_key_buffer
+ (print_column external_.Syntax.target
+ ("key_" ^ external_.Syntax.target.name));
+ (* We do a copy before the transfert because the Queue is reused later in the
+ query *)
+ Queue.transfer (Queue.copy internal_key_seq) parameters;
+ let join_content = Buffer.contents external_key_buffer in
+ let inner_content = Buffer.contents internal_key_buffer in
+ let b = Buffer.create 256 in
+ let formatter = Format.formatter_of_buffer b in
+ let () =
+ Format.fprintf formatter
+ "SELECT %a%s FROM%a LEFT JOIN '%s' AS '%s' ON %t = %s WHERE %s IS NULL \
+ AND %s IS NOT NULL AND %s <> ''"
+ (fun formatter -> function
+ | [ (table, _name) ] ->
+ Format.fprintf formatter "%s, " (print_column table "id")
+ | _ -> Format.fprintf formatter "-1, ")
+ pointed_tables (* *)
+ inner_content (* *)
+ (Format.pp_print_list
+ ~pp_sep:(fun f () -> Format.pp_print_text f ", ")
+ (fun formatter (table, name) ->
+ Format.fprintf formatter "\n'%s' AS '%s'" name table.Table.name))
+ pointed_tables (* *)
+ extern_table (* *)
+ external_.target.name
+ (prepare_key ~f:(fun b ->
+ Format.pp_print_text b (Buffer.contents internal_key_buffer)))
+ join_content (* *)
+ join_content (* *)
+ inner_content (* *)
+ inner_content
+ in
+ Format.pp_print_flush formatter ();
+ { q = Buffer.contents b; parameters = Queue.to_seq parameters }
+let build_key_insert : Buffer.t -> Dependency.key -> unit =
+ fun buffer { Dependency.expression; _ } ->
+ let show_column : Format.formatter -> Path.column -> unit =
+ fun formatter column -> Format.fprintf formatter ":col_%d" column
+ in
+ let formatter = Format.formatter_of_buffer buffer in
+ let () =
+ prepare_key formatter ~f:(fun formatter ->
+ Q.query_of_expression Q.NoParam formatter show_column expression)
+ in
+ Format.pp_print_flush formatter ();
+ ()