open StdLabels
module A = ImportAnalyser.Dependency
module CSV = ImportCSV
module C = ImportContainers
module Syntax = ImportConf.Syntax
module Db = ImportSQL.Db
type state = CSV.DataType.t array State.t
let default_mapper :
(ImportCSV.DataType.t, ImportCSV.DataType.t array) State.mapper =
{ get_row = Fun.id; get_value = Fun.id; default = ImportCSV.DataType.Null }
let extract_values : string -> CSV.DataType.t =
fun value ->
(* Test first if the content is empty *)
if String.equal String.empty value then CSV.DataType.Null
else
(* else, try differents conversion in order to see which one works *)
match int_of_string_opt value with
| Some i -> CSV.DataType.Integer i
| None -> (
match float_of_string_opt value with
| Some f -> CSV.DataType.Float f
| None ->
(* And finaly convert into date *)
CSV.DataType.Content value)
(** Initialize the state for the first row, count the column number and create
the table in the database *)
+ the table in the database *)
fun mapping db acc row ->
(if acc.transaction then
match Db.commit db with
| Ok () -> ()
+ | Ok () -> ()
ignore @@ Db.create_table db mapping;
let row = Array.of_list row in
match Db.prepare_insert db mapping with
| Ok stmt ->
{
+ {
+ acc with
transaction = false;
insert_stmt = Some stmt;
row_number = acc.row_number + 1;
}
+ }
let read_csv_line :
+let read_csv_line :
=
+ =
let processed_row =
+ let processed_row =
in
+ in
+ if acc.State.transaction then
else
+ else
| Error e ->
+ | Error e ->
acc
+ acc
+ | Ok () ->
+ let acc = { acc with transaction = true } in
acc
+ acc
log_error:ImportErrors.t ->
conf:Syntax.t ->
dirname:string ->
A.t ->
'a Db.t ->
+ 'a Db.t ->
fun ~log_error ~conf ~dirname mapping db ->
let file = Filename.concat dirname (A.table mapping).file in
let channel = Stdlib.open_in_bin file in
+ let channel = Stdlib.open_in_bin file in
+ let csv_channel = Csv.of_channel ~separator:';' ~excel_tricks:true channel in
let header =
List.map ~f:(fun v -> CSV.DataType.Content v) (Csv.next csv_channel)
in
let state =
State.
{
+ let state =
+ State.
+ {
check_key_stmt = None;
row_number = 1;
sheet_number = 1;
delayed = [];
}
in
+ delayed = [];
+ }
try
Csv.fold_left csv_channel ~init:state
+ let state =
with
| Csv.Failure (line, row, cause) as e ->
Printf.eprintf "Error %s on line %d — field : %s\n" cause line
+ with
raise e
in
ignore @@ State.clear ~log_error db mapping conf;
+ raise e
+ in
let () =
+ ignore @@ Db.commit db;
Option.iter (fun v -> ignore @@ Db.finalize v) state.check_key_stmt
in
(* Insert all the headers *)
let _ =
+ in
(fun headers ->
+ let _ =
ignore
+ (fun headers ->
state.header
in
Lwt.return state.header
+ state.header
+ in
+ Lwt.return state.header
+val importInDatable :
+ log_error:ImportErrors.t ->
+ conf:ImportConf.Syntax.t ->
+ dirname:string ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ ImportCSV.DataType.t array option Lwt.t
+(** Load an excel spreadsheet in an SQLite database.
+Return the header if at least one row where present *)
+ (name importFileHandler)
+ (libraries
+ csv
+ sqlite3
+ base
+ core
+ lwt
+ lwt.unix
+ helpers
+ importAnalyser
+ importConf
+ importContainers
+ importCSV
+ importDataTypes
+ importErrors
+ importExpression
+ importSQL
+ )
open StdLabels
module Table = ImportDataTypes.Table
+type 'a t = {
+ header : 'a option;
+ transaction : bool;
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+ row_number : int;
+ sheet_number : int;
+ delayed : 'a list;
+type insert_result = {
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+type ('a, 'b) mapper = {
+ get_row : 'b -> 'a Array.t;
+ get_value : 'a -> ImportCSV.DataType.t;
+ default : 'a;
module A = ImportAnalyser.Dependency
+let insert_row :
+ mapper:(_, 'row) mapper ->
+ A.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ _ t ->
fun ~mapper mapping db row state ->
+ fun ~mapper mapping db row state ->
let keys_id =
+ let keys_id =
~f:(fun acc (keys : A.key) ->
let columns = keys.A.columns in
+ let columns = keys.A.columns in
and columns_id = A.columns mapping in
+ and columns_id = A.columns mapping in
(* Filter only the required columns in the row *)
let values =
+ let values =
let index = i - 1 in
let value =
+ let value =
| Stdlib.Invalid_argument _ ->
+ | Stdlib.Invalid_argument _ ->
+ (* If we have more headers than data, assume the value are NULL.
+ This can happen when all the line tail is empty, Excel can
mapper.default
in
+ in
in
+ in
let execution =
+ let execution =
let* check_key_stmt, result =
+ let* check_key_stmt, result =
in
+ in
+ let no_null =
+ (* We check if we have at least one key which is not null — and in such
+ case we ignore the line.
+ If multiple keys are presents, we ensure there is at least one non
+ null here.
+ *)
| [] -> true
| _ ->
+ | _ ->
+ List.exists result ~f:(function
| Sqlite3.Data.BLOB t | Sqlite3.Data.TEXT t ->
not (String.equal "" t)
+ not (String.equal "" t)
in

+ let* _ =
+ match no_null with
+ | true -> Ok ()
+ | false -> Error (Failure "The key is null")
+ in
+ let* statement =
+ match state.insert_stmt with
+ | None -> ImportSQL.Db.prepare_insert db mapping
+ | Some v -> Ok v
+ in
+ let* _ = ImportSQL.Db.insert db statement ~id:state.row_number values in
+ let* _ = ImportSQL.Db.reset statement in
+ Helpers.Console.update_cursor ();
+ Ok { insert_stmt = Some statement; check_key_stmt }
+ in
+ (* In case of error, wrap the exception to get the line *)
+ Result.map_error
+ (fun e ->
+ ImportErrors.
+ {
+ source = ImportAnalyser.Dependency.table mapping;
+ sheet = state.sheet_number;
+ row = state.row_number;
+ target = None;
+ value = CSV.DataType.Content (String.concat ~sep:", " []);
+ exn = e;
+ })
+ execution
+(** Load the row with all the informations associated with this sheet.
+ If an error has already been raised during the sheet, ignore this row only. *)
+let run_row :
+ log_error:ImportErrors.t ->
+ mapper:(_, 'row) mapper ->
+ A.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ 'a t ->
+ 'a t =
+ fun ~log_error ~mapper mapping db row state ->
+ match insert_row ~mapper mapping db row state with
+ | Ok { insert_stmt; check_key_stmt } ->
+ {
+ state with
+ insert_stmt;
+ check_key_stmt;
+ row_number = state.row_number + 1;
+ }
+ | Error e ->
+ Option.iter (fun v -> ignore @@ ImportSQL.Db.finalize v) state.insert_stmt;
+ Option.iter
+ (fun v -> ignore @@ ImportSQL.Db.finalize v)
+ state.check_key_stmt;
+ ImportErrors.output_error log_error e;
+ {
+ state with
+ insert_stmt = None;
+ check_key_stmt = None;
+ row_number = state.row_number + 1;
+ }
+let clear :
+ log_error:ImportErrors.t ->
+ 'a ImportSQL.Db.t ->
+ A.t ->
+ ImportConf.Syntax.t ->
+ unit ImportSQL.Db.result =
+ fun ~log_error db mapping conf ->
+ ImportSQL.Db.clear_duplicates db (A.table mapping) (A.keys mapping)
+ ~f:(fun values ->
+ let line =
+ match snd @@ Array.get values 0 with
+ | ImportCSV.DataType.Integer i -> i
+ | _ -> -1
+ and value = snd @@ Array.get values 1
+ and target =
+ match snd @@ Array.get values 2 with
+ | ImportCSV.DataType.Content s ->
+ Some (ImportConf.get_table_for_name conf (Some s))
+ | _ -> None
+ in
+ let error =
+ ImportErrors.
+ {
+ source = A.table mapping;
+ sheet = (A.table mapping).tab;
+ row = line;
+ target;
+ value;
+ exn = Failure "Duplicated key";
+ }
+ in
+ ImportErrors.output_error log_error error)
+type 'a t = {
+ header : 'a option;
+ transaction : bool;
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+ row_number : int;
+ sheet_number : int;
+ delayed : 'a list;
+type insert_result = {
+ insert_stmt : Sqlite3.stmt option;
+ check_key_stmt : Sqlite3.stmt option;
+type ('a, 'b) mapper = {
+ get_row : 'b -> 'a Array.t;
+ get_value : 'a -> ImportCSV.DataType.t;
+ default : 'a;
+val insert_row :
+ mapper:(_, 'row) mapper ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ _ t ->
+ (insert_result, ImportErrors.xlsError) result
+(** Low level row insertion *)
+val run_row :
+ log_error:ImportErrors.t ->
+ mapper:(_, 'row) mapper ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ 'row ->
+ 'a t ->
+ 'a t
+val clear :
+ log_error:ImportErrors.t ->
+ 'a ImportSQL.Db.t ->
+ ImportAnalyser.Dependency.t ->
+ ImportConf.Syntax.t ->
+ unit ImportSQL.Db.result
+(** Clean up the table after the insertion, check for the duplicates and external references *)
+open StdLabels
+module A = ImportAnalyser.Dependency
+module C = ImportContainers
+module CSV = ImportCSV
+module Syntax = ImportConf.Syntax
+module Db = ImportSQL.Db
+let flags = Unix.[ O_RDONLY; O_NONBLOCK ]
+let extractors =
+ SZXX.Xlsx.
+ {
+ string = (fun _location s -> CSV.DataType.Content s);
+ error =
+ (fun _location s -> CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s));
+ boolean =
+ (fun _location s ->
+ let value = String.(equal s "1") in
+ CSV.DataType.Content (string_of_bool value));
+ number =
+ (fun _location s ->
+ let f = Float.of_string s in
+ if Float.is_integer f then CSV.DataType.Integer (Float.to_int f)
+ else CSV.DataType.Float f);
+ date = (fun _location s -> CSV.DataType.Content s);
+ null = CSV.DataType.Null;
+ formula =
+ (fun _location ~formula s ->
+ ignore formula;
+ CSV.DataType.Content s);
+ }
+let feed_bigstring ic =
+ let open Lwt.Infix in
+ let len = Lwt_io.buffer_size ic in
+ let buf = Lwt_bytes.create len in
+ SZXX.Zip.Bigstring
+ (fun () ->
+ Lwt_io.read_into_bigstring ic buf 0 len >|= function
+ | 0 -> None
+ | len -> Some SZXX.Zip.{ buf; pos = 0; len })
+(* Evaluate if the row can be processed right now (does not contain
+ any delayed value) *)
+let is_delayed row =
+ Array.exists row.SZXX.Xlsx.data ~f:(function
+ | SZXX.Xlsx.Delayed _ -> true
+ | _ -> false)
+let default_mapper :
+ (ImportCSV.DataType.t, ImportCSV.DataType.t SZXX.Xlsx.row) State.mapper =
+ {
+ get_value =
+ (function
+ | ImportCSV.DataType.Content s ->
+ ImportCSV.DataType.Content (SZXX.Xml.unescape s)
+ | any -> any);
+ default = ImportCSV.DataType.Null;
+ get_row = (fun v -> v.SZXX.Xlsx.data);
+ }
+type state = CSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t
+let delayed_mapper =
+ State.
+ {
+ get_value =
+ (function
+ | SZXX.Xlsx.Available (CSV.DataType.Content s) ->
+ CSV.DataType.Content (SZXX.Xml.unescape s)
+ | SZXX.Xlsx.Available value -> value
+ | _ -> CSV.DataType.Null);
+ default = SZXX.Xlsx.Available CSV.DataType.Null;
+ get_row = (fun v -> v.SZXX.Xlsx.data);
+ }
+(** Initialize the state for the first row, count the column number and create
+ the table in the database *)
+let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state =
+ fun mapping db acc row ->
+ (if acc.transaction then
+ match Db.commit db with
+ | Ok () -> ()
+ | Error e -> print_endline (ImportErrors.repr_error e));
+ ignore @@ Db.create_table db mapping;
+ match Db.prepare_insert db mapping with
+ | Ok stmt ->
+ {
+ acc with
+ header = Some row;
+ transaction = false;
+ insert_stmt = Some stmt;
+ }
+ | _ -> { acc with header = Some row; transaction = false; insert_stmt = None }
+let importInDatable :
+ log_error:Csv.out_channel Lazy.t ->
+ conf:Syntax.t ->
+ dirname:string ->
+ A.t ->
+ 'a Db.t ->
+ CSV.DataType.t array option Lwt.t =
+ fun ~log_error ~conf ~dirname mapping db ->
+ let file = Filename.concat dirname (A.table mapping).file in
+ Lwt_io.with_file ~flags ~mode:Input file (fun ic ->
+ let open Lwt.Syntax in
+ let stream, sst_p, success =
+ SZXX.Xlsx.stream_rows ~only_sheet:(A.table mapping).tab
+ ~feed:(feed_bigstring ic) extractors
+ in
+ let* processed =
+ Lwt_stream.fold
+ (fun row acc ->
+ (* Create the table on the first line *)
+ if Int.equal 1 row.SZXX.Xlsx.row_number then
+ first_row mapping db acc row
+ else
+ match is_delayed row with
+ | true -> { acc with delayed = row :: acc.delayed }
+ | false -> (
+ let row_number = row.SZXX.Xlsx.row_number in
+ if acc.transaction then
+ State.run_row ~log_error ~mapper:delayed_mapper mapping db
+ row { acc with row_number }
+ else
+ match Db.begin_transaction db with
+ | Error e ->
+ print_endline (ImportErrors.repr_error e);
+ acc
+ | Ok () ->
+ let acc = { acc with transaction = true; row_number } in
+ State.run_row ~log_error ~mapper:delayed_mapper mapping
+ db row acc))
+ stream
+ {
+ transaction = false;
+ header = None;
+ delayed = [];
+ insert_stmt = None;
+ check_key_stmt = None;
+ row_number = 1;
+ sheet_number = (A.table mapping).tab;
+ }
+ in
+ (* Wait to reach the sst *)
+ let* sst = sst_p in
+ if processed.transaction then ignore (Db.commit db);
+ (* Insert the missing elements *)
+ ignore @@ Db.begin_transaction db;
+ List.iter processed.delayed ~f:(fun row ->
+ let fully_available_row =
+ SZXX.Xlsx.unwrap_status extractors sst row
+ in
+ let row_number = row.SZXX.Xlsx.row_number in
+ match
+ State.insert_row ~mapper:default_mapper mapping db
+ fully_available_row
+ { processed with row_number }
+ with
+ | Ok _ -> ()
+ | Error e ->
+ ImportErrors.output_error log_error e;
+ ());
+ ignore @@ State.clear ~log_error db mapping conf;
+ ignore @@ Db.commit db;
+ (* Finalize the statements created during the import *)
+ let () =
+ Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt;
+ Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt
+ in
+ let _ =
+ Option.iter
+ (fun headers ->
+ let res = SZXX.Xlsx.unwrap_status extractors sst headers in
+ let values = Array.mapi res.data ~f:(fun i value -> (i, value)) in
+ ignore
+ @@ Db.insert_header db
+ (ImportAnalyser.Dependency.table mapping)
+ values)
+ processed.header
+ in
+ let header =
+ Option.map
+ (fun header ->
+ let res = SZXX.Xlsx.unwrap_status extractors sst header in
+ res.data)
+ processed.header
+ in
+ (* Finalize the process *)
+ let* () = success in
+ Lwt.return header)
+val importInDatable :
+ log_error:ImportErrors.t ->
+ conf:ImportConf.Syntax.t ->
+ dirname:string ->
+ ImportAnalyser.Dependency.t ->
+ _ ImportSQL.Db.t ->
+ ImportCSV.DataType.t array option Lwt.t
+(** Load an excel spreadsheet in an SQLite database.
+Return the header if at least one row where present *)