open StdLabels
module Expression = ImportExpression
module Q = Expression.Query
module Syntax = ImportConf.Syntax
module Table = ImportDataTypes.Table
module Path = ImportDataTypes.Path

(** Collect all the tables pointed by the expression.

    Every path found in [expression] is resolved to its table with
    [ImportConf.get_table_for_name], and paired with the table name. The
    result is sorted and deduplicated with the polymorphic comparison. *)
let pointed_tables : Syntax.t -> 'a Expression.T.t -> (Table.t * string) list =
 fun conf expression ->
  Expression.T.fold_values expression ~init:[] ~f:(fun acc path ->
      let table = ImportConf.get_table_for_name conf path.Path.alias in
      let table_name = Table.name table in
      (table, table_name) :: acc)
  |> List.sort_uniq ~cmp:Stdlib.compare

(** Build the CREATE TABLE statement for the table described by [mapping].

    The table gets an integer primary key named id, then one column per key
    (prefixed with key_) and one column per referenced column index (prefixed
    with col_). Names are inserted as they are, between single quotes. *)
let create_table : Dependency.t -> string =
 fun mapping ->
  let b = Buffer.create 64 in

  Buffer.add_string b "CREATE TABLE '";
  Buffer.add_string b (Table.name (Dependency.table mapping));
  Buffer.add_string b "' (id INTEGER PRIMARY KEY";

  List.iter (Dependency.keys mapping) ~f:(fun { Dependency.name; _ } ->
      Buffer.add_string b ",'key_";
      Buffer.add_string b name;
      Buffer.add_string b "'");

  ImportContainers.IntSet.iter (Dependency.columns mapping) ~f:(fun i ->
      Buffer.add_string b ",'col_";
      Buffer.add_string b (string_of_int i);
      Buffer.add_string b "'");

  Buffer.add_string b ")";

  Buffer.contents b

(** A SQL query ([q]) together with the values to bind to it
    ([parameters]). *)
type query = {
  q : string;
  parameters : ImportCSV.DataType.t Seq.t;
}

(** [take_elements ~prefix ~eq group] strips [prefix] from the head of
    [group], comparing the elements pairwise with [eq].

    - When [prefix] is exhausted, the remaining elements of [group] are
      returned.
    - When [group] is exhausted before [prefix], the empty list is returned.

    @raise Not_found when an element of [group] differs from the element at
    the same position in [prefix]. *)
let rec take_elements :
    prefix:'a list -> eq:('a -> 'a -> bool) -> 'a list -> 'a list =
 fun ~prefix ~eq group ->
  match (prefix, group) with
  | [], any -> any
  | _, [] -> []
  | hd1 :: tl1, hd2 :: tl2 when eq hd1 hd2 ->
      take_elements ~eq ~prefix:tl1 tl2
  | _, _ -> raise Not_found

(** The window functions shall be grouped in the same way as the uniq
    property (we cannot make a partition over a group which is not kept in the
    final result).

    But the SQL query needs to remove from the window function the elements
    already defined in the GROUP BY statement, so we filter them from the
    configuration before building the query.

    [clean_window ~prefix expression] rebuilds [expression], and for each
    window function removes [prefix] from the head of its group.

    @raise Not_found when the group of a window function does not start with
    the same elements as [prefix] (see [take_elements]). *)
let clean_window :
    prefix:Path.t ImportExpression.T.t list ->
    Path.t ImportExpression.T.t ->
    Path.t ImportExpression.T.t =
 fun ~prefix expression ->
  let open ImportExpression.T in
  let rec f = function
    | Expr e -> Expr (f e)
    | Empty -> Empty
    | Literal s -> Literal s
    | Integer i -> Integer i
    | Path p -> Path p
    | Concat pp -> Concat (List.map ~f pp)
    | Function' (name, pp) -> Function' (name, List.map ~f pp)
    | Function (name, pp) -> Function (name, List.map ~f pp)
    | Nvl pp -> Nvl (List.map ~f pp)
    | Join (sep, pp) -> Join (sep, List.map ~f pp)
    | Window (window_f, group, order) ->
        let w = map_window ~f window_f in

        (* Only keep the part of the group which is not already covered by
           the prefix. *)
        let group =
          take_elements
            ~eq:(ImportExpression.T.equal Path.equal)
            ~prefix group
        in

        Window (w, List.map ~f group, List.map ~f order)
    | BOperator (n, arg1, arg2) -> BOperator (n, f arg1, f arg2)
    | GEquality (n, arg1, args) -> GEquality (n, f arg1, List.map ~f args)
  in
  f expression

(** Build the query and return also the mapping in order to identify each
    external link between files.

    The select query names each column with an alias (result_0, result_1 and
    so on), and the returned array gives, at the same index, the expression
    pointed by this alias.

    @raise Not_found when a window function of a column cannot be cleaned
    against [conf.uniq] (see [clean_window]). *)
let select : Syntax.t -> query * Path.t ImportExpression.T.t array =
 fun conf ->
  let filter = ImportConf.CTE.of_filters conf.filters in

  (* Transform the columns to extract from the query:

     - Clean the window functions and remove the part already defined in the
       [uniq] parameter.
     - Store them in an array: the Sqlite driver returns the elements in an
       array, and we create an array too in order to manage the elements
       together.

     The array is built directly from the cleaned columns, so that no
     placeholder value is ever needed. *)
  let headers =
    List.map conf.columns ~f:(fun column ->
        clean_window ~prefix:conf.uniq column)
    |> Array.of_list
  in

  (* Associate each column with its index in the array. *)
  let columns = Array.to_seqi headers in

  let filters = Chunk.create () in
  let request_header = Filters.generate_sql ~conf filter filters in
  let b = request_header.Chunk.b
  and parameters = request_header.Chunk.parameters in
  let formatter = Format.formatter_of_buffer b in
  let () =
    Format.fprintf formatter "SELECT %a"
      (Format.pp_print_seq
         ~pp_sep:(fun f () -> Format.fprintf f ",\n")
         (fun formatter (i, column) ->
           let p =
             Q.query_of_expression Q.BindParam formatter (Printers.path ~conf)
               column
           in
           (* The parameters of the expression are bound after the ones
              already registered in the request. *)
           Queue.transfer p parameters;
           Format.fprintf formatter " AS result_%d" i))
      columns
  in
  Format.pp_print_flush formatter ();

  let () = Chunk.create_from_statement_of_chunck conf request_header in
  Chunk.append ~head:request_header ~tail:filters;

  let formatter = Format.formatter_of_buffer b in
  (match conf.Syntax.uniq with
  | [] -> ()
  | uniq ->
      Format.fprintf formatter "\nGROUP BY %a"
        (Format.pp_print_list
           ~pp_sep:(fun f () -> Format.fprintf f ", ")
           (fun formatter column ->
             let seq =
               Q.query_of_expression Q.BindParam formatter
                 (Printers.path ~conf) column
             in
             Queue.transfer seq parameters))
        uniq);
  (match conf.Syntax.sort with
  | [] -> ()
  | sort ->
      Format.fprintf formatter "\nORDER BY %a"
        (Format.pp_print_list
           ~pp_sep:(fun f () -> Format.fprintf f ", ")
           (fun formatter column ->
             let seq =
               Q.query_of_expression Q.BindParam formatter
                 (Printers.path ~conf) column
             in
             Queue.transfer seq parameters))
        sort);
  Format.pp_print_flush formatter ();

  ({ q = Buffer.contents b; parameters = Queue.to_seq parameters }, headers)

(** Build the query listing the rows for which the key of the external
    [external_] has no match in the target table.

    The query selects the id of one of the tables pointed by the internal key
    (or -1 when the key does not point to any table) followed by the key
    itself, and keeps the rows where the key column of the target is NULL
    while the internal key is neither NULL nor the empty string. *)
let check_external : Syntax.t -> Syntax.Extern.t -> query =
 fun conf external_ ->
  let internal_chunk = Chunk.create () in
  Chunk.add_expression ~conf internal_chunk external_.Syntax.Extern.intern_key;

  (* The column holding the key in the target table. *)
  let join_content =
    Table.print_column external_.Syntax.Extern.target
      ("key_" ^ external_.Syntax.Extern.target.name)
  in

  let pointed_tables = pointed_tables conf external_.intern_key in

  (* We have to link all the tables referenced by the external, we cannot let
     any table not linked with the source in the request (this would cause a
     cartesian product request).

     This is not the usual way to proceed (we usually start from the source
     and link the externals). *)
  let rec collect_links :
      Syntax.Extern.t -> Syntax.Extern.t list -> Syntax.Extern.t list =
   fun table init ->
    (* NOTE(review): the comparisons below use the physical equality, which
       presumably relies on the externals and the tables being shared with
       the configuration. To be confirmed. *)
    if List.exists init ~f:(fun ext -> table == ext) then
      (* The external is already collected: its own links are already
         handled (or being handled by a caller), and walking them again
         would add nothing, or would never terminate if the externals
         reference each other. *)
      init
    else
      Expression.T.fold_values ~init:(table :: init)
        table.Syntax.Extern.intern_key ~f:(fun acc expr ->
          match expr.Path.alias with
          | None -> acc
          | Some _ as path -> (
              let table = ImportConf.get_table_for_name conf path in
              (* Look for this table in the externals *)
              let external_opt =
                List.find_opt conf.Syntax.externals ~f:(fun t ->
                    t.Syntax.Extern.target == table)
              in
              match external_opt with
              | None -> acc
              | Some ext -> collect_links ext acc))
  in
  let dependencies = collect_links external_ [] in

  let request = Chunk.create () in
  Chunk.add_string request "SELECT ";
  let () =
    match pointed_tables with
    | [] ->
        (* The key does not point to any table, just return -1 *)
        Chunk.add_string request "-1"
    | (table, _name) :: _ ->
        (* Extract the row number from the first pointed table. *)
        Chunk.add_string request (Table.print_column table "id")
  in
  Chunk.add_string request ", ";
  (* The internal chunk is used several times in the request, so a copy is
     appended each time. *)
  Chunk.append ~head:request ~tail:(Chunk.copy internal_chunk);
  Chunk.create_from_statement_of_chunck ~externals:dependencies conf request;
  Chunk.add_string request " WHERE ";
  Chunk.add_string request join_content;
  Chunk.add_string request " IS NULL AND ";
  Chunk.append ~head:request ~tail:(Chunk.copy internal_chunk);
  Chunk.add_string request " IS NOT NULL AND ";
  Chunk.append ~head:request ~tail:(Chunk.copy internal_chunk);
  Chunk.add_string request " <> ''";
  let q = Buffer.contents request.b in

  { q; parameters = Queue.to_seq request.parameters }

(** Print in [buffer] the expression of the given key, for use in an insert
    statement.

    The expression is printed through [Printers.prepare_key], without bound
    parameters, and each column is written as a named placeholder built from
    its index (:col_ followed by the number). *)
let build_key_insert : Buffer.t -> Dependency.key -> unit =
 fun buffer { Dependency.expression; _ } ->
  let show_column : Format.formatter -> Path.column -> unit =
   fun formatter column -> Format.fprintf formatter ":col_%d" column
  in

  let formatter = Format.formatter_of_buffer buffer in

  let () =
    Printers.prepare_key formatter ~f:(fun formatter ->
        Q.query_of_expression Q.NoParam formatter show_column expression)
  in

  Format.pp_print_flush formatter ()