open StdLabels module Expression = ImportExpression module Q = Expression.Query module Syntax = ImportConf.Syntax module Table = ImportConf.Table module Path = ImportConf.Path let truncate buffer n = Buffer.truncate buffer (Buffer.length buffer - n) (** The module allow to create fragment in the query which keep together the binderd parameters and the text of the query.contents. This is used a lot in order to create the CTE, where you need the create fragment used both in the main request and partially in the CTE itself. The content is mutable and all the functions are returning [unit]. *) module Chunk = struct type t = { b : Buffer.t; parameters : ImportCSV.DataType.t Queue.t; } let create : unit -> t = fun () -> { b = Buffer.create 16; parameters = Queue.create () } let create' : Buffer.t -> ImportCSV.DataType.t Queue.t -> t = fun b parameters -> { b; parameters } (* Append the element from [tail] at the end of [head] Tail is destroyed during the operation. *) let append : head:t -> tail:t -> unit = fun ~head ~tail -> match Buffer.length tail.b with | 0 -> () | _ -> Buffer.add_buffer head.b tail.b; Queue.transfer tail.parameters head.parameters; () (** Add a litteral string in the sequence *) let add_string : t -> string -> unit = fun t v -> Buffer.add_string t.b v let copy : t -> t = fun t -> let b = Buffer.create 16 and parameters = Queue.copy t.parameters in Buffer.add_buffer b t.b; { b; parameters } let add_parameters : t -> ImportCSV.DataType.t Seq.t -> unit = fun t p -> Queue.add_seq t.parameters p end let prepare_key : f:(Format.formatter -> unit) -> Format.formatter -> unit = fun ~f formatter -> Format.fprintf formatter "rtrim(upper(%t))" f (* Collect all the tables pointed by the expression. *) let pointed_tables : Syntax.t -> 'a Expression.T.t -> (Table.t * string) list = fun conf expression -> Expression.T.fold_values expression ~init:[] ~f:(fun acc path -> let table = ImportConf.get_table_for_name conf path.Path.alias in let table_name = Table.name table in (table, table_name) :: acc) |> List.sort_uniq ~cmp:Stdlib.compare (** Represent a column in a safe way in a query *) let print_column : Table.t -> string -> string = fun table column -> String.concat ~sep:"" [ "'"; table.Table.name; "'.'"; column; "'" ] let create_table : Dependency.t -> string = fun mapping -> let b = Buffer.create 64 in Buffer.add_string b "CREATE TABLE '"; Buffer.add_string b (Table.name (Dependency.table mapping)); Buffer.add_string b "' (id INTEGER PRIMARY KEY"; List.iter (Dependency.keys mapping) ~f:(fun { Dependency.name; _ } -> Buffer.add_string b ",'key_"; Buffer.add_string b name; Buffer.add_string b "'"); ImportContainers.IntSet.iter (Dependency.columns mapping) ~f:(fun i -> Buffer.add_string b ",'col_"; Buffer.add_string b (string_of_int i); Buffer.add_string b "'"); Buffer.add_string b ")"; Buffer.contents b let show_path : conf:Syntax.t -> Format.formatter -> Path.t -> unit = fun ~conf buffer { alias; column } -> let table = ImportConf.get_table_for_name conf alias in let table_name = table.Table.name in Format.fprintf buffer "'%s'.col_%d" table_name column (** Extract the informations from the dependancies. We get two informations here : - the join query in order to load the data from the external column - the column corresponding to the key in order to identify the missing links later. *) let query_of_external : conf:Syntax.t -> join_buffer:Chunk.t -> Syntax.extern -> unit = fun ~conf ~join_buffer external_ -> let extern_table = Table.name external_.target in let formatter = Format.formatter_of_buffer join_buffer.b in Format.fprintf formatter "\nLEFT JOIN '%s' AS '%s' ON %t = %s" extern_table external_.target.name (prepare_key ~f:(fun f -> let q = Q.query_of_expression Q.BindParam f (show_path ~conf) external_.intern_key in Chunk.add_parameters join_buffer (Queue.to_seq q))) (print_column external_.Syntax.target ("key_" ^ external_.Syntax.target.name)); Format.pp_print_flush formatter () (** Create the from part of the query, adding all the reuired externals. SQLite is able to optimize the query and do not load the table not used in the select clause. *) let create_from_chunck : Syntax.t -> Chunk.t -> unit = fun conf c -> Chunk.add_string c "\nFROM '"; Chunk.add_string c (Table.name conf.source); Chunk.add_string c "' AS '"; Chunk.add_string c conf.source.name; Chunk.add_string c "'"; (* Add the externals in the query *) List.iter conf.externals ~f:(query_of_external ~conf ~join_buffer:c) (** Build a CTE query in order to use any group function inside the query. Return the binded parameters used in the expression. The buffer given in argument is also modified during the construction. If filters is not None, the clauses are added to the CTE. *) let build_cte : Syntax.t -> expression:'a Expression.T.t -> filters:Chunk.t option -> Chunk.t = fun conf ~expression ~filters -> (* The binded parameters queue will be used later in the full query *) let cte_chunk = Chunk.create () in Chunk.add_string cte_chunk "WITH cte AS (SELECT "; Chunk.add_string cte_chunk conf.source.name; Chunk.add_string cte_chunk ".id, "; let formatter = Format.formatter_of_buffer cte_chunk.b in let p = Q.query_of_expression Q.BindParam formatter (show_path ~conf) expression in Format.pp_print_flush formatter (); Chunk.add_parameters cte_chunk (Queue.to_seq p); (* The name is hardcoded here, and used in [Expression.Filters.window] *) Chunk.add_string cte_chunk " AS group0"; let () = create_from_chunck conf cte_chunk in let () = match filters with | None -> () | Some filters_chunk -> Chunk.append ~head:cte_chunk ~tail:(Chunk.copy filters_chunk) in Chunk.add_string cte_chunk ")\n"; cte_chunk type filter_evaluation = { content : Buffer.t; parameters : ImportCSV.DataType.t Seq.t; cte : (string * Chunk.t) option; } (** Build the filters to apply in the query. We make the difference here between the predicates to apply directly in the query, and the filters associated with a group, which are required to be transformed into a CTE in SQL, and are evaluated before. *) (** Evaluate the filters on the query *) let eval_filters : Syntax.t -> filter_evaluation = fun conf -> match conf.filters with | [] -> let empty_buffer = Buffer.create 0 in { content = empty_buffer; parameters = Seq.empty; cte = None } | filters -> ( (* Create a new queue in order to accumulate all the parameters to bind. This filter will be given to both the CTE if any, or reused in the main query when there is no CTE. *) let chunk_filters = Chunk.create () in Chunk.add_string chunk_filters "\nWHERE "; let group = Chunk.create () in let with_cte, with_exr = List.fold_left filters ~init:(None, false) ~f:(fun (with_cte, with_exr) column -> (* The function will return an option in second position which is None when no Group function where found, and Some Expression otherwise *) let b = Buffer.create 16 in let formatter = Format.formatter_of_buffer b in let queue, group_found = Expression.Filters.query_of_expression Q.BindParam formatter (show_path ~conf) column in Format.pp_print_flush formatter (); let clause = Chunk.create' b queue in match (group_found, with_cte) with | None, _ -> Chunk.append ~head:chunk_filters ~tail:clause; Chunk.add_string chunk_filters "\nAND "; (with_cte, true) | (Some _ as group'), None -> (* We have a group here, we do not add it into the filter_buffer right now. This can occur only once, the second one will raise an error. *) Chunk.append ~head:group ~tail:clause; (group', with_exr) | Some _, Some _ -> raise ImportErrors.MisplacedWindow) in match with_cte with | None -> let content = chunk_filters.b in truncate content 5; { (* There is no group clause in the query *) content; parameters = Queue.to_seq chunk_filters.parameters; cte = None; } | Some expression -> let filters = if with_exr then ( (* If we have additionnals filters from the group clause, we have to report them in the CTE instead of the main query. *) let c' = Chunk.copy chunk_filters in truncate c'.b 5; Some c') else None in (* Create the common expression table *) let cte_parameters = build_cte conf ~expression ~filters in Chunk.append ~head:chunk_filters ~tail:group; { content = chunk_filters.b; parameters = Queue.to_seq chunk_filters.parameters; (* The name is hardcoded here, and used in [Expression.Filters.window] *) cte = Some ("cte", cte_parameters); }) type query = { q : string; parameters : ImportCSV.DataType.t Seq.t; } (** Build the query and return also the mapping in order to identify each external links between files. The select query will name each column with an alias, and the map allow to find which source is pointed by this alias. *) let select : Syntax.t -> query * Path.t ImportExpression.T.t array = fun conf -> (* If the filters contains a group expression, we need to transform this into a CTE, which have to be evaluated before the main query. That’s why we are evaluating the filters right now.*) let filters = eval_filters conf in let b = Buffer.create 256 in let parameters = Queue.create () in Option.iter (fun (_, (cte : Chunk.t)) -> Buffer.add_buffer b cte.b; Queue.add_seq parameters (Queue.to_seq cte.parameters)) filters.cte; (* For each column in the configuration file, add the corresponding element in the query. The Sqlite driver return the elements in an array, we create an array to in order to manage the elements together. *) let headers = Array.make (List.length conf.columns) (Obj.magic None) in let columns = List.to_seq conf.columns |> Seq.mapi (fun i c -> (i, c)) in let formatter = Format.formatter_of_buffer b in let () = Format.fprintf formatter "SELECT %a" (Format.pp_print_seq ~pp_sep:(fun f () -> Format.fprintf f ",\n") (fun formatter (i, column) -> Array.set headers i column; let p = Q.query_of_expression Q.BindParam formatter (show_path ~conf) column in Queue.transfer p parameters; Format.fprintf formatter " AS result_%d" i)) columns in Format.pp_print_flush formatter (); let () = create_from_chunck conf (Chunk.create' b parameters) in (* If the query has a CTE, link it as well. We use an INNER JOIN here because we want to be sure to get all the rows fetched by the CTE *) let () = match filters.cte with | None -> () | Some (name, _) -> Buffer.add_string b "\nINNER JOIN '"; Buffer.add_string b name; Buffer.add_string b "' ON "; Buffer.add_string b name; Buffer.add_string b ".id = "; Buffer.add_string b conf.source.name; Buffer.add_string b ".id" in Buffer.add_buffer b filters.content; Queue.add_seq parameters filters.parameters; let formatter = Format.formatter_of_buffer b in (match conf.Syntax.uniq with | [] -> () | uniq -> Format.fprintf formatter "\nGROUP BY %a" (Format.pp_print_list ~pp_sep:(fun f () -> Format.fprintf f ", ") (fun formatter column -> let seq = Q.query_of_expression Q.BindParam formatter (show_path ~conf) column in Queue.transfer seq parameters)) uniq); (match conf.Syntax.sort with | [] -> () | sort -> Format.fprintf formatter "\nORDER BY %a" (Format.pp_print_list ~pp_sep:(fun f () -> Format.fprintf f ", ") (fun formatter column -> let seq = Q.query_of_expression Q.BindParam formatter (show_path ~conf) column in Queue.transfer seq parameters)) sort); Format.pp_print_flush formatter (); ({ q = Buffer.contents b; parameters = Queue.to_seq parameters }, headers) let check_external : Syntax.t -> Syntax.extern -> query = fun conf external_ -> let extern_table = Table.name external_.target in let parameters = Queue.create () in let internal_key_buffer = Buffer.create 16 in let formatter = Format.formatter_of_buffer internal_key_buffer in let internal_key_seq = Q.query_of_expression Q.BindParam formatter (show_path ~conf) external_.intern_key in Format.pp_print_flush formatter (); let external_key_buffer = Buffer.create 16 in let pointed_tables = pointed_tables conf external_.intern_key in Buffer.add_string external_key_buffer (print_column external_.Syntax.target ("key_" ^ external_.Syntax.target.name)); (* We do a copy before the transfert because the Queue is reused later in the query *) Queue.transfer (Queue.copy internal_key_seq) parameters; let join_content = Buffer.contents external_key_buffer in let inner_content = Buffer.contents internal_key_buffer in let b = Buffer.create 256 in let formatter = Format.formatter_of_buffer b in let () = Format.fprintf formatter "SELECT %a%s FROM%a LEFT JOIN '%s' AS '%s' ON %t = %s WHERE %s IS NULL \ AND %s IS NOT NULL AND %s <> ''" (fun formatter -> function | [ (table, _name) ] -> Format.fprintf formatter "%s, " (print_column table "id") | _ -> Format.fprintf formatter "-1, ") pointed_tables (* *) inner_content (* *) (Format.pp_print_list ~pp_sep:(fun f () -> Format.pp_print_text f ", ") (fun formatter (table, name) -> Format.fprintf formatter "\n'%s' AS '%s'" name table.Table.name)) pointed_tables (* *) extern_table (* *) external_.target.name (prepare_key ~f:(fun b -> Format.pp_print_text b (Buffer.contents internal_key_buffer))) join_content (* *) join_content (* *) inner_content (* *) inner_content in Format.pp_print_flush formatter (); { q = Buffer.contents b; parameters = Queue.to_seq parameters } let build_key_insert : Buffer.t -> Dependency.key -> unit = fun buffer { Dependency.expression; _ } -> let show_column : Format.formatter -> Path.column -> unit = fun formatter column -> Format.fprintf formatter ":col_%d" column in let formatter = Format.formatter_of_buffer buffer in let () = prepare_key formatter ~f:(fun formatter -> Q.query_of_expression Q.NoParam formatter show_column expression) in Format.pp_print_flush formatter (); ()