open StdLabels module CSV = ImportCSV module Syntax = ImportConf.Syntax module Table = ImportDataTypes.Table module Path = ImportDataTypes.Path type 'a t = 'a T.t let ( let* ) res cont = Result.bind (T.to_result res) cont let ( let** ) res cont = Result.bind res cont let begin_transaction = T.begin_transaction let rollback = T.rollback let commit = T.commit let finalize = T.finalize let reset = T.reset let insert_header = Header.insert_header let query_headers = Header.query_headers let with_db : string -> (Sqlite3.db -> unit T.result) -> unit T.result = fun filename f -> let db = Sqlite3.db_open filename in Match.register db; Date.register db; Year.register db; Join.register db; Math.register db; Trim.register db; (*let* _ = Sqlite3.exec db "PRAGMA foreign_keys = ON" |> to_result in*) Sqlite3.( let& ) db f let create_table : 'a t -> ImportAnalyser.Dependency.t -> unit T.result = fun db table -> let source = ImportAnalyser.Dependency.table table in let name = Table.name source in let* _ = Sqlite3.exec db (String.concat ~sep:"" [ "DROP TABLE IF EXISTS '"; name; "'" ]) in let query = ImportAnalyser.Query.create_table table in let* _ = Sqlite3.exec db query in match Header.create_table db with | Ok () -> Hashs.create_table db | e -> e let update_hash : 'a t -> ImportAnalyser.Dependency.t -> unit T.result = fun db mapping -> match Hashs.insert db mapping with | Ok () -> Ok () | Error _ -> let _ = Hashs.create_table db in Hashs.insert db mapping let check_table_schema : 'a t -> ImportAnalyser.Dependency.t -> bool T.result = fun db table -> let source = ImportAnalyser.Dependency.table table in let name = Table.name source in let query = String.concat ~sep:"" [ "SELECT sql FROM sqlite_schema WHERE name = '"; name; "'" ] in let stmt = Sqlite3.prepare db query in let rc, result = Sqlite3.fold stmt ~init:false ~f:(fun value row -> if Array.length row <> 1 then value else match Sqlite3.Data.to_string (Array.get row 0) with | Some s -> let query = ImportAnalyser.Query.create_table table in String.equal s query | None -> value) in let* _ = rc in (* The schema is the same, now check the hash in case the indexes changed *) let rc_hash = Hashs.query db source in match rc_hash with | Ok (Some i) -> let hash = Hashs.evaluate table in begin if i == hash then Ok result else let _ = update_hash db table in Ok false end | _ -> let _ = update_hash db table in Ok result let prepare_insert : Sqlite3.db -> ImportAnalyser.Dependency.t -> Sqlite3.stmt T.result = fun db mapping -> (* Get the list of columns from the table configuration *) let columns = ImportAnalyser.Dependency.columns mapping |> ImportContainers.IntSet.elements in let table_name = Table.name (ImportAnalyser.Dependency.table mapping) in let open Buffer in let buff = create 20 and value_buff = create 10 and col_buff = create 10 in add_string col_buff "'id',"; (* Add the key name if present *) List.iter (ImportAnalyser.Dependency.keys mapping) ~f:(fun { ImportAnalyser.Dependency.name; _ } -> add_string col_buff "'key_"; add_string col_buff name; add_string col_buff "',"); add_string value_buff ":id,"; (* Add the key settings if presents *) List.iter (ImportAnalyser.Dependency.keys mapping) ~f:(fun key -> ImportAnalyser.Query.build_key_insert value_buff key; add_string value_buff ","); List.iter columns ~f:(fun id -> add_string col_buff "'col_"; add_string col_buff (string_of_int id); add_string col_buff "',"; let col_name = ":col_" ^ string_of_int id in add_string value_buff col_name; add_string value_buff ","); truncate col_buff (length col_buff - 1); truncate value_buff (length value_buff - 1); add_string buff "INSERT INTO '"; add_string buff table_name; add_string buff "' ("; add_buffer buff col_buff; add_string buff " ) VALUES ("; add_buffer buff value_buff; add_string buff " )"; let query = contents buff in try Ok (Sqlite3.prepare db query) with | e -> print_endline "Error during this query :"; print_endline query; Error e let eval_key : 'a t -> Sqlite3.stmt option -> ImportAnalyser.Dependency.key list -> (int * CSV.DataType.t) list -> (Sqlite3.stmt option * Sqlite3.Data.t list) T.result = fun db stmt keys values -> match keys with | [] -> Ok (None, []) | _ -> let buffer = Buffer.create 16 in Buffer.add_string buffer "SELECT "; List.iter keys ~f:(fun key -> ImportAnalyser.Query.build_key_insert buffer key; Buffer.add_string buffer ","); Buffer.truncate buffer (Buffer.length buffer - 1); let query = Buffer.contents buffer in let statement = Sqlite3.prepare_or_reset db (ref stmt) query in (* Extract all the column id used in the keys. *) let keys_id = List.fold_left keys ~init:ImportContainers.IntSet.empty ~f:(fun acc (keys : ImportAnalyser.Dependency.key) -> let columns = Lazy.force keys.ImportAnalyser.Dependency.columns in ImportContainers.IntSet.union acc columns) in let** _ = List.fold_left values ~init:(Ok 1) ~f:(fun idx (id, value) -> let** idx = idx in (* Ensure the column is required in the keys *) match ImportContainers.IntSet.mem (1 + id) keys_id with | false -> Ok (idx + 1) | true -> let sql_data = T.of_datatype value in let col_name = ":col_" ^ string_of_int (1 + id) in let* _ = Sqlite3.bind_name statement col_name sql_data in Ok (idx + 1)) in let result, evaluated_keys = Sqlite3.fold statement ~init:[] ~f:(fun _ v -> Array.to_list v) in let* _ = result in Ok (Some statement, evaluated_keys) let insert : Sqlite3.db -> Sqlite3.stmt -> id:int -> (int * CSV.DataType.t) list -> unit T.result = fun db statement ~id values -> let** _ = T.savepoint db "PREVIOUS" in let* _ = Sqlite3.bind_name statement ":id" (Sqlite3.Data.INT (Int64.of_int id)) in let** _ = List.fold_left values ~init:(Ok 1) ~f:(fun idx (id, value) -> let** idx = idx in let sql_data = T.of_datatype value in let col_name = ":col_" ^ string_of_int (1 + id) in let* _ = Sqlite3.bind_name statement col_name sql_data in Ok (idx + 1)) in match T.to_result (Sqlite3.step statement) with | Ok () -> T.release db "PREVIOUS" | Error e -> (* I intentionnaly ignore any error here, as we are already in an error case *) ignore (Sqlite3.exec db "ROLLBACK TRANSACTION TO SAVEPOINT PREVIOUS"); Error e (** This simple function convert a query generated by the application into a statement executed with sqlite. The function expect a perfect match between the query and the parameters. *) let execute_query : Sqlite3.db -> ImportAnalyser.Query.query -> Sqlite3.stmt T.result = fun db query -> let statement = try Sqlite3.prepare db query.q with | e -> print_endline "Error during this query :"; print_endline query.q; raise e in (* Add the binded parameters *) let values = Seq.map (fun v -> T.of_datatype v) query.parameters |> List.of_seq in let* _ = Sqlite3.bind_values statement values in Ok statement let query : f:((Path.t ImportExpression.T.t * CSV.DataType.t) array -> unit) -> Sqlite3.db -> Syntax.t -> unit T.result = fun ~f db output -> (* Extract the query from the configuration. *) let** query_analysis = match ImportAnalyser.Query.select output with | exception e -> Error e | other -> Ok other in let query, columns = query_analysis in let** statement = execute_query db query in let* _ = Sqlite3.iter statement ~f:(fun data -> let values = Array.map2 data columns ~f:(fun value column -> (column, T.to_datatype value)) in f values) in Ok () let create_view : Sqlite3.db -> Syntax.t -> unit T.result = fun db output -> ignore output; let* drop = Sqlite3.exec db "DROP VIEW IF EXISTS 'result'" in ignore drop; Ok () (* let query, _ = ImportAnalyser.Query.select output in let query = { query with q = Printf.sprintf "CREATE VIEW result AS %s" query.q } in let** statement = execute_query db query in Sqlite3.step statement |> T.to_result *) let check_foreign : f:((string * CSV.DataType.t) array -> unit) -> Sqlite3.db -> Syntax.t -> Syntax.extern -> unit T.result = fun ~f db conf external_ -> let query = ImportAnalyser.Query.check_external conf external_ in let** statement = execute_query db query in Sqlite3.iter statement ~f:(fun data -> let values = Array.mapi data ~f:(fun i value -> (Sqlite3.column_name statement i, T.to_datatype value)) in f values) |> T.to_result let clear_duplicates : f:((string * ImportCSV.DataType.t) array -> unit) -> 'a t -> ImportDataTypes.Table.t -> ImportAnalyser.Dependency.key list -> unit T.result = fun ~f db table keys -> let table_name = Table.name table in List.fold_left keys ~init:(Ok ()) ~f:(fun state { ImportAnalyser.Dependency.name; _ } -> let** _ = state in let select_query = String.concat ~sep:"" [ "SELECT '"; table_name; "'.id, '"; table_name; "'.'key_"; name; "', '"; name; "' FROM '"; table_name; "' INNER JOIN (SELECT id, row_number() OVER(PARTITION BY '"; table_name; "'.'key_"; name; "' ORDER BY (id)) AS row_num from '"; table_name; "') other_table WHERE other_table.row_num <> 1 and '"; table_name; "'.id = other_table.id"; ] in let stmt = Sqlite3.prepare db select_query in ignore @@ Sqlite3.iter stmt ~f:(fun data -> let values = Array.mapi data ~f:(fun i value -> (Sqlite3.column_name stmt i, T.to_datatype value)) in f values); let delete_query = Printf.sprintf {|UPDATE '%s' SET key_%s = NULL FROM ( SELECT id, row_number() OVER(PARTITION BY key_%s ORDER BY (id)) AS row_num from '%s' ) other_table WHERE other_table.row_num <> 1 and '%s'.id = other_table.id|} table_name name name table_name table_name in Sqlite3.exec db delete_query |> T.to_result) type 'a result = ('a, exn) Result.t