diff options
Diffstat (limited to 'lib/file_handler')
-rw-r--r-- | lib/file_handler/csv2sql.ml | 135 | ||||
-rw-r--r-- | lib/file_handler/csv2sql.mli | 10 | ||||
-rwxr-xr-x | lib/file_handler/dune | 21 | ||||
-rw-r--r-- | lib/file_handler/state.ml | 178 | ||||
-rw-r--r-- | lib/file_handler/state.mli | 46 | ||||
-rw-r--r-- | lib/file_handler/xlsx2sql.ml | 205 | ||||
-rw-r--r-- | lib/file_handler/xlsx2sql.mli | 10 |
7 files changed, 605 insertions, 0 deletions
diff --git a/lib/file_handler/csv2sql.ml b/lib/file_handler/csv2sql.ml new file mode 100644 index 0000000..42d84eb --- /dev/null +++ b/lib/file_handler/csv2sql.ml @@ -0,0 +1,135 @@ +open StdLabels +module A = ImportAnalyser.Dependency +module CSV = ImportCSV +module C = ImportContainers +module Syntax = ImportConf.Syntax +module Db = ImportSQL.Db + +type state = CSV.DataType.t array State.t + +let default_mapper : + (ImportCSV.DataType.t, ImportCSV.DataType.t array) State.mapper = + { get_row = Fun.id; get_value = Fun.id; default = ImportCSV.DataType.Null } + +let extract_values : string -> CSV.DataType.t = + fun value -> + (* Test first if the content is empty *) + if String.equal String.empty value then CSV.DataType.Null + else + (* else, try differents conversion in order to see which one works *) + match int_of_string_opt value with + | Some i -> CSV.DataType.Integer i + | None -> ( + match float_of_string_opt value with + | Some f -> CSV.DataType.Float f + | None -> + (* And finaly convert into date *) + CSV.DataType.Content value) + +(** Initialize the state for the first row, count the column number and create + the table in the database *) +let first_row : A.t -> _ Db.t -> state -> CSV.DataType.t list -> state = + fun mapping db acc row -> + (if acc.transaction then + match Db.commit db with + | Ok () -> () + | Error e -> print_endline (ImportErrors.repr_error e)); + + ignore @@ Db.create_table db mapping; + let row = Array.of_list row in + match Db.prepare_insert db mapping with + | Ok stmt -> + { + acc with + header = Some row; + transaction = false; + insert_stmt = Some stmt; + row_number = acc.row_number + 1; + } + | _ -> { acc with header = Some row; transaction = false; insert_stmt = None } + +let read_csv_line : + log_error:ImportErrors.t -> A.t -> 'a Db.t -> state -> string list -> state + = + fun ~log_error mapping db acc row -> + let processed_row = + List.to_seq row |> Seq.map extract_values |> Array.of_seq + in + if acc.State.transaction then + State.run_row ~log_error ~mapper:default_mapper mapping db processed_row acc + else + match Db.begin_transaction db with + | Error e -> + print_endline (ImportErrors.repr_error e); + acc + | Ok () -> + let acc = { acc with transaction = true } in + State.run_row ~log_error ~mapper:default_mapper mapping db processed_row + acc + +let importInDatable : + log_error:ImportErrors.t -> + conf:Syntax.t -> + dirname:string -> + A.t -> + 'a Db.t -> + CSV.DataType.t array option Lwt.t = + fun ~log_error ~conf ~dirname mapping db -> + let file = Filename.concat dirname (A.table mapping).file in + + let channel = Stdlib.open_in_bin file in + + let csv_channel = Csv.of_channel ~separator:';' ~excel_tricks:true channel in + + (* In the headers, we only keep the string. + + This line could generate an error if the headers are not correctly defined. + *) + let header = + List.map ~f:(fun v -> CSV.DataType.Content v) (Csv.next csv_channel) + in + + let state = + State. + { + transaction = false; + header = None; + insert_stmt = None; + check_key_stmt = None; + row_number = 1; + sheet_number = 1; + delayed = []; + } + in + let state = first_row mapping db state header in + + let state = + try + Csv.fold_left csv_channel ~init:state + ~f:(read_csv_line ~log_error mapping db) + with + | Csv.Failure (line, row, cause) as e -> + Printf.eprintf "Error %s on line %d — field : %s\n" cause line + (ImportCSV.Csv.column_to_string row); + raise e + in + ignore @@ State.clear ~log_error db mapping conf; + ignore @@ Db.commit db; + + (* Finalize the statements created during the import *) + let () = + Option.iter (fun v -> ignore @@ Db.finalize v) state.insert_stmt; + Option.iter (fun v -> ignore @@ Db.finalize v) state.check_key_stmt + in + + (* Insert all the headers *) + let _ = + Option.iter + (fun headers -> + let values = Array.mapi headers ~f:(fun i value -> (i, value)) in + + ignore + @@ Db.insert_header db (ImportAnalyser.Dependency.table mapping) values) + state.header + in + Lwt.return state.header diff --git a/lib/file_handler/csv2sql.mli b/lib/file_handler/csv2sql.mli new file mode 100644 index 0000000..e09737b --- /dev/null +++ b/lib/file_handler/csv2sql.mli @@ -0,0 +1,10 @@ +val importInDatable : + log_error:ImportErrors.t -> + conf:ImportConf.Syntax.t -> + dirname:string -> + ImportAnalyser.Dependency.t -> + _ ImportSQL.Db.t -> + ImportCSV.DataType.t array option Lwt.t +(** Load an excel spreadsheet in an SQLite database. + +Return the header if at least one row where present *) diff --git a/lib/file_handler/dune b/lib/file_handler/dune new file mode 100755 index 0000000..6b247db --- /dev/null +++ b/lib/file_handler/dune @@ -0,0 +1,21 @@ +(library
+ (name importFileHandler)
+ (libraries
+ csv
+ SZXX
+ sqlite3
+ base
+ core
+ lwt
+ lwt.unix
+ helpers
+ importAnalyser
+ importConf
+ importContainers
+ importCSV
+ importDataTypes
+ importErrors
+ importExpression
+ importSQL
+ )
+)
diff --git a/lib/file_handler/state.ml b/lib/file_handler/state.ml new file mode 100644 index 0000000..5b43aff --- /dev/null +++ b/lib/file_handler/state.ml @@ -0,0 +1,178 @@ +open StdLabels +module Table = ImportDataTypes.Table + +type 'a t = { + header : 'a option; + transaction : bool; + insert_stmt : Sqlite3.stmt option; + check_key_stmt : Sqlite3.stmt option; + row_number : int; + sheet_number : int; + delayed : 'a list; +} + +type insert_result = { + insert_stmt : Sqlite3.stmt option; + check_key_stmt : Sqlite3.stmt option; +} + +type ('a, 'b) mapper = { + get_row : 'b -> 'a Array.t; + get_value : 'a -> ImportCSV.DataType.t; + default : 'a; +} + +module A = ImportAnalyser.Dependency + +let insert_row : + mapper:(_, 'row) mapper -> + A.t -> + _ ImportSQL.Db.t -> + 'row -> + _ t -> + (insert_result, ImportErrors.xlsError) result = + fun ~mapper mapping db row state -> + (* Extract all columns referenced in the keys or the columns to extract *) + let keys_id = + List.fold_left (A.keys mapping) ~init:ImportContainers.IntSet.empty + ~f:(fun acc (keys : A.key) -> + let columns = keys.A.columns in + ImportContainers.IntSet.union acc (Lazy.force columns)) + and columns_id = A.columns mapping in + let ids = ImportContainers.IntSet.(union keys_id columns_id |> elements) in + + (* Filter only the required columns in the row *) + let values = + List.map ids ~f:(fun i -> + let index = i - 1 in + let value = + try Array.get (mapper.get_row row) index with + | Stdlib.Invalid_argument _ -> + (* If we have more headers than data, assume the value are NULL. + This can happen when all the line tail is empty, Excel can + give us a truncated line instead of a series of NULL *) + mapper.default + in + (index, mapper.get_value value)) + in + let keys = A.keys mapping in + + let execution = + let ( let* ) = Result.bind in + let* check_key_stmt, result = + ImportSQL.Db.eval_key db state.check_key_stmt keys values + in + let no_null = + (* We check if we have at least one key which is not null — and in such + case we ignore the line. + + If multiple keys are presents, we ensure there is at least one non + null here. + *) + match result with + | [] -> true + | _ -> + List.exists result ~f:(function + | Sqlite3.Data.FLOAT _ | Sqlite3.Data.INT _ -> true + | Sqlite3.Data.BLOB t | Sqlite3.Data.TEXT t -> + not (String.equal "" t) + | Sqlite3.Data.NONE | Sqlite3.Data.NULL -> false) + in + let* _ = + match no_null with + | true -> Ok () + | false -> Error (Failure "The key is null") + in + + let* statement = + match state.insert_stmt with + | None -> ImportSQL.Db.prepare_insert db mapping + | Some v -> Ok v + in + let* _ = ImportSQL.Db.insert db statement ~id:state.row_number values in + let* _ = ImportSQL.Db.reset statement in + + Helpers.Console.update_cursor (); + Ok { insert_stmt = Some statement; check_key_stmt } + in + + (* In case of error, wrap the exception to get the line *) + Result.map_error + (fun e -> + ImportErrors. + { + source = ImportAnalyser.Dependency.table mapping; + sheet = state.sheet_number; + row = state.row_number; + target = None; + value = CSV.DataType.Content (String.concat ~sep:", " []); + exn = e; + }) + execution + +(** Load the row with all the informations associated with this sheet. + + If an error has already been raised during the sheet, ignore this row only. *) +let run_row : + log_error:ImportErrors.t -> + mapper:(_, 'row) mapper -> + A.t -> + _ ImportSQL.Db.t -> + 'row -> + 'a t -> + 'a t = + fun ~log_error ~mapper mapping db row state -> + match insert_row ~mapper mapping db row state with + | Ok { insert_stmt; check_key_stmt } -> + { + state with + insert_stmt; + check_key_stmt; + row_number = state.row_number + 1; + } + | Error e -> + Option.iter (fun v -> ignore @@ ImportSQL.Db.finalize v) state.insert_stmt; + Option.iter + (fun v -> ignore @@ ImportSQL.Db.finalize v) + state.check_key_stmt; + ImportErrors.output_error log_error e; + { + state with + insert_stmt = None; + check_key_stmt = None; + row_number = state.row_number + 1; + } + +let clear : + log_error:ImportErrors.t -> + 'a ImportSQL.Db.t -> + A.t -> + ImportConf.Syntax.t -> + unit ImportSQL.Db.result = + fun ~log_error db mapping conf -> + ImportSQL.Db.clear_duplicates db (A.table mapping) (A.keys mapping) + ~f:(fun values -> + let line = + match snd @@ Array.get values 0 with + | ImportCSV.DataType.Integer i -> i + | _ -> -1 + and value = snd @@ Array.get values 1 + and target = + match snd @@ Array.get values 2 with + | ImportCSV.DataType.Content s -> + Some (ImportConf.get_table_for_name conf (Some s)) + | _ -> None + in + let error = + ImportErrors. + { + source = A.table mapping; + sheet = (A.table mapping).tab; + row = line; + target; + value; + exn = Failure "Duplicated key"; + } + in + + ImportErrors.output_error log_error error) diff --git a/lib/file_handler/state.mli b/lib/file_handler/state.mli new file mode 100644 index 0000000..f744c33 --- /dev/null +++ b/lib/file_handler/state.mli @@ -0,0 +1,46 @@ +type 'a t = { + header : 'a option; + transaction : bool; + insert_stmt : Sqlite3.stmt option; + check_key_stmt : Sqlite3.stmt option; + row_number : int; + sheet_number : int; + delayed : 'a list; +} + +type insert_result = { + insert_stmt : Sqlite3.stmt option; + check_key_stmt : Sqlite3.stmt option; +} + +type ('a, 'b) mapper = { + get_row : 'b -> 'a Array.t; + get_value : 'a -> ImportCSV.DataType.t; + default : 'a; +} + +val insert_row : + mapper:(_, 'row) mapper -> + ImportAnalyser.Dependency.t -> + _ ImportSQL.Db.t -> + 'row -> + _ t -> + (insert_result, ImportErrors.xlsError) result +(** Low level row insertion *) + +val run_row : + log_error:ImportErrors.t -> + mapper:(_, 'row) mapper -> + ImportAnalyser.Dependency.t -> + _ ImportSQL.Db.t -> + 'row -> + 'a t -> + 'a t + +val clear : + log_error:ImportErrors.t -> + 'a ImportSQL.Db.t -> + ImportAnalyser.Dependency.t -> + ImportConf.Syntax.t -> + unit ImportSQL.Db.result +(** Clean up the table after the insertion, check for the duplicates and external references *) diff --git a/lib/file_handler/xlsx2sql.ml b/lib/file_handler/xlsx2sql.ml new file mode 100644 index 0000000..f2d8f12 --- /dev/null +++ b/lib/file_handler/xlsx2sql.ml @@ -0,0 +1,205 @@ +open StdLabels +module A = ImportAnalyser.Dependency +module C = ImportContainers +module CSV = ImportCSV +module Syntax = ImportConf.Syntax +module Db = ImportSQL.Db + +let flags = Unix.[ O_RDONLY; O_NONBLOCK ] + +let extractors = + SZXX.Xlsx. + { + string = (fun _location s -> CSV.DataType.Content s); + error = + (fun _location s -> CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s)); + boolean = + (fun _location s -> + let value = String.(equal s "1") in + CSV.DataType.Content (string_of_bool value)); + number = + (fun _location s -> + let f = Float.of_string s in + if Float.is_integer f then CSV.DataType.Integer (Float.to_int f) + else CSV.DataType.Float f); + date = (fun _location s -> CSV.DataType.Content s); + null = CSV.DataType.Null; + formula = + (fun _location ~formula s -> + ignore formula; + CSV.DataType.Content s); + } + +let feed_bigstring ic = + let open Lwt.Infix in + let len = Lwt_io.buffer_size ic in + let buf = Lwt_bytes.create len in + SZXX.Zip.Bigstring + (fun () -> + Lwt_io.read_into_bigstring ic buf 0 len >|= function + | 0 -> None + | len -> Some SZXX.Zip.{ buf; pos = 0; len }) + +(* Evaluate if the row can be processed right now (does not contain + any delayed value) *) +let is_delayed row = + Array.exists row.SZXX.Xlsx.data ~f:(function + | SZXX.Xlsx.Delayed _ -> true + | _ -> false) + +let default_mapper : + (ImportCSV.DataType.t, ImportCSV.DataType.t SZXX.Xlsx.row) State.mapper = + { + get_value = + (function + | ImportCSV.DataType.Content s -> + ImportCSV.DataType.Content (SZXX.Xml.unescape s) + | any -> any); + default = ImportCSV.DataType.Null; + get_row = (fun v -> v.SZXX.Xlsx.data); + } + +type state = CSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t + +let delayed_mapper = + State. + { + get_value = + (function + | SZXX.Xlsx.Available (CSV.DataType.Content s) -> + CSV.DataType.Content (SZXX.Xml.unescape s) + | SZXX.Xlsx.Available value -> value + | _ -> CSV.DataType.Null); + default = SZXX.Xlsx.Available CSV.DataType.Null; + get_row = (fun v -> v.SZXX.Xlsx.data); + } + +(** Initialize the state for the first row, count the column number and create + the table in the database *) +let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state = + fun mapping db acc row -> + (if acc.transaction then + match Db.commit db with + | Ok () -> () + | Error e -> print_endline (ImportErrors.repr_error e)); + + ignore @@ Db.create_table db mapping; + match Db.prepare_insert db mapping with + | Ok stmt -> + { + acc with + header = Some row; + transaction = false; + insert_stmt = Some stmt; + } + | _ -> { acc with header = Some row; transaction = false; insert_stmt = None } + +let importInDatable : + log_error:Csv.out_channel Lazy.t -> + conf:Syntax.t -> + dirname:string -> + A.t -> + 'a Db.t -> + CSV.DataType.t array option Lwt.t = + fun ~log_error ~conf ~dirname mapping db -> + let file = Filename.concat dirname (A.table mapping).file in + + Lwt_io.with_file ~flags ~mode:Input file (fun ic -> + let open Lwt.Syntax in + let stream, sst_p, success = + SZXX.Xlsx.stream_rows ~only_sheet:(A.table mapping).tab + ~feed:(feed_bigstring ic) extractors + in + let* processed = + Lwt_stream.fold + (fun row acc -> + (* Create the table on the first line *) + if Int.equal 1 row.SZXX.Xlsx.row_number then + first_row mapping db acc row + else + match is_delayed row with + | true -> { acc with delayed = row :: acc.delayed } + | false -> ( + let row_number = row.SZXX.Xlsx.row_number in + if acc.transaction then + State.run_row ~log_error ~mapper:delayed_mapper mapping db + row { acc with row_number } + else + match Db.begin_transaction db with + | Error e -> + print_endline (ImportErrors.repr_error e); + acc + | Ok () -> + let acc = { acc with transaction = true; row_number } in + State.run_row ~log_error ~mapper:delayed_mapper mapping + db row acc)) + stream + { + transaction = false; + header = None; + delayed = []; + insert_stmt = None; + check_key_stmt = None; + row_number = 1; + sheet_number = (A.table mapping).tab; + } + in + (* Wait to reach the sst *) + let* sst = sst_p in + + if processed.transaction then ignore (Db.commit db); + + (* Insert the missing elements *) + ignore @@ Db.begin_transaction db; + List.iter processed.delayed ~f:(fun row -> + let fully_available_row = + SZXX.Xlsx.unwrap_status extractors sst row + in + + let row_number = row.SZXX.Xlsx.row_number in + + match + State.insert_row ~mapper:default_mapper mapping db + fully_available_row + { processed with row_number } + with + | Ok _ -> () + | Error e -> + ImportErrors.output_error log_error e; + ()); + + ignore @@ State.clear ~log_error db mapping conf; + ignore @@ Db.commit db; + + (* Finalize the statements created during the import *) + let () = + Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt; + Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt + in + + let _ = + Option.iter + (fun headers -> + let res = SZXX.Xlsx.unwrap_status extractors sst headers in + + let values = Array.mapi res.data ~f:(fun i value -> (i, value)) in + + ignore + @@ Db.insert_header db + (ImportAnalyser.Dependency.table mapping) + values) + processed.header + in + + let header = + Option.map + (fun header -> + let res = SZXX.Xlsx.unwrap_status extractors sst header in + res.data) + processed.header + in + + (* Finalize the process *) + let* () = success in + + Lwt.return header) diff --git a/lib/file_handler/xlsx2sql.mli b/lib/file_handler/xlsx2sql.mli new file mode 100644 index 0000000..e09737b --- /dev/null +++ b/lib/file_handler/xlsx2sql.mli @@ -0,0 +1,10 @@ +val importInDatable : + log_error:ImportErrors.t -> + conf:ImportConf.Syntax.t -> + dirname:string -> + ImportAnalyser.Dependency.t -> + _ ImportSQL.Db.t -> + ImportCSV.DataType.t array option Lwt.t +(** Load an excel spreadsheet in an SQLite database. + +Return the header if at least one row where present *) |