1
2from argparse import ArgumentParser
3import gzip
4import json
5import os
6
7import mysql.connector
8
9
def _insert_sql(table, columns):
    """Build a parameterized ``INSERT`` statement.

    Args:
        table: Target table name. Only the hard-coded constants in this
            module are passed here; never interpolate external input into it.
        columns: Column names, in the order the parameter tuple supplies
            values.

    Returns:
        ``INSERT INTO table (c1, ...) VALUES (%s, ...)`` with exactly one
        ``%s`` placeholder per column, so columns and placeholders cannot
        drift apart.
    """
    return "INSERT INTO {} ({}) VALUES ({})".format(
        table, ", ".join(columns), ", ".join(["%s"] * len(columns))
    )


# Columns of session_sessions, in the order _session_row() emits values.
_SESSION_COLUMNS = (
    "session_id",
    "account_id",
    "browser",
    "browser_version",
    "city",
    "country_code",
    "customer_id",
    "customer_link",
    "device_type",
    "end_time",
    "guid",
    "has_cart",
    "has_new_customer",
    "has_product_view",
    "has_purchase",
    "has_stealth",
    "is_bounce",
    "is_closed",
    "os",
    "os_version",
    "page_views",
    "page_views_ss",
    "product_view_count",
    "purchase_count",
    "purchase_value_ss",
    "region",
    "screen_height",
    "screen_width",
    "session_count",
    "session_value",
    "session_value_ss",
    "start_time",
    "time_on_site",
    "time_on_site_ss",
)

# Flag fields: the input value "t" is inserted as 1, anything else as 0.
_SESSION_FLAG_COLUMNS = frozenset(
    (
        "has_cart",
        "has_new_customer",
        "has_product_view",
        "has_purchase",
        "has_stealth",
        "is_bounce",
        "is_closed",
    )
)

_SESSIONS_SQL = _insert_sql("session_sessions", _SESSION_COLUMNS)
_CUSTOM_TARGETS_SQL = _insert_sql(
    "session_custom_targets",
    ("session_id", "custom_target_id", "custom_target_value"),
)
_OFFERS_SQL = _insert_sql(
    "session_offers", ("session_id", "offer_id", "timestamp")
)
_PAGE_EVENT_IDS_SQL = _insert_sql(
    "session_page_event_ids", ("session_id", "event_id", "timestamp")
)
_PURCHASES_SQL = _insert_sql(
    "session_purchases", ("session_id", "purchase_id", "purchase_value")
)
_CART_LINES_SQL = _insert_sql(
    "session_cart_lines",
    ("session_id", "product_id", "sku", "quantity", "time"),
)
_VIEW_LINES_SQL = _insert_sql(
    "session_view_lines", ("session_id", "product_id", "time")
)
# NOTE(review): rows for this table receive the purchase_lines dict key
# (named purchase_id in the loader) in the product_id column -- confirm this
# matches the schema's intent.
_PURCHASE_LINES_SQL = _insert_sql(
    "session_purchase_lines", ("session_id", "product_id", "time", "total")
)
_PURCHASE_LINE_ITEMS_SQL = _insert_sql(
    "session_purchase_line_items",
    ("purchase_id", "product_id", "sku", "quantity", "unit_price"),
)


def _build_parser():
    """Return the command-line parser for this loader."""
    parser = ArgumentParser(
        description="Insert data from Monetate Session Stream files organized under specified path to a prepared MySQL database."
    )
    parser.add_argument("input_path", help="The root directory where your files live")
    parser.add_argument(
        "-u", "--username", required=True, help="Database connection username"
    )
    # NOTE: a password given on the command line is visible in process
    # listings and shell history.
    parser.add_argument(
        "-p", "--password", required=True, help="Database connection password"
    )
    parser.add_argument(
        "-d",
        "--database",
        required=True,
        help="Name of the schema where session stream data is held",
    )
    parser.add_argument("-a", "--address", default="127.0.0.1", help="Database address")
    # default=0: without it argparse's "count" action leaves the value as None
    # when -v is absent, and every `verbose >= n` check raises TypeError.
    parser.add_argument(
        "-v",
        "--verbose",
        action="count",
        default=0,
        help="v for filenames, vv for sessions, vvv for everything",
    )
    return parser


def _session_row(raw):
    """Return the session_sessions parameter tuple for one decoded record.

    Values are taken from *raw* in ``_SESSION_COLUMNS`` order; flag columns
    are mapped to 1 when the value equals "t" and to 0 otherwise.

    Raises:
        KeyError: if *raw* lacks any of the expected session fields.
    """
    return tuple(
        (1 if raw[column] == "t" else 0)
        if column in _SESSION_FLAG_COLUMNS
        else raw[column]
        for column in _SESSION_COLUMNS
    )


def _insert_rows(cursor, sql, rows, label, verbose):
    """Execute *sql* once per row, echoing each row at verbosity >= 3."""
    for row in rows:
        cursor.execute(sql, row)
        if verbose >= 3:
            print("-> {}: {}".format(label, row))


def _insert_session(cursor, raw, verbose):
    """Insert one decoded record: its session row, then all child rows.

    If the session insert raises ``IntegrityError`` the error is printed and
    none of the record's child rows are inserted.
    """
    sessions_data = _session_row(raw)
    try:
        cursor.execute(_SESSIONS_SQL, sessions_data)
    except mysql.connector.IntegrityError as err:
        # e.g. a key violation if this session was already loaded: report it
        # and skip the record's child rows.
        print("*** Error: {}".format(err))
        return
    if verbose >= 2:
        print("---> sessions: {}".format(sessions_data))

    session_id = raw["session_id"]
    _insert_rows(
        cursor,
        _CUSTOM_TARGETS_SQL,
        ((session_id, key, value) for key, value in raw["custom_targets"].items()),
        "custom_targets",
        verbose,
    )
    _insert_rows(
        cursor,
        _OFFERS_SQL,
        ((session_id, key, value) for key, value in raw["offers"].items()),
        "offers",
        verbose,
    )
    _insert_rows(
        cursor,
        _PAGE_EVENT_IDS_SQL,
        ((session_id, key, value) for key, value in raw["page_event_ids"].items()),
        "page_event_ids",
        verbose,
    )
    _insert_rows(
        cursor,
        _PURCHASES_SQL,
        ((session_id, key, value) for key, value in raw["purchases"].items()),
        "purchases",
        verbose,
    )
    _insert_rows(
        cursor,
        _CART_LINES_SQL,
        (
            (session_id, cart["product_id"], cart["sku"], cart["quantity"], cart["time"])
            for cart in raw["cart_lines"]
        ),
        "cart_lines",
        verbose,
    )
    _insert_rows(
        cursor,
        _VIEW_LINES_SQL,
        ((session_id, view["product_id"], view["time"]) for view in raw["view_lines"]),
        "view_lines",
        verbose,
    )
    # Each purchase line is inserted immediately before its own line items.
    for purchase_id, purchase_line in raw["purchase_lines"].items():
        _insert_rows(
            cursor,
            _PURCHASE_LINES_SQL,
            [(session_id, purchase_id, purchase_line["time"], purchase_line["total"])],
            "purchase_lines",
            verbose,
        )
        _insert_rows(
            cursor,
            _PURCHASE_LINE_ITEMS_SQL,
            (
                (
                    purchase_id,
                    item["product_id"],
                    item["sku"],
                    item["quantity"],
                    item["unit_price"],
                )
                for item in purchase_line["items"]
            ),
            "purchase_line_items",
            verbose,
        )


def _load_file(conn, path, verbose):
    """Load one gzip-compressed, one-JSON-object-per-line file.

    All inserts for the file are committed together once every line has
    been processed. The cursor is closed even if an exception escapes.
    """
    with gzip.open(path, "rb") as infile:
        if verbose >= 1:
            print(">>> " + os.path.basename(path))
        cursor = conn.cursor()
        try:
            for line in infile:
                if not line.strip():
                    continue  # tolerate blank / trailing lines
                # json.loads accepts the raw bytes yielded by the gzip stream.
                _insert_session(cursor, json.loads(line), verbose)
            if verbose >= 2:
                print("------END FILE------\n")
            conn.commit()
        finally:
            cursor.close()


def main():
    """Load every Session Stream file found under ``input_path`` into MySQL.

    Parses the command line, connects to the database, and walks
    ``input_path`` recursively, loading each file with :func:`_load_file`.
    The connection is closed even if loading fails partway.
    """
    args = _build_parser().parse_args()

    conn = mysql.connector.connect(
        user=args.username,
        password=args.password,
        host=args.address,
        database=args.database,
    )
    try:
        for root, _subfolders, files in os.walk(args.input_path):
            for filename in files:
                _load_file(conn, os.path.join(root, filename), args.verbose)
    finally:
        conn.close()
301
302
# Script entry point: run the loader only when executed directly,
# never as a side effect of importing this module.
if __name__ == "__main__":
    main()
305