From 6b377719c10d5ab3343fd5221f99a4a21008e25a Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Thu, 14 Mar 2024 08:26:58 +0100 Subject: Initial commit --- lib/sql/db.ml | 383 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 383 insertions(+) create mode 100644 lib/sql/db.ml (limited to 'lib/sql/db.ml') diff --git a/lib/sql/db.ml b/lib/sql/db.ml new file mode 100644 index 0000000..89431b1 --- /dev/null +++ b/lib/sql/db.ml @@ -0,0 +1,383 @@ +open StdLabels +module CSV = ImportCSV +module Syntax = ImportConf.Syntax +module Table = ImportDataTypes.Table +module Path = ImportDataTypes.Path + +type 'a t = 'a T.t + +let ( let* ) res cont = Result.bind (T.to_result res) cont +let ( let** ) res cont = Result.bind res cont +let begin_transaction = T.begin_transaction +let rollback = T.rollback +let commit = T.commit +let finalize = T.finalize +let reset = T.reset +let insert_header = Header.insert_header +let query_headers = Header.query_headers + +let with_db : string -> (Sqlite3.db -> unit T.result) -> unit T.result = + fun filename f -> + let db = Sqlite3.db_open filename in + + Match.register db; + Date.register db; + Year.register db; + Join.register db; + Math.register db; + Trim.register db; + + (*let* _ = Sqlite3.exec db "PRAGMA foreign_keys = ON" |> to_result in*) + Sqlite3.( let& ) db f + +let create_table : 'a t -> ImportAnalyser.Dependency.t -> unit T.result = + fun db table -> + let source = ImportAnalyser.Dependency.table table in + let name = Table.name source in + + let* _ = + Sqlite3.exec db + (String.concat ~sep:"" [ "DROP TABLE IF EXISTS '"; name; "'" ]) + in + + let query = ImportAnalyser.Query.create_table table in + let* _ = Sqlite3.exec db query in + match Header.create_table db with + | Ok () -> Hashs.create_table db + | e -> e + +let update_hash : 'a t -> ImportAnalyser.Dependency.t -> unit T.result = + fun db mapping -> + match Hashs.insert db mapping with + | Ok () -> Ok () + | Error _ -> + let _ = Hashs.create_table db in + Hashs.insert db mapping + +let check_table_schema : 'a t -> ImportAnalyser.Dependency.t -> bool T.result = + fun db table -> + let source = ImportAnalyser.Dependency.table table in + let name = Table.name source in + let query = + String.concat ~sep:"" + [ "SELECT sql FROM sqlite_schema WHERE name = '"; name; "'" ] + in + let stmt = Sqlite3.prepare db query in + + let rc, result = + Sqlite3.fold stmt ~init:false ~f:(fun value row -> + if Array.length row <> 1 then value + else + match Sqlite3.Data.to_string (Array.get row 0) with + | Some s -> + let query = ImportAnalyser.Query.create_table table in + String.equal s query + | None -> value) + in + let* _ = rc in + + (* The schema is the same, now check the hash in case the indexes changed *) + let rc_hash = Hashs.query db source in + match rc_hash with + | Ok (Some i) -> + let hash = Hashs.evaluate table in + begin + if i == hash then Ok result + else + let _ = update_hash db table in + Ok false + end + | _ -> + let _ = update_hash db table in + Ok result + +let prepare_insert : + Sqlite3.db -> ImportAnalyser.Dependency.t -> Sqlite3.stmt T.result = + fun db mapping -> + (* Get the list of columns from the table configuration *) + let columns = + ImportAnalyser.Dependency.columns mapping + |> ImportContainers.IntSet.elements + in + let table_name = Table.name (ImportAnalyser.Dependency.table mapping) in + + let open Buffer in + let buff = create 20 and value_buff = create 10 and col_buff = create 10 in + + add_string col_buff "'id',"; + + (* Add the key name if present *) + List.iter (ImportAnalyser.Dependency.keys mapping) + ~f:(fun { ImportAnalyser.Dependency.name; _ } -> + add_string col_buff "'key_"; + + add_string col_buff name; + add_string col_buff "',"); + + add_string value_buff ":id,"; + + (* Add the key settings if presents *) + List.iter (ImportAnalyser.Dependency.keys mapping) ~f:(fun key -> + ImportAnalyser.Query.build_key_insert value_buff key; + add_string value_buff ","); + + List.iter columns ~f:(fun id -> + add_string col_buff "'col_"; + add_string col_buff (string_of_int id); + add_string col_buff "',"; + + let col_name = ":col_" ^ string_of_int id in + + add_string value_buff col_name; + add_string value_buff ","); + + truncate col_buff (length col_buff - 1); + truncate value_buff (length value_buff - 1); + add_string buff "INSERT INTO '"; + add_string buff table_name; + add_string buff "' ("; + add_buffer buff col_buff; + add_string buff " ) VALUES ("; + add_buffer buff value_buff; + add_string buff " )"; + + let query = contents buff in + + try Ok (Sqlite3.prepare db query) with + | e -> + print_endline "Error during this query :"; + print_endline query; + Error e + +let eval_key : + 'a t -> + Sqlite3.stmt option -> + ImportAnalyser.Dependency.key list -> + (int * CSV.DataType.t) list -> + (Sqlite3.stmt option * Sqlite3.Data.t list) T.result = + fun db stmt keys values -> + match keys with + | [] -> Ok (None, []) + | _ -> + let buffer = Buffer.create 16 in + Buffer.add_string buffer "SELECT "; + List.iter keys ~f:(fun key -> + ImportAnalyser.Query.build_key_insert buffer key; + Buffer.add_string buffer ","); + + Buffer.truncate buffer (Buffer.length buffer - 1); + let query = Buffer.contents buffer in + + let statement = Sqlite3.prepare_or_reset db (ref stmt) query in + + (* Extract all the column id used in the keys. + *) + let keys_id = + List.fold_left keys ~init:ImportContainers.IntSet.empty + ~f:(fun acc (keys : ImportAnalyser.Dependency.key) -> + let columns = Lazy.force keys.ImportAnalyser.Dependency.columns in + ImportContainers.IntSet.union acc columns) + in + + let** _ = + List.fold_left values ~init:(Ok 1) ~f:(fun idx (id, value) -> + let** idx = idx in + + (* Ensure the column is required in the keys *) + match ImportContainers.IntSet.mem (1 + id) keys_id with + | false -> Ok (idx + 1) + | true -> + let sql_data = T.of_datatype value in + + let col_name = ":col_" ^ string_of_int (1 + id) in + let* _ = Sqlite3.bind_name statement col_name sql_data in + Ok (idx + 1)) + in + + let result, evaluated_keys = + Sqlite3.fold statement ~init:[] ~f:(fun _ v -> Array.to_list v) + in + let* _ = result in + Ok (Some statement, evaluated_keys) + +let insert : + Sqlite3.db -> + Sqlite3.stmt -> + id:int -> + (int * CSV.DataType.t) list -> + unit T.result = + fun db statement ~id values -> + let** _ = T.savepoint db "PREVIOUS" in + let* _ = + Sqlite3.bind_name statement ":id" (Sqlite3.Data.INT (Int64.of_int id)) + in + let** _ = + List.fold_left values ~init:(Ok 1) ~f:(fun idx (id, value) -> + let** idx = idx in + let sql_data = T.of_datatype value in + + let col_name = ":col_" ^ string_of_int (1 + id) in + let* _ = Sqlite3.bind_name statement col_name sql_data in + + Ok (idx + 1)) + in + + match T.to_result (Sqlite3.step statement) with + | Ok () -> T.release db "PREVIOUS" + | Error e -> + (* I intentionnaly ignore any error here, as we are already in an + error case *) + ignore (Sqlite3.exec db "ROLLBACK TRANSACTION TO SAVEPOINT PREVIOUS"); + Error e + +(** This simple function convert a query generated by the application into a + statement executed with sqlite. + + The function expect a perfect match between the query and the parameters. *) +let execute_query : + Sqlite3.db -> ImportAnalyser.Query.query -> Sqlite3.stmt T.result = + fun db query -> + let statement = + try Sqlite3.prepare db query.q with + | e -> + print_endline "Error during this query :"; + print_endline query.q; + raise e + in + + (* Add the binded parameters *) + let values = + Seq.map (fun v -> T.of_datatype v) query.parameters |> List.of_seq + in + + let* _ = Sqlite3.bind_values statement values in + + Ok statement + +let query : + f:((Path.t ImportExpression.T.t * CSV.DataType.t) array -> unit) -> + Sqlite3.db -> + Syntax.t -> + unit T.result = + fun ~f db output -> + (* Extract the query from the configuration. *) + let** query_analysis = + match ImportAnalyser.Query.select output with + | exception e -> Error e + | other -> Ok other + in + + let query, columns = query_analysis in + let** statement = execute_query db query in + + let* _ = + Sqlite3.iter statement ~f:(fun data -> + let values = + Array.map2 data columns ~f:(fun value column -> + (column, T.to_datatype value)) + in + f values) + in + Ok () + +let create_view : Sqlite3.db -> Syntax.t -> unit T.result = + fun db output -> + ignore output; + let* drop = Sqlite3.exec db "DROP VIEW IF EXISTS 'result'" in + ignore drop; + + Ok () + +(* + let query, _ = ImportAnalyser.Query.select output in + + let query = + { query with q = Printf.sprintf "CREATE VIEW result AS %s" query.q } + in + let** statement = execute_query db query in + + Sqlite3.step statement |> T.to_result + *) + +let check_foreign : + f:((string * CSV.DataType.t) array -> unit) -> + Sqlite3.db -> + Syntax.t -> + Syntax.extern -> + unit T.result = + fun ~f db conf external_ -> + let query = ImportAnalyser.Query.check_external conf external_ in + + let** statement = execute_query db query in + Sqlite3.iter statement ~f:(fun data -> + let values = + Array.mapi data ~f:(fun i value -> + (Sqlite3.column_name statement i, T.to_datatype value)) + in + f values) + |> T.to_result + +let clear_duplicates : + f:((string * ImportCSV.DataType.t) array -> unit) -> + 'a t -> + ImportDataTypes.Table.t -> + ImportAnalyser.Dependency.key list -> + unit T.result = + fun ~f db table keys -> + let table_name = Table.name table in + + List.fold_left keys ~init:(Ok ()) + ~f:(fun state { ImportAnalyser.Dependency.name; _ } -> + let** _ = state in + + let select_query = + String.concat ~sep:"" + [ + "SELECT '"; + table_name; + "'.id, '"; + table_name; + "'.'key_"; + name; + "', '"; + name; + "' FROM '"; + table_name; + "' INNER JOIN (SELECT id, row_number() OVER(PARTITION BY '"; + table_name; + "'.'key_"; + name; + "' ORDER BY (id)) AS row_num from '"; + table_name; + "') other_table WHERE other_table.row_num <> 1 and '"; + table_name; + "'.id = other_table.id"; + ] + in + let stmt = Sqlite3.prepare db select_query in + + ignore + @@ Sqlite3.iter stmt ~f:(fun data -> + let values = + Array.mapi data ~f:(fun i value -> + (Sqlite3.column_name stmt i, T.to_datatype value)) + in + f values); + + let delete_query = + Printf.sprintf + {|UPDATE '%s' +SET key_%s = NULL +FROM +( + SELECT id, row_number() OVER(PARTITION BY key_%s ORDER BY (id)) AS row_num + from '%s' +) other_table +WHERE other_table.row_num <> 1 +and '%s'.id = other_table.id|} + table_name name name table_name table_name + in + + Sqlite3.exec db delete_query |> T.to_result) + +type 'a result = ('a, exn) Result.t -- cgit v1.2.3