aboutsummaryrefslogtreecommitdiff
path: root/lib/file_handler/xlsx2sql.ml
diff options
context:
space:
mode:
author Sébastien Dailly <sebastien@dailly.me> 2024-03-14 08:26:58 +0100
committer Sébastien Dailly <sebastien@dailly.me> 2024-03-14 08:26:58 +0100
commit 6b377719c10d5ab3343fd5221f99a4a21008e25a (patch)
tree a7c1e9a820d339a2f161af3e09cf9e3161286796 /lib/file_handler/xlsx2sql.ml
Initial commitmain
Diffstat (limited to 'lib/file_handler/xlsx2sql.ml')
-rw-r--r-- lib/file_handler/xlsx2sql.ml 205
1 files changed, 205 insertions, 0 deletions
diff --git a/lib/file_handler/xlsx2sql.ml b/lib/file_handler/xlsx2sql.ml
new file mode 100644
index 0000000..f2d8f12
--- /dev/null
+++ b/lib/file_handler/xlsx2sql.ml
@@ -0,0 +1,205 @@
+open StdLabels
+module A = ImportAnalyser.Dependency
+module C = ImportContainers
+module CSV = ImportCSV
+module Syntax = ImportConf.Syntax
+module Db = ImportSQL.Db
+
+(** Flags used to open the workbook: read-only and non-blocking. *)
+let flags = [ Unix.O_RDONLY; Unix.O_NONBLOCK ]
+
+(** Cell extractors handed to [SZXX.Xlsx.stream_rows]: each one turns the raw
+    text of a cell into a [CSV.DataType.t].
+
+    - strings, dates and formula results are kept verbatim as [Content] (the
+      formula text itself is dropped);
+    - errors become [Error], prefixed with ["#ERROR# "];
+    - booleans are rendered with [string_of_bool], the raw value ["1"] being
+      true and anything else false;
+    - numbers become [Integer] when they hold an integral value that fits in
+      a native [int], and [Float] otherwise;
+    - empty cells are [Null].
+
+    The [number] extractor parses with [Float.of_string] and therefore raises
+    [Failure] when the raw text is not a valid float literal. *)
+let extractors =
+  (* [Float.to_int] is unspecified outside the range of [int], so only values
+     in [min_int, -min_int) are converted; larger integral values stay
+     floats instead of being silently corrupted. *)
+  let fits_in_int (f : float) =
+    let lower = Float.of_int min_int in
+    Float.is_integer f && f >= lower && f < -.lower
+  in
+  SZXX.Xlsx.
+    {
+      string = (fun _location s -> CSV.DataType.Content s);
+      error =
+        (fun _location s ->
+          CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s));
+      boolean =
+        (fun _location s ->
+          let value = String.(equal s "1") in
+          CSV.DataType.Content (string_of_bool value));
+      number =
+        (fun _location s ->
+          let f = Float.of_string s in
+          if fits_in_int f then CSV.DataType.Integer (Float.to_int f)
+          else CSV.DataType.Float f);
+      date = (fun _location s -> CSV.DataType.Content s);
+      null = CSV.DataType.Null;
+      formula =
+        (fun _location ~formula s ->
+          ignore formula;
+          CSV.DataType.Content s);
+    }
+
+(** [feed_bigstring ic] builds the [SZXX.Zip.Bigstring] feed reading from the
+    input channel [ic].
+
+    A single buffer, sized after the channel's own buffer, is allocated once
+    and handed out again on every read. The feed yields [None] as soon as a
+    read returns zero bytes. *)
+let feed_bigstring ic =
+  let capacity = Lwt_io.buffer_size ic in
+  let buf = Lwt_bytes.create capacity in
+  let read_chunk () =
+    let open Lwt.Syntax in
+    let+ count = Lwt_io.read_into_bigstring ic buf 0 capacity in
+    if Int.equal count 0 then None
+    else Some { SZXX.Zip.buf; pos = 0; len = count }
+  in
+  SZXX.Zip.Bigstring read_chunk
+
+(** [is_delayed row] is [true] when at least one cell of [row] is still
+    [Delayed], that is when the row cannot be processed right now. *)
+let is_delayed row =
+  let pending = function
+    | SZXX.Xlsx.Delayed _ -> true
+    | SZXX.Xlsx.Available _ -> false
+  in
+  Array.exists ~f:pending row.SZXX.Xlsx.data
+
+(** Mapper for rows whose cells are all plain values.
+
+    [Content] cells go through [SZXX.Xml.unescape]; every other value is
+    returned untouched. The default value is [Null]. *)
+let default_mapper :
+    (CSV.DataType.t, CSV.DataType.t SZXX.Xlsx.row) State.mapper =
+  let unescape_content = function
+    | CSV.DataType.Content raw -> CSV.DataType.Content (SZXX.Xml.unescape raw)
+    | other -> other
+  in
+  {
+    get_value = unescape_content;
+    default = CSV.DataType.Null;
+    get_row = (fun row -> row.SZXX.Xlsx.data);
+  }
+
+(** Import state for rows as they are streamed: every cell of the row is
+    wrapped in a [SZXX.Xlsx.status]. *)
+type state = CSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t
+
+(** Mapper for rows whose cells are wrapped in a [SZXX.Xlsx.status].
+
+    An available [Content] cell goes through [SZXX.Xml.unescape], any other
+    available value is returned as is, and a cell which is not available is
+    read as [Null]. The default value is an available [Null]. *)
+let delayed_mapper =
+  let get_value = function
+    | SZXX.Xlsx.Available (CSV.DataType.Content raw) ->
+        CSV.DataType.Content (SZXX.Xml.unescape raw)
+    | SZXX.Xlsx.Available ready -> ready
+    | SZXX.Xlsx.Delayed _ -> CSV.DataType.Null
+  in
+  let get_row row = row.SZXX.Xlsx.data in
+  {
+    State.get_value;
+    default = SZXX.Xlsx.Available CSV.DataType.Null;
+    get_row;
+  }
+
+(** [first_row mapping db acc row] sets the state up from the first row of the
+    sheet.
+
+    A transaction still open in [acc] is committed first (a failure is printed
+    on the standard output), then the table is created in the database and the
+    insert statement is prepared.
+
+    The result is [acc] with [row] recorded as header, no transaction in
+    progress, and [insert_stmt] holding the prepared statement, or [None] when
+    the preparation did not succeed. *)
+let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state =
+ fun mapping db acc row ->
+  let () =
+    match acc.transaction with
+    | false -> ()
+    | true -> (
+        match Db.commit db with
+        | Ok () -> ()
+        | Error e -> print_endline (ImportErrors.repr_error e))
+  in
+
+  (* NOTE(review): the outcome of the table creation is discarded. *)
+  ignore @@ Db.create_table db mapping;
+
+  (* NOTE(review): a failed preparation is not reported, the state simply
+     carries no insert statement. *)
+  let insert_stmt =
+    match Db.prepare_insert db mapping with
+    | Ok stmt -> Some stmt
+    | Error _ -> None
+  in
+  { acc with header = Some row; transaction = false; insert_stmt }
+
+(** [importInDatable ~log_error ~conf ~dirname mapping db] loads the sheet
+    described by [mapping] into [db].
+
+    The workbook is the file of [A.table mapping], looked up in [dirname];
+    only the tab of that table is streamed. While streaming:
+
+    - the row numbered 1 is handed to [first_row];
+    - a row holding a delayed cell is set aside;
+    - any other row goes through [State.run_row], a transaction being opened
+      first when none is in progress.
+
+    Once the stream is consumed and the [sst] promise of SZXX is resolved, the
+    rows set aside are unwrapped and inserted in their own transaction, then
+    [State.clear] is run, the prepared statements are finalized and the header
+    is stored with [Db.insert_header].
+
+    Errors raised by [Db.commit] and [Db.begin_transaction] are printed on the
+    standard output; errors on the delayed rows are sent to [log_error].
+
+    @return
+      the cells of the header row, or [None] when no header was recorded in
+      the state. *)
+let importInDatable :
+    log_error:Csv.out_channel Lazy.t ->
+    conf:Syntax.t ->
+    dirname:string ->
+    A.t ->
+    'a Db.t ->
+    CSV.DataType.t array option Lwt.t =
+ fun ~log_error ~conf ~dirname mapping db ->
+  let table = A.table mapping in
+  let file = Filename.concat dirname table.file in
+
+  (* Same reporting as the one already used for the transactions opened while
+     streaming: the error is printed and the import goes on. *)
+  let report_db_error = function
+    | Ok () -> ()
+    | Error e -> print_endline (ImportErrors.repr_error e)
+  in
+
+  let initial : state =
+    {
+      transaction = false;
+      header = None;
+      delayed = [];
+      insert_stmt = None;
+      check_key_stmt = None;
+      row_number = 1;
+      sheet_number = table.tab;
+    }
+  in
+
+  Lwt_io.with_file ~flags ~mode:Input file (fun ic ->
+      let open Lwt.Syntax in
+      let stream, sst_p, success =
+        SZXX.Xlsx.stream_rows ~only_sheet:table.tab ~feed:(feed_bigstring ic)
+          extractors
+      in
+      let* processed =
+        Lwt_stream.fold
+          (fun row (acc : state) ->
+            let row_number = row.SZXX.Xlsx.row_number in
+            (* Create the table on the first line *)
+            if Int.equal 1 row_number then first_row mapping db acc row
+            else if is_delayed row then
+              (* Kept for later: the row needs the sst to be complete *)
+              { acc with delayed = row :: acc.delayed }
+            else if acc.transaction then
+              State.run_row ~log_error ~mapper:delayed_mapper mapping db row
+                { acc with row_number }
+            else
+              match Db.begin_transaction db with
+              | Error e ->
+                  print_endline (ImportErrors.repr_error e);
+                  acc
+              | Ok () ->
+                  State.run_row ~log_error ~mapper:delayed_mapper mapping db
+                    row
+                    { acc with transaction = true; row_number })
+          stream initial
+      in
+      (* Wait to reach the sst *)
+      let* sst = sst_p in
+
+      if processed.transaction then report_db_error (Db.commit db);
+
+      (* Insert the missing elements *)
+      report_db_error (Db.begin_transaction db);
+      List.iter processed.delayed ~f:(fun row ->
+          let fully_available_row =
+            SZXX.Xlsx.unwrap_status extractors sst row
+          in
+          let row_number = row.SZXX.Xlsx.row_number in
+          match
+            State.insert_row ~mapper:default_mapper mapping db
+              fully_available_row
+              { processed with row_number }
+          with
+          | Ok _ -> ()
+          | Error e ->
+              ImportErrors.output_error log_error e;
+              ());
+
+      (* NOTE(review): the outcome of the cleanup is discarded. *)
+      ignore @@ State.clear ~log_error db mapping conf;
+      report_db_error (Db.commit db);
+
+      (* Finalize the statements created during the import *)
+      Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt;
+      Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt;
+
+      (* The header is unwrapped once, and used both for the database and for
+         the returned value. *)
+      let header =
+        Option.map
+          (fun row ->
+            (SZXX.Xlsx.unwrap_status extractors sst row).SZXX.Xlsx.data)
+          processed.header
+      in
+      Option.iter
+        (fun cells ->
+          let values = Array.mapi cells ~f:(fun i value -> (i, value)) in
+          ignore @@ Db.insert_header db table values)
+        header;
+
+      (* Finalize the process *)
+      let* () = success in
+
+      Lwt.return header)