From 6b377719c10d5ab3343fd5221f99a4a21008e25a Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Thu, 14 Mar 2024 08:26:58 +0100 Subject: Initial commit --- lib/analysers/query.ml | 445 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 445 insertions(+) create mode 100644 lib/analysers/query.ml (limited to 'lib/analysers/query.ml') diff --git a/lib/analysers/query.ml b/lib/analysers/query.ml new file mode 100644 index 0000000..7a6dd2a --- /dev/null +++ b/lib/analysers/query.ml @@ -0,0 +1,445 @@ +open StdLabels +module Expression = ImportExpression +module Q = Expression.Query +module Syntax = ImportConf.Syntax +module Table = ImportConf.Table +module Path = ImportConf.Path + +let truncate buffer n = Buffer.truncate buffer (Buffer.length buffer - n) + +(** The module allow to create fragment in the query which keep together the + binderd parameters and the text of the query.contents. + + This is used a lot in order to create the CTE, where you need the create + fragment used both in the main request and partially in the CTE itself. + + The content is mutable and all the functions are returning [unit]. *) +module Chunk = struct + type t = { + b : Buffer.t; + parameters : ImportCSV.DataType.t Queue.t; + } + + let create : unit -> t = + fun () -> { b = Buffer.create 16; parameters = Queue.create () } + + let create' : Buffer.t -> ImportCSV.DataType.t Queue.t -> t = + fun b parameters -> { b; parameters } + + (* Append the element from [tail] at the end of [head] + + Tail is destroyed during the operation. + *) + let append : head:t -> tail:t -> unit = + fun ~head ~tail -> + match Buffer.length tail.b with + | 0 -> () + | _ -> + Buffer.add_buffer head.b tail.b; + Queue.transfer tail.parameters head.parameters; + () + + (** Add a litteral string in the sequence *) + let add_string : t -> string -> unit = fun t v -> Buffer.add_string t.b v + + let copy : t -> t = + fun t -> + let b = Buffer.create 16 and parameters = Queue.copy t.parameters in + Buffer.add_buffer b t.b; + { b; parameters } + + let add_parameters : t -> ImportCSV.DataType.t Seq.t -> unit = + fun t p -> Queue.add_seq t.parameters p +end + +let prepare_key : f:(Format.formatter -> unit) -> Format.formatter -> unit = + fun ~f formatter -> Format.fprintf formatter "rtrim(upper(%t))" f + +(* Collect all the tables pointed by the expression. *) +let pointed_tables : Syntax.t -> 'a Expression.T.t -> (Table.t * string) list = + fun conf expression -> + Expression.T.fold_values expression ~init:[] ~f:(fun acc path -> + let table = ImportConf.get_table_for_name conf path.Path.alias in + let table_name = Table.name table in + (table, table_name) :: acc) + |> List.sort_uniq ~cmp:Stdlib.compare + +(** Represent a column in a safe way in a query *) +let print_column : Table.t -> string -> string = + fun table column -> + String.concat ~sep:"" [ "'"; table.Table.name; "'.'"; column; "'" ] + +let create_table : Dependency.t -> string = + fun mapping -> + let b = Buffer.create 64 in + + Buffer.add_string b "CREATE TABLE '"; + Buffer.add_string b (Table.name (Dependency.table mapping)); + Buffer.add_string b "' (id INTEGER PRIMARY KEY"; + + List.iter (Dependency.keys mapping) ~f:(fun { Dependency.name; _ } -> + Buffer.add_string b ",'key_"; + Buffer.add_string b name; + Buffer.add_string b "'"); + + ImportContainers.IntSet.iter (Dependency.columns mapping) ~f:(fun i -> + Buffer.add_string b ",'col_"; + Buffer.add_string b (string_of_int i); + Buffer.add_string b "'"); + Buffer.add_string b ")"; + + Buffer.contents b + +let show_path : conf:Syntax.t -> Format.formatter -> Path.t -> unit = + fun ~conf buffer { alias; column } -> + let table = ImportConf.get_table_for_name conf alias in + let table_name = table.Table.name in + Format.fprintf buffer "'%s'.col_%d" table_name column + +(** Extract the informations from the dependancies. We get two informations here : + + - the join query in order to load the data from the external column + - the column corresponding to the key in order to identify the missing + links later. + *) +let query_of_external : + conf:Syntax.t -> join_buffer:Chunk.t -> Syntax.extern -> unit = + fun ~conf ~join_buffer external_ -> + let extern_table = Table.name external_.target in + + let formatter = Format.formatter_of_buffer join_buffer.b in + Format.fprintf formatter "\nLEFT JOIN '%s' AS '%s' ON %t = %s" extern_table + external_.target.name + (prepare_key ~f:(fun f -> + let q = + Q.query_of_expression Q.BindParam f (show_path ~conf) + external_.intern_key + in + + Chunk.add_parameters join_buffer (Queue.to_seq q))) + (print_column external_.Syntax.target + ("key_" ^ external_.Syntax.target.name)); + + Format.pp_print_flush formatter () + +(** Create the from part of the query, adding all the reuired externals. + + SQLite is able to optimize the query and do not load the table not used in + the select clause. *) +let create_from_chunck : Syntax.t -> Chunk.t -> unit = + fun conf c -> + Chunk.add_string c "\nFROM '"; + Chunk.add_string c (Table.name conf.source); + Chunk.add_string c "' AS '"; + Chunk.add_string c conf.source.name; + Chunk.add_string c "'"; + + (* Add the externals in the query *) + List.iter conf.externals ~f:(query_of_external ~conf ~join_buffer:c) + +(** Build a CTE query in order to use any group function inside the query. + Return the binded parameters used in the expression. The buffer given in + argument is also modified during the construction. + + If filters is not None, the clauses are added to the CTE. *) +let build_cte : + Syntax.t -> + expression:'a Expression.T.t -> + filters:Chunk.t option -> + Chunk.t = + fun conf ~expression ~filters -> + (* The binded parameters queue will be used later in the full query *) + let cte_chunk = Chunk.create () in + + Chunk.add_string cte_chunk "WITH cte AS (SELECT "; + Chunk.add_string cte_chunk conf.source.name; + Chunk.add_string cte_chunk ".id, "; + + let formatter = Format.formatter_of_buffer cte_chunk.b in + + let p = + Q.query_of_expression Q.BindParam formatter (show_path ~conf) expression + in + Format.pp_print_flush formatter (); + Chunk.add_parameters cte_chunk (Queue.to_seq p); + (* The name is hardcoded here, and used in [Expression.Filters.window] *) + Chunk.add_string cte_chunk " AS group0"; + + let () = create_from_chunck conf cte_chunk in + let () = + match filters with + | None -> () + | Some filters_chunk -> + Chunk.append ~head:cte_chunk ~tail:(Chunk.copy filters_chunk) + in + + Chunk.add_string cte_chunk ")\n"; + cte_chunk + +type filter_evaluation = { + content : Buffer.t; + parameters : ImportCSV.DataType.t Seq.t; + cte : (string * Chunk.t) option; +} +(** Build the filters to apply in the query. We make the difference here + between the predicates to apply directly in the query, and the filters + associated with a group, which are required to be transformed into a CTE +in SQL, and are evaluated before. *) + +(** Evaluate the filters on the query *) +let eval_filters : Syntax.t -> filter_evaluation = + fun conf -> + match conf.filters with + | [] -> + let empty_buffer = Buffer.create 0 in + { content = empty_buffer; parameters = Seq.empty; cte = None } + | filters -> ( + (* Create a new queue in order to accumulate all the parameters to bind. + This filter will be given to both the CTE if any, or reused in the + main query when there is no CTE. + *) + let chunk_filters = Chunk.create () in + Chunk.add_string chunk_filters "\nWHERE "; + + let group = Chunk.create () in + + let with_cte, with_exr = + List.fold_left filters ~init:(None, false) + ~f:(fun (with_cte, with_exr) column -> + (* The function will return an option in second position which is + None when no Group function where found, and Some Expression + otherwise *) + let b = Buffer.create 16 in + + let formatter = Format.formatter_of_buffer b in + let queue, group_found = + Expression.Filters.query_of_expression Q.BindParam formatter + (show_path ~conf) column + in + Format.pp_print_flush formatter (); + let clause = Chunk.create' b queue in + + match (group_found, with_cte) with + | None, _ -> + Chunk.append ~head:chunk_filters ~tail:clause; + Chunk.add_string chunk_filters "\nAND "; + (with_cte, true) + | (Some _ as group'), None -> + (* We have a group here, we do not add it into the + filter_buffer right now. + + This can occur only once, the second one will raise + an error. *) + Chunk.append ~head:group ~tail:clause; + (group', with_exr) + | Some _, Some _ -> raise ImportErrors.MisplacedWindow) + in + + match with_cte with + | None -> + let content = chunk_filters.b in + truncate content 5; + { + (* There is no group clause in the query *) + content; + parameters = Queue.to_seq chunk_filters.parameters; + cte = None; + } + | Some expression -> + let filters = + if with_exr then ( + (* If we have additionnals filters from the group clause, we + have to report them in the CTE instead of the main query. *) + let c' = Chunk.copy chunk_filters in + truncate c'.b 5; + Some c') + else None + in + + (* Create the common expression table *) + let cte_parameters = build_cte conf ~expression ~filters in + Chunk.append ~head:chunk_filters ~tail:group; + + { + content = chunk_filters.b; + parameters = Queue.to_seq chunk_filters.parameters; + (* The name is hardcoded here, and used in + [Expression.Filters.window] *) + cte = Some ("cte", cte_parameters); + }) + +type query = { + q : string; + parameters : ImportCSV.DataType.t Seq.t; +} + +(** Build the query and return also the mapping in order to identify each + external links between files. + + The select query will name each column with an alias, and the map allow to + find which source is pointed by this alias. *) +let select : Syntax.t -> query * Path.t ImportExpression.T.t array = + fun conf -> + (* If the filters contains a group expression, we need to transform this into + a CTE, which have to be evaluated before the main query. That’s why we are + evaluating the filters right now.*) + let filters = eval_filters conf in + let b = Buffer.create 256 in + let parameters = Queue.create () in + + Option.iter + (fun (_, (cte : Chunk.t)) -> + Buffer.add_buffer b cte.b; + Queue.add_seq parameters (Queue.to_seq cte.parameters)) + filters.cte; + + (* For each column in the configuration file, add the corresponding element + in the query. + + The Sqlite driver return the elements in an array, we create an array to + in order to manage the elements together. + *) + let headers = Array.make (List.length conf.columns) (Obj.magic None) in + + let columns = List.to_seq conf.columns |> Seq.mapi (fun i c -> (i, c)) in + let formatter = Format.formatter_of_buffer b in + let () = + Format.fprintf formatter "SELECT %a" + (Format.pp_print_seq + ~pp_sep:(fun f () -> Format.fprintf f ",\n") + (fun formatter (i, column) -> + Array.set headers i column; + let p = + Q.query_of_expression Q.BindParam formatter (show_path ~conf) + column + in + Queue.transfer p parameters; + Format.fprintf formatter " AS result_%d" i)) + columns + in + Format.pp_print_flush formatter (); + + let () = create_from_chunck conf (Chunk.create' b parameters) in + + (* If the query has a CTE, link it as well. We use an INNER JOIN here because + we want to be sure to get all the rows fetched by the CTE + *) + let () = + match filters.cte with + | None -> () + | Some (name, _) -> + Buffer.add_string b "\nINNER JOIN '"; + Buffer.add_string b name; + Buffer.add_string b "' ON "; + Buffer.add_string b name; + Buffer.add_string b ".id = "; + Buffer.add_string b conf.source.name; + Buffer.add_string b ".id" + in + + Buffer.add_buffer b filters.content; + Queue.add_seq parameters filters.parameters; + + let formatter = Format.formatter_of_buffer b in + (match conf.Syntax.uniq with + | [] -> () + | uniq -> + Format.fprintf formatter "\nGROUP BY %a" + (Format.pp_print_list + ~pp_sep:(fun f () -> Format.fprintf f ", ") + (fun formatter column -> + let seq = + Q.query_of_expression Q.BindParam formatter (show_path ~conf) + column + in + Queue.transfer seq parameters)) + uniq); + (match conf.Syntax.sort with + | [] -> () + | sort -> + Format.fprintf formatter "\nORDER BY %a" + (Format.pp_print_list + ~pp_sep:(fun f () -> Format.fprintf f ", ") + (fun formatter column -> + let seq = + Q.query_of_expression Q.BindParam formatter (show_path ~conf) + column + in + Queue.transfer seq parameters)) + sort); + Format.pp_print_flush formatter (); + + ({ q = Buffer.contents b; parameters = Queue.to_seq parameters }, headers) + +let check_external : Syntax.t -> Syntax.extern -> query = + fun conf external_ -> + let extern_table = Table.name external_.target in + + let parameters = Queue.create () in + let internal_key_buffer = Buffer.create 16 in + let formatter = Format.formatter_of_buffer internal_key_buffer in + let internal_key_seq = + Q.query_of_expression Q.BindParam formatter (show_path ~conf) + external_.intern_key + in + Format.pp_print_flush formatter (); + + let external_key_buffer = Buffer.create 16 in + let pointed_tables = pointed_tables conf external_.intern_key in + Buffer.add_string external_key_buffer + (print_column external_.Syntax.target + ("key_" ^ external_.Syntax.target.name)); + + (* We do a copy before the transfert because the Queue is reused later in the + query *) + Queue.transfer (Queue.copy internal_key_seq) parameters; + + let join_content = Buffer.contents external_key_buffer in + let inner_content = Buffer.contents internal_key_buffer in + let b = Buffer.create 256 in + let formatter = Format.formatter_of_buffer b in + let () = + Format.fprintf formatter + "SELECT %a%s FROM%a LEFT JOIN '%s' AS '%s' ON %t = %s WHERE %s IS NULL \ + AND %s IS NOT NULL AND %s <> ''" + (fun formatter -> function + | [ (table, _name) ] -> + Format.fprintf formatter "%s, " (print_column table "id") + | _ -> Format.fprintf formatter "-1, ") + pointed_tables (* *) + inner_content (* *) + (Format.pp_print_list + ~pp_sep:(fun f () -> Format.pp_print_text f ", ") + (fun formatter (table, name) -> + Format.fprintf formatter "\n'%s' AS '%s'" name table.Table.name)) + pointed_tables (* *) + extern_table (* *) + external_.target.name + (prepare_key ~f:(fun b -> + Format.pp_print_text b (Buffer.contents internal_key_buffer))) + join_content (* *) + join_content (* *) + inner_content (* *) + inner_content + in + + Format.pp_print_flush formatter (); + + { q = Buffer.contents b; parameters = Queue.to_seq parameters } + +let build_key_insert : Buffer.t -> Dependency.key -> unit = + fun buffer { Dependency.expression; _ } -> + let show_column : Format.formatter -> Path.column -> unit = + fun formatter column -> Format.fprintf formatter ":col_%d" column + in + + let formatter = Format.formatter_of_buffer buffer in + + let () = + prepare_key formatter ~f:(fun formatter -> + Q.query_of_expression Q.NoParam formatter show_column expression) + in + + Format.pp_print_flush formatter (); + + () -- cgit v1.2.3