aboutsummaryrefslogtreecommitdiff
path: root/lib/file_handler/csv2sql.ml
blob: 42d84eb6bcd248673804149b4446e8cf861ec8df (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
open StdLabels
module A = ImportAnalyser.Dependency
module CSV = ImportCSV
module C = ImportContainers
module Syntax = ImportConf.Syntax
module Db = ImportSQL.Db

type state = CSV.DataType.t array State.t

let default_mapper :
    (ImportCSV.DataType.t, ImportCSV.DataType.t array) State.mapper =
  { get_row = Fun.id; get_value = Fun.id; default = ImportCSV.DataType.Null }

let extract_values : string -> CSV.DataType.t =
 fun value ->
  (* Test first if the content is empty *)
  if String.equal String.empty value then CSV.DataType.Null
  else
    (* else, try differents conversion in order to see which one works *)
    match int_of_string_opt value with
    | Some i -> CSV.DataType.Integer i
    | None -> (
        match float_of_string_opt value with
        | Some f -> CSV.DataType.Float f
        | None ->
            (* And finaly convert into date *)
            CSV.DataType.Content value)

(** Initialize the state for the first row, count the column number and create
    the table in the database *)
let first_row : A.t -> _ Db.t -> state -> CSV.DataType.t list -> state =
 fun mapping db acc row ->
  (if acc.transaction then
     match Db.commit db with
     | Ok () -> ()
     | Error e -> print_endline (ImportErrors.repr_error e));

  ignore @@ Db.create_table db mapping;
  let row = Array.of_list row in
  match Db.prepare_insert db mapping with
  | Ok stmt ->
      {
        acc with
        header = Some row;
        transaction = false;
        insert_stmt = Some stmt;
        row_number = acc.row_number + 1;
      }
  | _ -> { acc with header = Some row; transaction = false; insert_stmt = None }

let read_csv_line :
    log_error:ImportErrors.t -> A.t -> 'a Db.t -> state -> string list -> state
    =
 fun ~log_error mapping db acc row ->
  let processed_row =
    List.to_seq row |> Seq.map extract_values |> Array.of_seq
  in
  if acc.State.transaction then
    State.run_row ~log_error ~mapper:default_mapper mapping db processed_row acc
  else
    match Db.begin_transaction db with
    | Error e ->
        print_endline (ImportErrors.repr_error e);
        acc
    | Ok () ->
        let acc = { acc with transaction = true } in
        State.run_row ~log_error ~mapper:default_mapper mapping db processed_row
          acc

let importInDatable :
    log_error:ImportErrors.t ->
    conf:Syntax.t ->
    dirname:string ->
    A.t ->
    'a Db.t ->
    CSV.DataType.t array option Lwt.t =
 fun ~log_error ~conf ~dirname mapping db ->
  let file = Filename.concat dirname (A.table mapping).file in

  let channel = Stdlib.open_in_bin file in

  let csv_channel = Csv.of_channel ~separator:';' ~excel_tricks:true channel in

  (* In the headers, we only keep the string.

     This line could generate an error if the headers are not correctly defined.
  *)
  let header =
    List.map ~f:(fun v -> CSV.DataType.Content v) (Csv.next csv_channel)
  in

  let state =
    State.
      {
        transaction = false;
        header = None;
        insert_stmt = None;
        check_key_stmt = None;
        row_number = 1;
        sheet_number = 1;
        delayed = [];
      }
  in
  let state = first_row mapping db state header in

  let state =
    try
      Csv.fold_left csv_channel ~init:state
        ~f:(read_csv_line ~log_error mapping db)
    with
    | Csv.Failure (line, row, cause) as e ->
        Printf.eprintf "Error %s on line %d — field : %s\n" cause line
          (ImportCSV.Csv.column_to_string row);
        raise e
  in
  ignore @@ State.clear ~log_error db mapping conf;
  ignore @@ Db.commit db;

  (* Finalize the statements created during the import  *)
  let () =
    Option.iter (fun v -> ignore @@ Db.finalize v) state.insert_stmt;
    Option.iter (fun v -> ignore @@ Db.finalize v) state.check_key_stmt
  in

  (* Insert all the headers *)
  let _ =
    Option.iter
      (fun headers ->
        let values = Array.mapi headers ~f:(fun i value -> (i, value)) in

        ignore
        @@ Db.insert_header db (ImportAnalyser.Dependency.table mapping) values)
      state.header
  in
  Lwt.return state.header