author    Sébastien Dailly <sebastien@dailly.me>  2024-03-14 08:26:58 +0100
committer Sébastien Dailly <sebastien@dailly.me>  2024-03-14 08:26:58 +0100
commit    6b377719c10d5ab3343fd5221f99a4a21008e25a (patch)
tree      a7c1e9a820d339a2f161af3e09cf9e3161286796 /lib/file_handler
Initial commit (main)
Diffstat (limited to 'lib/file_handler')
-rw-r--r--  lib/file_handler/csv2sql.ml    135
-rw-r--r--  lib/file_handler/csv2sql.mli    10
-rwxr-xr-x  lib/file_handler/dune           21
-rw-r--r--  lib/file_handler/state.ml      178
-rw-r--r--  lib/file_handler/state.mli      46
-rw-r--r--  lib/file_handler/xlsx2sql.ml   205
-rw-r--r--  lib/file_handler/xlsx2sql.mli   10
7 files changed, 605 insertions, 0 deletions
diff --git a/lib/file_handler/csv2sql.ml b/lib/file_handler/csv2sql.ml
new file mode 100644
index 0000000..42d84eb
--- /dev/null
+++ b/lib/file_handler/csv2sql.ml
@@ -0,0 +1,135 @@
+open StdLabels
+module A = ImportAnalyser.Dependency
+module CSV = ImportCSV
+module C = ImportContainers
+module Syntax = ImportConf.Syntax
+module Db = ImportSQL.Db
+
+type state = CSV.DataType.t array State.t
+
+let default_mapper :
+ (ImportCSV.DataType.t, ImportCSV.DataType.t array) State.mapper =
+ { get_row = Fun.id; get_value = Fun.id; default = ImportCSV.DataType.Null }
+
+let extract_values : string -> CSV.DataType.t =
+ fun value ->
+ (* Test first if the content is empty *)
+ if String.equal String.empty value then CSV.DataType.Null
+ else
+ (* otherwise, try the different conversions in order to see which one applies *)
+ match int_of_string_opt value with
+ | Some i -> CSV.DataType.Integer i
+ | None -> (
+ match float_of_string_opt value with
+ | Some f -> CSV.DataType.Float f
+ | None ->
+ (* Finally, keep the raw content as text *)
+ CSV.DataType.Content value)
+
+(** Initialize the state from the first row: store the header and create
+ the table in the database *)
+let first_row : A.t -> _ Db.t -> state -> CSV.DataType.t list -> state =
+ fun mapping db acc row ->
+ (if acc.transaction then
+ match Db.commit db with
+ | Ok () -> ()
+ | Error e -> print_endline (ImportErrors.repr_error e));
+
+ ignore @@ Db.create_table db mapping;
+ let row = Array.of_list row in
+ match Db.prepare_insert db mapping with
+ | Ok stmt ->
+ {
+ acc with
+ header = Some row;
+ transaction = false;
+ insert_stmt = Some stmt;
+ row_number = acc.row_number + 1;
+ }
+ | _ -> { acc with header = Some row; transaction = false; insert_stmt = None }
+
+let read_csv_line :
+ log_error:ImportErrors.t -> A.t -> 'a Db.t -> state -> string list -> state
+ =
+ fun ~log_error mapping db acc row ->
+ let processed_row =
+ List.to_seq row |> Seq.map extract_values |> Array.of_seq
+ in
+ if acc.State.transaction then
+ State.run_row ~log_error ~mapper:default_mapper mapping db processed_row acc
+ else
+ match Db.begin_transaction db with
+ | Error e ->
+ print_endline (ImportErrors.repr_error e);
+ acc
+ | Ok () ->
+ let acc = { acc with transaction = true } in
+ State.run_row ~log_error ~mapper:default_mapper mapping db processed_row
+ acc
+
+let importInDatable :
+ log_error:ImportErrors.t ->
+ conf:Syntax.t ->
+ dirname:string ->
+ A.t ->
+ 'a Db.t ->
+ CSV.DataType.t array option Lwt.t =
+ fun ~log_error ~conf ~dirname mapping db ->
+ let file = Filename.concat dirname (A.table mapping).file in
+
+ let channel = Stdlib.open_in_bin file in
+
+ let csv_channel = Csv.of_channel ~separator:';' ~excel_tricks:true channel in
+
+ (* In the headers, we only keep the raw strings.
+
+ This line can raise an error if the headers are not correctly defined.
+ *)
+ let header =
+ List.map ~f:(fun v -> CSV.DataType.Content v) (Csv.next csv_channel)
+ in
+
+ let state =
+ State.
+ {
+ transaction = false;
+ header = None;
+ insert_stmt = None;
+ check_key_stmt = None;
+ row_number = 1;
+ sheet_number = 1;
+ delayed = [];
+ }
+ in
+ let state = first_row mapping db state header in
+
+ let state =
+ try
+ Csv.fold_left csv_channel ~init:state
+ ~f:(read_csv_line ~log_error mapping db)
+ with
+ | Csv.Failure (line, row, cause) as e ->
+ Printf.eprintf "Error %s on line %d — field : %s\n" cause line
+ (ImportCSV.Csv.column_to_string row);
+ raise e
+ in
+ ignore @@ State.clear ~log_error db mapping conf;
+ ignore @@ Db.commit db;
+
+ (* Finalize the statements created during the import *)
+ let () =
+ Option.iter (fun v -> ignore @@ Db.finalize v) state.insert_stmt;
+ Option.iter (fun v -> ignore @@ Db.finalize v) state.check_key_stmt
+ in
+
+ (* Insert all the headers *)
+ let _ =
+ Option.iter
+ (fun headers ->
+ let values = Array.mapi headers ~f:(fun i value -> (i, value)) in
+
+ ignore
+ @@ Db.insert_header db (ImportAnalyser.Dependency.table mapping) values)
+ state.header
+ in
+ Lwt.return state.header
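
Note: the conversion cascade in extract_values above tries the narrowest type first: empty string, then integer, then float, then raw text. The following standalone sketch reproduces that cascade with a local stand-in type so it can be run outside the project; the variant names mirror ImportCSV.DataType but this is an illustration, not the real module.

(* Illustration only: a local stand-in for ImportCSV.DataType.t *)
type value =
  | Null
  | Integer of int
  | Float of float
  | Content of string

let classify (raw : string) : value =
  if String.equal raw "" then Null
  else
    match int_of_string_opt raw with
    | Some i -> Integer i
    | None -> (
        match float_of_string_opt raw with
        | Some f -> Float f
        | None -> Content raw)

(* classify ""     = Null
   classify "42"   = Integer 42
   classify "3.14" = Float 3.14
   classify "abc"  = Content "abc" *)
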
diff --git a/lib/file_handler/csv2sql.mli b/lib/file_handler/csv2sql.mli
new file mode 100644
index 0000000..e09737b
--- /dev/null
+++ b/lib/file_handler/csv2sql.mli
@@ -0,0 +1,10 @@
+val importInDatable :
+ log_error:ImportErrors.t ->
+ conf:ImportConf.Syntax.t ->
+ dirname:string ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ ImportCSV.DataType.t array option Lwt.t
+(** Load a CSV file into an SQLite database.
+
+Return the header if at least one row was present *)
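
Note: a minimal sketch of how this entry point might be driven, assuming the database handle, error log, configuration and dependency mapping are built elsewhere. None of those constructors appear in this diff, so every binding below is hypothetical, as is the Csv2sql module path and the dirname value.

let run ~(db : _ ImportSQL.Db.t) ~(log_error : ImportErrors.t)
    ~(conf : ImportConf.Syntax.t) ~(mapping : ImportAnalyser.Dependency.t) =
  let open Lwt.Syntax in
  (* The dirname is a placeholder; the file name itself comes from the mapping *)
  let* header =
    Csv2sql.importInDatable ~log_error ~conf ~dirname:"/path/to/data" mapping db
  in
  (match header with
  | None -> print_endline "no data row found"
  | Some columns ->
      Printf.printf "imported a table with %d columns\n" (Array.length columns));
  Lwt.return_unit
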
diff --git a/lib/file_handler/dune b/lib/file_handler/dune
new file mode 100755
index 0000000..6b247db
--- /dev/null
+++ b/lib/file_handler/dune
@@ -0,0 +1,21 @@
+(library
+ (name importFileHandler)
+ (libraries
+ csv
+ SZXX
+ sqlite3
+ base
+ core
+ lwt
+ lwt.unix
+ helpers
+ importAnalyser
+ importConf
+ importContainers
+ importCSV
+ importDataTypes
+ importErrors
+ importExpression
+ importSQL
+ )
+)
diff --git a/lib/file_handler/state.ml b/lib/file_handler/state.ml
new file mode 100644
index 0000000..5b43aff
--- /dev/null
+++ b/lib/file_handler/state.ml
@@ -0,0 +1,178 @@
+open StdLabels
+module Table = ImportDataTypes.Table
+
+type 'a t = {
+ header : 'a option;
+ transaction : bool;
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+ row_number : int;
+ sheet_number : int;
+ delayed : 'a list;
+}
+
+type insert_result = {
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+}
+
+type ('a, 'b) mapper = {
+ get_row : 'b -> 'a Array.t;
+ get_value : 'a -> ImportCSV.DataType.t;
+ default : 'a;
+}
+
+module A = ImportAnalyser.Dependency
+
+let insert_row :
+ mapper:(_, 'row) mapper ->
+ A.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ _ t ->
+ (insert_result, ImportErrors.xlsError) result =
+ fun ~mapper mapping db row state ->
+ (* Extract all columns referenced in the keys or the columns to extract *)
+ let keys_id =
+ List.fold_left (A.keys mapping) ~init:ImportContainers.IntSet.empty
+ ~f:(fun acc (keys : A.key) ->
+ let columns = keys.A.columns in
+ ImportContainers.IntSet.union acc (Lazy.force columns))
+ and columns_id = A.columns mapping in
+ let ids = ImportContainers.IntSet.(union keys_id columns_id |> elements) in
+
+ (* Filter only the required columns in the row *)
+ let values =
+ List.map ids ~f:(fun i ->
+ let index = i - 1 in
+ let value =
+ try Array.get (mapper.get_row row) index with
+ | Stdlib.Invalid_argument _ ->
+ (* If we have more headers than data, assume the values are NULL.
+ This can happen when the whole tail of the line is empty: Excel can
+ give us a truncated line instead of a series of NULLs *)
+ mapper.default
+ in
+ (index, mapper.get_value value))
+ in
+ let keys = A.keys mapping in
+
+ let execution =
+ let ( let* ) = Result.bind in
+ let* check_key_stmt, result =
+ ImportSQL.Db.eval_key db state.check_key_stmt keys values
+ in
+ let no_null =
+ (* We check that at least one key is not null; when every key is null,
+ the line is ignored.
+
+ If multiple keys are present, we only require one of them to be
+ non-null here.
+ *)
+ match result with
+ | [] -> true
+ | _ ->
+ List.exists result ~f:(function
+ | Sqlite3.Data.FLOAT _ | Sqlite3.Data.INT _ -> true
+ | Sqlite3.Data.BLOB t | Sqlite3.Data.TEXT t ->
+ not (String.equal "" t)
+ | Sqlite3.Data.NONE | Sqlite3.Data.NULL -> false)
+ in
+ let* _ =
+ match no_null with
+ | true -> Ok ()
+ | false -> Error (Failure "The key is null")
+ in
+
+ let* statement =
+ match state.insert_stmt with
+ | None -> ImportSQL.Db.prepare_insert db mapping
+ | Some v -> Ok v
+ in
+ let* _ = ImportSQL.Db.insert db statement ~id:state.row_number values in
+ let* _ = ImportSQL.Db.reset statement in
+
+ Helpers.Console.update_cursor ();
+ Ok { insert_stmt = Some statement; check_key_stmt }
+ in
+
+ (* In case of error, wrap the exception to get the line *)
+ Result.map_error
+ (fun e ->
+ ImportErrors.
+ {
+ source = ImportAnalyser.Dependency.table mapping;
+ sheet = state.sheet_number;
+ row = state.row_number;
+ target = None;
+ value = CSV.DataType.Content (String.concat ~sep:", " []);
+ exn = e;
+ })
+ execution
+
+(** Load the row with all the information associated with this sheet.
+
+ If an error is raised for this row, it is logged and only this row is skipped. *)
+let run_row :
+ log_error:ImportErrors.t ->
+ mapper:(_, 'row) mapper ->
+ A.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ 'a t ->
+ 'a t =
+ fun ~log_error ~mapper mapping db row state ->
+ match insert_row ~mapper mapping db row state with
+ | Ok { insert_stmt; check_key_stmt } ->
+ {
+ state with
+ insert_stmt;
+ check_key_stmt;
+ row_number = state.row_number + 1;
+ }
+ | Error e ->
+ Option.iter (fun v -> ignore @@ ImportSQL.Db.finalize v) state.insert_stmt;
+ Option.iter
+ (fun v -> ignore @@ ImportSQL.Db.finalize v)
+ state.check_key_stmt;
+ ImportErrors.output_error log_error e;
+ {
+ state with
+ insert_stmt = None;
+ check_key_stmt = None;
+ row_number = state.row_number + 1;
+ }
+
+let clear :
+ log_error:ImportErrors.t ->
+ 'a ImportSQL.Db.t ->
+ A.t ->
+ ImportConf.Syntax.t ->
+ unit ImportSQL.Db.result =
+ fun ~log_error db mapping conf ->
+ ImportSQL.Db.clear_duplicates db (A.table mapping) (A.keys mapping)
+ ~f:(fun values ->
+ let line =
+ match snd @@ Array.get values 0 with
+ | ImportCSV.DataType.Integer i -> i
+ | _ -> -1
+ and value = snd @@ Array.get values 1
+ and target =
+ match snd @@ Array.get values 2 with
+ | ImportCSV.DataType.Content s ->
+ Some (ImportConf.get_table_for_name conf (Some s))
+ | _ -> None
+ in
+ let error =
+ ImportErrors.
+ {
+ source = A.table mapping;
+ sheet = (A.table mapping).tab;
+ row = line;
+ target;
+ value;
+ exn = Failure "Duplicated key";
+ }
+ in
+
+ ImportErrors.output_error log_error error)
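
Note: the key check inside insert_row above only rejects a row when every key column evaluates to NULL (or an empty string). The helper below isolates that predicate for illustration; it is not a function exported by State, just a sketch over the same Sqlite3.Data cases.

open StdLabels

(* true when at least one key value is usable (non-null, non-empty) *)
let has_usable_key (keys : Sqlite3.Data.t list) : bool =
  match keys with
  | [] -> true (* no key defined: nothing to reject *)
  | _ ->
      List.exists keys ~f:(function
        | Sqlite3.Data.FLOAT _ | Sqlite3.Data.INT _ -> true
        | Sqlite3.Data.BLOB t | Sqlite3.Data.TEXT t -> not (String.equal "" t)
        | Sqlite3.Data.NONE | Sqlite3.Data.NULL -> false)

(* has_usable_key [ Sqlite3.Data.NULL; Sqlite3.Data.TEXT "" ] = false
   has_usable_key [ Sqlite3.Data.NULL; Sqlite3.Data.INT 3L ]  = true *)
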
diff --git a/lib/file_handler/state.mli b/lib/file_handler/state.mli
new file mode 100644
index 0000000..f744c33
--- /dev/null
+++ b/lib/file_handler/state.mli
@@ -0,0 +1,46 @@
+type 'a t = {
+ header : 'a option;
+ transaction : bool;
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+ row_number : int;
+ sheet_number : int;
+ delayed : 'a list;
+}
+
+type insert_result = {
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+}
+
+type ('a, 'b) mapper = {
+ get_row : 'b -> 'a Array.t;
+ get_value : 'a -> ImportCSV.DataType.t;
+ default : 'a;
+}
+
+val insert_row :
+ mapper:(_, 'row) mapper ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ _ t ->
+ (insert_result, ImportErrors.xlsError) result
+(** Low level row insertion *)
+
+val run_row :
+ log_error:ImportErrors.t ->
+ mapper:(_, 'row) mapper ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ 'a t ->
+ 'a t
+
+val clear :
+ log_error:ImportErrors.t ->
+ 'a ImportSQL.Db.t ->
+ ImportAnalyser.Dependency.t ->
+ ImportConf.Syntax.t ->
+ unit ImportSQL.Db.result
+(** Clean up the table after the insertion: check for duplicates and external references *)
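
Note: the ('a, 'b) mapper record is what lets csv2sql and xlsx2sql share run_row and insert_row while feeding them different row representations ('b) and cell types ('a). As a hedged illustration, a mapper for rows carrying (column name, value) pairs could look like the sketch below; no such mapper exists in this commit.

(* Hypothetical mapper over rows of (column name, value) pairs *)
let named_mapper :
    (string * ImportCSV.DataType.t, (string * ImportCSV.DataType.t) array)
    State.mapper =
  State.
    {
      get_row = Fun.id;
      get_value = snd;
      default = ("", ImportCSV.DataType.Null);
    }
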
diff --git a/lib/file_handler/xlsx2sql.ml b/lib/file_handler/xlsx2sql.ml
new file mode 100644
index 0000000..f2d8f12
--- /dev/null
+++ b/lib/file_handler/xlsx2sql.ml
@@ -0,0 +1,205 @@
+open StdLabels
+module A = ImportAnalyser.Dependency
+module C = ImportContainers
+module CSV = ImportCSV
+module Syntax = ImportConf.Syntax
+module Db = ImportSQL.Db
+
+let flags = Unix.[ O_RDONLY; O_NONBLOCK ]
+
+let extractors =
+ SZXX.Xlsx.
+ {
+ string = (fun _location s -> CSV.DataType.Content s);
+ error =
+ (fun _location s -> CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s));
+ boolean =
+ (fun _location s ->
+ let value = String.(equal s "1") in
+ CSV.DataType.Content (string_of_bool value));
+ number =
+ (fun _location s ->
+ let f = Float.of_string s in
+ if Float.is_integer f then CSV.DataType.Integer (Float.to_int f)
+ else CSV.DataType.Float f);
+ date = (fun _location s -> CSV.DataType.Content s);
+ null = CSV.DataType.Null;
+ formula =
+ (fun _location ~formula s ->
+ ignore formula;
+ CSV.DataType.Content s);
+ }
+
+let feed_bigstring ic =
+ let open Lwt.Infix in
+ let len = Lwt_io.buffer_size ic in
+ let buf = Lwt_bytes.create len in
+ SZXX.Zip.Bigstring
+ (fun () ->
+ Lwt_io.read_into_bigstring ic buf 0 len >|= function
+ | 0 -> None
+ | len -> Some SZXX.Zip.{ buf; pos = 0; len })
+
+(* Evaluate if the row can be processed right now (does not contain
+ any delayed value) *)
+let is_delayed row =
+ Array.exists row.SZXX.Xlsx.data ~f:(function
+ | SZXX.Xlsx.Delayed _ -> true
+ | _ -> false)
+
+let default_mapper :
+ (ImportCSV.DataType.t, ImportCSV.DataType.t SZXX.Xlsx.row) State.mapper =
+ {
+ get_value =
+ (function
+ | ImportCSV.DataType.Content s ->
+ ImportCSV.DataType.Content (SZXX.Xml.unescape s)
+ | any -> any);
+ default = ImportCSV.DataType.Null;
+ get_row = (fun v -> v.SZXX.Xlsx.data);
+ }
+
+type state = CSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t
+
+let delayed_mapper =
+ State.
+ {
+ get_value =
+ (function
+ | SZXX.Xlsx.Available (CSV.DataType.Content s) ->
+ CSV.DataType.Content (SZXX.Xml.unescape s)
+ | SZXX.Xlsx.Available value -> value
+ | _ -> CSV.DataType.Null);
+ default = SZXX.Xlsx.Available CSV.DataType.Null;
+ get_row = (fun v -> v.SZXX.Xlsx.data);
+ }
+
+(** Initialize the state from the first row: store the header and create
+ the table in the database *)
+let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state =
+ fun mapping db acc row ->
+ (if acc.transaction then
+ match Db.commit db with
+ | Ok () -> ()
+ | Error e -> print_endline (ImportErrors.repr_error e));
+
+ ignore @@ Db.create_table db mapping;
+ match Db.prepare_insert db mapping with
+ | Ok stmt ->
+ {
+ acc with
+ header = Some row;
+ transaction = false;
+ insert_stmt = Some stmt;
+ }
+ | _ -> { acc with header = Some row; transaction = false; insert_stmt = None }
+
+let importInDatable :
+ log_error:Csv.out_channel Lazy.t ->
+ conf:Syntax.t ->
+ dirname:string ->
+ A.t ->
+ 'a Db.t ->
+ CSV.DataType.t array option Lwt.t =
+ fun ~log_error ~conf ~dirname mapping db ->
+ let file = Filename.concat dirname (A.table mapping).file in
+
+ Lwt_io.with_file ~flags ~mode:Input file (fun ic ->
+ let open Lwt.Syntax in
+ let stream, sst_p, success =
+ SZXX.Xlsx.stream_rows ~only_sheet:(A.table mapping).tab
+ ~feed:(feed_bigstring ic) extractors
+ in
+ let* processed =
+ Lwt_stream.fold
+ (fun row acc ->
+ (* Create the table on the first line *)
+ if Int.equal 1 row.SZXX.Xlsx.row_number then
+ first_row mapping db acc row
+ else
+ match is_delayed row with
+ | true -> { acc with delayed = row :: acc.delayed }
+ | false -> (
+ let row_number = row.SZXX.Xlsx.row_number in
+ if acc.transaction then
+ State.run_row ~log_error ~mapper:delayed_mapper mapping db
+ row { acc with row_number }
+ else
+ match Db.begin_transaction db with
+ | Error e ->
+ print_endline (ImportErrors.repr_error e);
+ acc
+ | Ok () ->
+ let acc = { acc with transaction = true; row_number } in
+ State.run_row ~log_error ~mapper:delayed_mapper mapping
+ db row acc))
+ stream
+ {
+ transaction = false;
+ header = None;
+ delayed = [];
+ insert_stmt = None;
+ check_key_stmt = None;
+ row_number = 1;
+ sheet_number = (A.table mapping).tab;
+ }
+ in
+ (* Wait for the shared string table (sst) to be available *)
+ let* sst = sst_p in
+
+ if processed.transaction then ignore (Db.commit db);
+
+ (* Insert the delayed rows now that the sst is available *)
+ ignore @@ Db.begin_transaction db;
+ List.iter processed.delayed ~f:(fun row ->
+ let fully_available_row =
+ SZXX.Xlsx.unwrap_status extractors sst row
+ in
+
+ let row_number = row.SZXX.Xlsx.row_number in
+
+ match
+ State.insert_row ~mapper:default_mapper mapping db
+ fully_available_row
+ { processed with row_number }
+ with
+ | Ok _ -> ()
+ | Error e ->
+ ImportErrors.output_error log_error e;
+ ());
+
+ ignore @@ State.clear ~log_error db mapping conf;
+ ignore @@ Db.commit db;
+
+ (* Finalize the statements created during the import *)
+ let () =
+ Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt;
+ Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt
+ in
+
+ let _ =
+ Option.iter
+ (fun headers ->
+ let res = SZXX.Xlsx.unwrap_status extractors sst headers in
+
+ let values = Array.mapi res.data ~f:(fun i value -> (i, value)) in
+
+ ignore
+ @@ Db.insert_header db
+ (ImportAnalyser.Dependency.table mapping)
+ values)
+ processed.header
+ in
+
+ let header =
+ Option.map
+ (fun header ->
+ let res = SZXX.Xlsx.unwrap_status extractors sst header in
+ res.data)
+ processed.header
+ in
+
+ (* Finalize the process *)
+ let* () = success in
+
+ Lwt.return header)
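
Note: xlsx2sql makes two passes: rows whose cells are all Available are inserted as they are streamed, while rows containing Delayed cells (shared strings not yet resolved) are queued and re-processed once the sst promise resolves, via SZXX.Xlsx.unwrap_status. The small sketch below restates that split with the same predicate shape as is_delayed; it assumes the same SZXX API as used above.

open StdLabels

(* Sketch: separate rows that need the sst from rows that can be inserted now *)
let split_delayed rows =
  List.partition rows ~f:(fun row ->
      Array.exists row.SZXX.Xlsx.data ~f:(function
        | SZXX.Xlsx.Delayed _ -> true
        | _ -> false))

(* The first list must wait for the sst and go through
   SZXX.Xlsx.unwrap_status extractors sst row; the second can be
   inserted immediately, exactly as in the fold above. *)
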
diff --git a/lib/file_handler/xlsx2sql.mli b/lib/file_handler/xlsx2sql.mli
new file mode 100644
index 0000000..e09737b
--- /dev/null
+++ b/lib/file_handler/xlsx2sql.mli
@@ -0,0 +1,10 @@
+val importInDatable :
+ log_error:ImportErrors.t ->
+ conf:ImportConf.Syntax.t ->
+ dirname:string ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ ImportCSV.DataType.t array option Lwt.t
+(** Load an Excel spreadsheet into an SQLite database.
+
+Return the header if at least one row was present *)