From 6b377719c10d5ab3343fd5221f99a4a21008e25a Mon Sep 17 00:00:00 2001 From: Sébastien Dailly Date: Thu, 14 Mar 2024 08:26:58 +0100 Subject: Initial commit --- lib/file_handler/xlsx2sql.ml | 205 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 205 insertions(+) create mode 100644 lib/file_handler/xlsx2sql.ml (limited to 'lib/file_handler/xlsx2sql.ml') diff --git a/lib/file_handler/xlsx2sql.ml b/lib/file_handler/xlsx2sql.ml new file mode 100644 index 0000000..f2d8f12 --- /dev/null +++ b/lib/file_handler/xlsx2sql.ml @@ -0,0 +1,205 @@ +open StdLabels +module A = ImportAnalyser.Dependency +module C = ImportContainers +module CSV = ImportCSV +module Syntax = ImportConf.Syntax +module Db = ImportSQL.Db + +let flags = Unix.[ O_RDONLY; O_NONBLOCK ] + +let extractors = + SZXX.Xlsx. + { + string = (fun _location s -> CSV.DataType.Content s); + error = + (fun _location s -> CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s)); + boolean = + (fun _location s -> + let value = String.(equal s "1") in + CSV.DataType.Content (string_of_bool value)); + number = + (fun _location s -> + let f = Float.of_string s in + if Float.is_integer f then CSV.DataType.Integer (Float.to_int f) + else CSV.DataType.Float f); + date = (fun _location s -> CSV.DataType.Content s); + null = CSV.DataType.Null; + formula = + (fun _location ~formula s -> + ignore formula; + CSV.DataType.Content s); + } + +let feed_bigstring ic = + let open Lwt.Infix in + let len = Lwt_io.buffer_size ic in + let buf = Lwt_bytes.create len in + SZXX.Zip.Bigstring + (fun () -> + Lwt_io.read_into_bigstring ic buf 0 len >|= function + | 0 -> None + | len -> Some SZXX.Zip.{ buf; pos = 0; len }) + +(* Evaluate if the row can be processed right now (does not contain + any delayed value) *) +let is_delayed row = + Array.exists row.SZXX.Xlsx.data ~f:(function + | SZXX.Xlsx.Delayed _ -> true + | _ -> false) + +let default_mapper : + (ImportCSV.DataType.t, ImportCSV.DataType.t SZXX.Xlsx.row) State.mapper = + { + get_value = + (function + | ImportCSV.DataType.Content s -> + ImportCSV.DataType.Content (SZXX.Xml.unescape s) + | any -> any); + default = ImportCSV.DataType.Null; + get_row = (fun v -> v.SZXX.Xlsx.data); + } + +type state = CSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t + +let delayed_mapper = + State. + { + get_value = + (function + | SZXX.Xlsx.Available (CSV.DataType.Content s) -> + CSV.DataType.Content (SZXX.Xml.unescape s) + | SZXX.Xlsx.Available value -> value + | _ -> CSV.DataType.Null); + default = SZXX.Xlsx.Available CSV.DataType.Null; + get_row = (fun v -> v.SZXX.Xlsx.data); + } + +(** Initialize the state for the first row, count the column number and create + the table in the database *) +let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state = + fun mapping db acc row -> + (if acc.transaction then + match Db.commit db with + | Ok () -> () + | Error e -> print_endline (ImportErrors.repr_error e)); + + ignore @@ Db.create_table db mapping; + match Db.prepare_insert db mapping with + | Ok stmt -> + { + acc with + header = Some row; + transaction = false; + insert_stmt = Some stmt; + } + | _ -> { acc with header = Some row; transaction = false; insert_stmt = None } + +let importInDatable : + log_error:Csv.out_channel Lazy.t -> + conf:Syntax.t -> + dirname:string -> + A.t -> + 'a Db.t -> + CSV.DataType.t array option Lwt.t = + fun ~log_error ~conf ~dirname mapping db -> + let file = Filename.concat dirname (A.table mapping).file in + + Lwt_io.with_file ~flags ~mode:Input file (fun ic -> + let open Lwt.Syntax in + let stream, sst_p, success = + SZXX.Xlsx.stream_rows ~only_sheet:(A.table mapping).tab + ~feed:(feed_bigstring ic) extractors + in + let* processed = + Lwt_stream.fold + (fun row acc -> + (* Create the table on the first line *) + if Int.equal 1 row.SZXX.Xlsx.row_number then + first_row mapping db acc row + else + match is_delayed row with + | true -> { acc with delayed = row :: acc.delayed } + | false -> ( + let row_number = row.SZXX.Xlsx.row_number in + if acc.transaction then + State.run_row ~log_error ~mapper:delayed_mapper mapping db + row { acc with row_number } + else + match Db.begin_transaction db with + | Error e -> + print_endline (ImportErrors.repr_error e); + acc + | Ok () -> + let acc = { acc with transaction = true; row_number } in + State.run_row ~log_error ~mapper:delayed_mapper mapping + db row acc)) + stream + { + transaction = false; + header = None; + delayed = []; + insert_stmt = None; + check_key_stmt = None; + row_number = 1; + sheet_number = (A.table mapping).tab; + } + in + (* Wait to reach the sst *) + let* sst = sst_p in + + if processed.transaction then ignore (Db.commit db); + + (* Insert the missing elements *) + ignore @@ Db.begin_transaction db; + List.iter processed.delayed ~f:(fun row -> + let fully_available_row = + SZXX.Xlsx.unwrap_status extractors sst row + in + + let row_number = row.SZXX.Xlsx.row_number in + + match + State.insert_row ~mapper:default_mapper mapping db + fully_available_row + { processed with row_number } + with + | Ok _ -> () + | Error e -> + ImportErrors.output_error log_error e; + ()); + + ignore @@ State.clear ~log_error db mapping conf; + ignore @@ Db.commit db; + + (* Finalize the statements created during the import *) + let () = + Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt; + Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt + in + + let _ = + Option.iter + (fun headers -> + let res = SZXX.Xlsx.unwrap_status extractors sst headers in + + let values = Array.mapi res.data ~f:(fun i value -> (i, value)) in + + ignore + @@ Db.insert_header db + (ImportAnalyser.Dependency.table mapping) + values) + processed.header + in + + let header = + Option.map + (fun header -> + let res = SZXX.Xlsx.unwrap_status extractors sst header in + res.data) + processed.header + in + + (* Finalize the process *) + let* () = success in + + Lwt.return header) -- cgit v1.2.3