open StdLabels
module A = ImportAnalyser.Dependency
module C = ImportContainers
module CSV = ImportCSV
module Syntax = ImportConf.Syntax
module Db = ImportSQL.Db

(** Flags used to open the xlsx file: read-only and non-blocking. *)
let flags = Unix.[ O_RDONLY; O_NONBLOCK ]

(** [float_fits_int f] is [true] when [f] is integral and lies inside the
    range of [int], i.e. when [Float.to_int f] is exact. [Float.to_int] is
    unspecified outside that range, so such values must stay floats.
    [Float.of_int max_int] rounds up to a power of two on 64-bit platforms,
    hence the strict upper bound. *)
let float_fits_int =
  let lower = Float.of_int min_int and upper = Float.of_int max_int in
  fun f -> Float.is_integer f && f >= lower && f < upper

(** Cell extractors given to SZXX, converting each xlsx cell kind into a
    [CSV.DataType.t]:
    - strings and dates are kept verbatim as [Content];
    - errors become [Error "#ERROR# <message>"];
    - booleans become [Content "true"] when the raw value is ["1"],
      [Content "false"] otherwise;
    - numbers become [Integer] when integral and representable as an [int],
      [Float] otherwise ([Float.of_string] raises [Failure] on a malformed
      number);
    - formulas only keep the value [s]; the formula text is discarded;
    - the [null] value is [Null]. *)
let extractors =
  SZXX.Xlsx.
    {
      string = (fun _location s -> CSV.DataType.Content s);
      error =
        (fun _location s -> CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s));
      boolean =
        (fun _location s ->
          let value = String.(equal s "1") in
          CSV.DataType.Content (string_of_bool value));
      number =
        (fun _location s ->
          let f = Float.of_string s in
          (* Guard against out-of-range integral floats (e.g. 1e20): their
             conversion with [Float.to_int] is unspecified. *)
          if float_fits_int f then CSV.DataType.Integer (Float.to_int f)
          else CSV.DataType.Float f);
      date = (fun _location s -> CSV.DataType.Content s);
      null = CSV.DataType.Null;
      formula =
        (fun _location ~formula s ->
          ignore formula;
          CSV.DataType.Content s);
    }

(** [feed_bigstring ic] exposes the channel [ic] as a SZXX zip feed. A single
    buffer of [Lwt_io.buffer_size ic] bytes is allocated once and refilled on
    every call; a read of 0 bytes is reported as [None] (no more input). *)
let feed_bigstring ic =
  let open Lwt.Infix in
  let len = Lwt_io.buffer_size ic in
  let buf = Lwt_bytes.create len in
  SZXX.Zip.Bigstring
    (fun () ->
      Lwt_io.read_into_bigstring ic buf 0 len >|= function
      | 0 -> None
      | len -> Some SZXX.Zip.{ buf; pos = 0; len })

(** [is_delayed row] is [true] when at least one cell of [row] is still
    [Delayed], i.e. the row cannot be processed right now: its values can
    only be resolved once the shared strings table ([sst]) is available. *)
let is_delayed row =
  Array.exists row.SZXX.Xlsx.data ~f:(function
    | SZXX.Xlsx.Delayed _ -> true
    | _ -> false)

(** Mapper for rows whose cells are plain values (used once the delayed rows
    have been resolved). [Content] strings are XML-unescaped; every other
    value is returned unchanged. *)
let default_mapper : (CSV.DataType.t, CSV.DataType.t SZXX.Xlsx.row) State.mapper
    =
  {
    get_value =
      (function
      | CSV.DataType.Content s -> CSV.DataType.Content (SZXX.Xml.unescape s)
      | any -> any);
    default = CSV.DataType.Null;
    get_row = (fun v -> v.SZXX.Xlsx.data);
  }

(** Import state while streaming the rows of the sheet. *)
type state = CSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t

(** Mapper for rows read from the stream, whose cells are wrapped in a
    [SZXX.Xlsx.status]. [Available (Content s)] is XML-unescaped, other
    available values are returned unchanged, and any non-available cell is
    read as [Null]. *)
let delayed_mapper =
  State.
    {
      get_value =
        (function
        | SZXX.Xlsx.Available (CSV.DataType.Content s) ->
            CSV.DataType.Content (SZXX.Xml.unescape s)
        | SZXX.Xlsx.Available value -> value
        | _ -> CSV.DataType.Null);
      default = SZXX.Xlsx.Available CSV.DataType.Null;
      get_row = (fun v -> v.SZXX.Xlsx.data);
    }

(** [report_db_error r] prints the error carried by the database result [r],
    if any, using the same format as the rest of this module. *)
let report_db_error = function
  | Ok () -> ()
  | Error e -> print_endline (ImportErrors.repr_error e)

(** Initialize the state for the first row: commit any pending transaction,
    create the table in the database, prepare the insert statement and keep
    [row] as the header. When the insert statement cannot be prepared, the
    state carries [insert_stmt = None].
    NOTE(review): the result of [Db.create_table] is ignored; confirm that a
    failure there is meant to be silent. *)
let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state =
 fun mapping db acc row ->
  if acc.transaction then report_db_error (Db.commit db);
  ignore @@ Db.create_table db mapping;
  let insert_stmt =
    match Db.prepare_insert db mapping with
    | Ok stmt -> Some stmt
    | _ -> None
  in
  { acc with header = Some row; transaction = false; insert_stmt }

(** [importInDatable ~log_error ~conf ~dirname mapping db] streams the sheet
    [(A.table mapping).tab] of the xlsx file [(A.table mapping).file]
    (relative to [dirname]) into [db].

    - Row 1 is treated as the header: see [first_row].
    - Other rows whose cells are all available are handed to [State.run_row]
      as soon as they are read, inside a transaction opened lazily.
    - Rows that still contain delayed cells are kept aside, resolved once the
      shared strings table is known, and inserted in sheet order with
      [State.insert_row]; their errors are written to [log_error].

    Afterwards [State.clear] is run, the transaction is committed, the
    prepared statements are finalized and the header is stored with
    [Db.insert_header]. Transaction errors are printed on stdout.

    @return the resolved header cells, or [None] when no row 1 was read. *)
let importInDatable :
    log_error:Csv.out_channel Lazy.t ->
    conf:Syntax.t ->
    dirname:string ->
    A.t ->
    'a Db.t ->
    CSV.DataType.t array option Lwt.t =
 fun ~log_error ~conf ~dirname mapping db ->
  let file = Filename.concat dirname (A.table mapping).file in

  Lwt_io.with_file ~flags ~mode:Input file (fun ic ->
      let open Lwt.Syntax in
      let stream, sst_p, success =
        SZXX.Xlsx.stream_rows ~only_sheet:(A.table mapping).tab
          ~feed:(feed_bigstring ic) extractors
      in
      let* processed =
        Lwt_stream.fold
          (fun row acc ->
            (* Create the table on the first line *)
            if Int.equal 1 row.SZXX.Xlsx.row_number then
              first_row mapping db acc row
            else
              match is_delayed row with
              | true -> { acc with delayed = row :: acc.delayed }
              | false -> (
                  let row_number = row.SZXX.Xlsx.row_number in
                  if acc.transaction then
                    State.run_row ~log_error ~mapper:delayed_mapper mapping db
                      row { acc with row_number }
                  else
                    match Db.begin_transaction db with
                    | Error e ->
                        print_endline (ImportErrors.repr_error e);
                        acc
                    | Ok () ->
                        let acc = { acc with transaction = true; row_number } in
                        State.run_row ~log_error ~mapper:delayed_mapper mapping
                          db row acc))
          stream
          {
            transaction = false;
            header = None;
            delayed = [];
            insert_stmt = None;
            check_key_stmt = None;
            row_number = 1;
            sheet_number = (A.table mapping).tab;
          }
      in
      (* Wait to reach the sst *)
      let* sst = sst_p in
      if processed.transaction then report_db_error (Db.commit db);
      (* Insert the missing elements. [processed.delayed] was built by
         prepending, so reverse it to insert (and report errors) in sheet
         order. *)
      report_db_error (Db.begin_transaction db);
      List.iter (List.rev processed.delayed) ~f:(fun row ->
          let fully_available_row =
            SZXX.Xlsx.unwrap_status extractors sst row
          in
          let row_number = row.SZXX.Xlsx.row_number in
          match
            State.insert_row ~mapper:default_mapper mapping db
              fully_available_row { processed with row_number }
          with
          | Ok _ -> ()
          | Error e ->
              ImportErrors.output_error log_error e;
              ());
      ignore @@ State.clear ~log_error db mapping conf;
      report_db_error (Db.commit db);
      (* Finalize the statements created during the import *)
      Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt;
      Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt;
      (* Resolve the header against the shared strings once; the result is
         both stored in the database and returned to the caller. *)
      let header =
        Option.map
          (fun header ->
            (SZXX.Xlsx.unwrap_status extractors sst header).SZXX.Xlsx.data)
          processed.header
      in
      Option.iter
        (fun data ->
          let values = Array.mapi data ~f:(fun i value -> (i, value)) in
          ignore @@ Db.insert_header db (A.table mapping) values)
        header;
      (* Finalize the process *)
      let* () = success in
      Lwt.return header)