aboutsummaryrefslogtreecommitdiff
path: root/lib/file_handler/xlsx2sql.ml
blob: f2d8f12badf5a47b280fa5e25a662255b04eeff9 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
open StdLabels
module A = ImportAnalyser.Dependency
module C = ImportContainers
module CSV = ImportCSV
module Syntax = ImportConf.Syntax
module Db = ImportSQL.Db

(** Open flags for the workbook file: read-only and non-blocking. *)
let flags = [ Unix.O_RDONLY; Unix.O_NONBLOCK ]

(** [exact_int_of_float f] is [Some i] when [f] is an integral value that can
    be converted to an [int] without overflow, and [None] otherwise
    (fractional values, NaN, infinities, or magnitudes outside the [int]
    range). *)
let exact_int_of_float f =
  (* [Float.to_int] is unspecified outside [[min_int, max_int]]. On 64-bit
     platforms [max_int] is not exactly representable as a float and
     [Float.of_int max_int] rounds up to 2^62, hence the strict upper bound. *)
  if
    Float.is_integer f
    && f >= Float.of_int Stdlib.min_int
    && f < Float.of_int Stdlib.max_int
  then Some (Float.to_int f)
  else None

(** Cell extractors given to SZXX: every kind of xlsx cell is converted into a
    [CSV.DataType.t].

    - strings, dates and formula results are kept as raw [Content];
    - error cells become [Error] prefixed with ["#ERROR# "];
    - booleans become [Content "true"] / [Content "false"] (["1"] is true);
    - numbers become [Integer] when they are integral and fit in an [int],
      [Float] otherwise. [Float.of_string] raises [Failure] if a numeric cell
      does not hold a valid float. *)
let extractors =
  SZXX.Xlsx.
    {
      string = (fun _location s -> CSV.DataType.Content s);
      error =
        (fun _location s -> CSV.DataType.Error (Printf.sprintf "#ERROR# %s" s));
      boolean =
        (fun _location s ->
          let value = String.(equal s "1") in
          CSV.DataType.Content (string_of_bool value));
      number =
        (fun _location s ->
          let f = Float.of_string s in
          (* Integral numbers too large for an [int] (e.g. long numeric
             identifiers) are kept as floats instead of being corrupted. *)
          match exact_int_of_float f with
          | Some i -> CSV.DataType.Integer i
          | None -> CSV.DataType.Float f);
      date = (fun _location s -> CSV.DataType.Content s);
      null = CSV.DataType.Null;
      formula =
        (fun _location ~formula s ->
          (* Only the computed value of the formula is imported. *)
          ignore formula;
          CSV.DataType.Content s);
    }

(** Build the SZXX feed reading the channel [ic] chunk by chunk.

    A single bigstring of [Lwt_io.buffer_size ic] bytes is allocated once and
    reused for every read. The feed yields [None] when
    [Lwt_io.read_into_bigstring] reads 0 bytes. *)
let feed_bigstring ic =
  let size = Lwt_io.buffer_size ic in
  let chunk = Lwt_bytes.create size in
  let next () =
    Lwt.map
      (fun read ->
        match read with
        | 0 -> None
        | read -> Some { SZXX.Zip.buf = chunk; pos = 0; len = read })
      (Lwt_io.read_into_bigstring ic chunk 0 size)
  in
  SZXX.Zip.Bigstring next

(** [is_delayed row] is [true] when at least one cell of [row] is still
    [Delayed]: its value can only be resolved later with the [sst], so the row
    cannot be processed right now. It is [false] when every cell is already
    available. *)
let is_delayed row =
  let pending cell =
    match cell with SZXX.Xlsx.Delayed _ -> true | _ -> false
  in
  Array.exists ~f:pending row.SZXX.Xlsx.data

(** Mapper for rows whose cells are all available: [Content] strings are
    XML-unescaped and every other value is returned unchanged. Its [default]
    is [Null]. *)
let default_mapper :
    (ImportCSV.DataType.t, ImportCSV.DataType.t SZXX.Xlsx.row) State.mapper =
  let unescape_content = function
    | ImportCSV.DataType.Content raw ->
        ImportCSV.DataType.Content (SZXX.Xml.unescape raw)
    | other -> other
  in
  {
    State.get_row = (fun r -> r.SZXX.Xlsx.data);
    get_value = unescape_content;
    default = ImportCSV.DataType.Null;
  }

(** Import state threaded through the sheet: it holds SZXX rows whose cells
    may still be [Delayed]. *)
type state = ImportCSV.DataType.t SZXX.Xlsx.status SZXX.Xlsx.row State.t

(** Mapper used while streaming, on cells wrapped in an SZXX status:

    - an available [Content] string is XML-unescaped;
    - any other available value is returned unchanged;
    - a cell that is not available is read as [Null].

    Its [default] is an available [Null]. *)
let delayed_mapper =
  let value_of_cell = function
    | SZXX.Xlsx.Available (CSV.DataType.Content raw) ->
        CSV.DataType.Content (SZXX.Xml.unescape raw)
    | SZXX.Xlsx.Available other -> other
    | _ -> CSV.DataType.Null
  in
  {
    State.get_row = (fun r -> r.SZXX.Xlsx.data);
    get_value = value_of_cell;
    default = SZXX.Xlsx.Available CSV.DataType.Null;
  }

(** Initialize the state on the first row of the sheet.

    Any transaction still open in [acc] is committed first (a failed commit is
    printed). Then the table described by [mapping] is created and its insert
    statement prepared.

    The returned state keeps [row] as the header and has no open transaction.
    If the insert statement cannot be prepared, the error is printed and
    [insert_stmt] is set to [None]. *)
let first_row : A.t -> _ Db.t -> state -> 'a SZXX.Xlsx.row -> state =
 fun mapping db acc row ->
  (if acc.transaction then
     match Db.commit db with
     | Ok () -> ()
     | Error e -> print_endline (ImportErrors.repr_error e));

  (* NOTE(review): the result of [create_table] is discarded; a failure here
     presumably surfaces as a [prepare_insert] error below — confirm. *)
  ignore @@ Db.create_table db mapping;
  let insert_stmt =
    match Db.prepare_insert db mapping with
    | Ok stmt -> Some stmt
    | Error e ->
        (* Report the failure instead of silently dropping it. *)
        print_endline (ImportErrors.repr_error e);
        None
  in
  { acc with header = Some row; transaction = false; insert_stmt }

(** Import the sheet described by [mapping] into the database [db].

    The workbook [(A.table mapping).file] is read from [dirname], and the rows
    of sheet [(A.table mapping).tab] are streamed:

    - row 1 is the header and triggers the creation of the table;
    - rows whose cells are all available are inserted while streaming, inside
      a transaction;
    - rows still holding delayed cells are kept aside. They are inserted, in
      sheet order, once the [sst] has been read.

    Insertion errors of the delayed rows are written to [log_error]. Failed
    transaction commands are printed. [conf] is forwarded to [State.clear].

    Returns the resolved header row, or [None] when no row numbered 1 was
    read. *)
let importInDatable :
    log_error:Csv.out_channel Lazy.t ->
    conf:Syntax.t ->
    dirname:string ->
    A.t ->
    'a Db.t ->
    CSV.DataType.t array option Lwt.t =
 fun ~log_error ~conf ~dirname mapping db ->
  let table = A.table mapping in
  let file = Filename.concat dirname table.file in

  (* Print a failed transaction command instead of silently ignoring it. *)
  let report = function
    | Ok () -> ()
    | Error e -> print_endline (ImportErrors.repr_error e)
  in

  (* One step of the stream fold: create the table on the first line, keep
     rows with delayed cells aside, and insert the others inside a
     transaction (opened on demand). *)
  let process_row row (acc : state) : state =
    if Int.equal 1 row.SZXX.Xlsx.row_number then first_row mapping db acc row
    else if is_delayed row then { acc with delayed = row :: acc.delayed }
    else
      let row_number = row.SZXX.Xlsx.row_number in
      if acc.transaction then
        State.run_row ~log_error ~mapper:delayed_mapper mapping db row
          { acc with row_number }
      else (
        match Db.begin_transaction db with
        | Error e ->
            print_endline (ImportErrors.repr_error e);
            acc
        | Ok () ->
            State.run_row ~log_error ~mapper:delayed_mapper mapping db row
              { acc with transaction = true; row_number })
  in

  let initial_state : state =
    {
      transaction = false;
      header = None;
      delayed = [];
      insert_stmt = None;
      check_key_stmt = None;
      row_number = 1;
      sheet_number = table.tab;
    }
  in

  Lwt_io.with_file ~flags ~mode:Input file (fun ic ->
      let open Lwt.Syntax in
      let stream, sst_p, success =
        SZXX.Xlsx.stream_rows ~only_sheet:table.tab ~feed:(feed_bigstring ic)
          extractors
      in
      let* processed = Lwt_stream.fold process_row stream initial_state in
      (* Wait to reach the sst: the delayed cells can only be resolved once it
         has been read. *)
      let* sst = sst_p in

      if processed.transaction then report (Db.commit db);

      (* Insert the missing elements. [delayed] was built by prepending, so
         reverse it to insert rows (and log their errors) in sheet order. *)
      report (Db.begin_transaction db);
      List.iter (List.rev processed.delayed) ~f:(fun row ->
          let fully_available_row =
            SZXX.Xlsx.unwrap_status extractors sst row
          in
          let row_number = row.SZXX.Xlsx.row_number in
          (* NOTE(review): the state returned by [insert_row] is discarded;
             confirm it cannot hold statements that need finalizing. *)
          match
            State.insert_row ~mapper:default_mapper mapping db
              fully_available_row
              { processed with row_number }
          with
          | Ok _ -> ()
          | Error e ->
              ImportErrors.output_error log_error e;
              ());

      ignore @@ State.clear ~log_error db mapping conf;
      report (Db.commit db);

      (* Finalize the statements created during the import *)
      Option.iter (fun v -> ignore @@ Db.finalize v) processed.insert_stmt;
      Option.iter (fun v -> ignore @@ Db.finalize v) processed.check_key_stmt;

      (* Resolve the header once: it is both stored in the database and
         returned to the caller. *)
      let header =
        Option.map
          (fun row ->
            (SZXX.Xlsx.unwrap_status extractors sst row).SZXX.Xlsx.data)
          processed.header
      in
      Option.iter
        (fun values ->
          ignore
          @@ Db.insert_header db table
               (Array.mapi values ~f:(fun i value -> (i, value))))
        header;

      (* Finalize the process *)
      let* () = success in
      Lwt.return header)