Session Stream

Making Use of Session Stream Data

once you have access to session stream files, you need an automated way to consume these files reading session stream files after you download the files, you must parse the data into in memory objects you can use the following example python script as a basic starting point example script to parse session stream files #!/usr/bin/env python \# coding utf 8 from argparse import argumentparser import gzip import json import os def main() parser = argumentparser( description="parse monetate session stream files recursively under the specified path ") parser add argument('input path', help="the root directory under which your files live") args = parser parse args() for root, subfolders, files in os walk(args input path) for filename in files with gzip open(os path join(root, filename), "r") as fileobject for line in fileobject session = json loads(line) \# add code to interact with the session data here if name == ' main ' main() now you can transform and load the data into a database, use it to drive application code, join monetate data with your own data for further analysis, and more see session data description in the monetate knowledge base for the detailed structure of the data object using session stream data with a bi platform if your bi platform accepts json files as a data source, you can load session stream files as is you may need to decompress the json files first consult your platform's documentation to determine if it supports json files and to learn how to load data due to the volume of data involved, monetate highly recommends loading session stream data into a database first you can then configure the database as a data source in your bi platform most bi platforms better support connecting to databases rather than loading json files the session json structure contains nested data for this reason you should split the data into several tables if you plan to use a relational database consult your database's documentation, your database administrator, and session data description in the monetate knowledge base for guidance on designing the table structure creating the database schema the following schema is an example of a mysql database without metadata example mysql database without metadata create table `session sessions` ( `session id` varchar(255) not null, `account id` int(11) not null, `browser` varchar(255) not null, `browser version` varchar(255) not null, `city` varchar(255) not null, `country code` varchar(2) not null, `customer id` varchar(255) default null, `customer link` varchar(255) default null, `device type` varchar(255) not null, `end time` bigint(13) not null, `guid` varchar(255) not null, `has cart` tinyint(1) not null, `has new customer` tinyint(1) not null, `has product view` tinyint(1) not null, `has purchase` tinyint(1) not null, `has stealth` tinyint(1) not null, `is bounce` tinyint(1) not null, `is closed` tinyint(1) not null, `os` varchar(255) not null, `os version` varchar(255) not null, `page views` int(11) not null, `page views ss` int(11) not null, `product view count` int(11) not null, `purchase count` int(11) not null, `purchase value ss` decimal(16,4) not null, `region` varchar(255) not null, `screen height` int(11) not null, `screen width` int(11) not null, `session count` int(11) not null, `session value` decimal(16,4) not null, `session value ss` decimal(16,4) not null, `start time` bigint(13) not null, `time on site` int(11) not null, `time on site ss` int(11) not null, primary key (`session id`) ) default charset=utf8; create table `session custom targets` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `custom target id` bigint(13) not null, `custom target value` varchar(255) not null, primary key (`id`), unique key `id unique` (`id`), key `session id idx` (`session id`), constraint `session id custom targets` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; create table `session offers` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `offer id` varchar(255) not null, `timestamp` bigint(13) not null, primary key (`id`), unique key `id unique` (`id`), key `session id offers idx` (`session id`), constraint `session id offers` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; create table `session page event ids` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `event id` int(11) not null, `timestamp` bigint(13) not null, primary key (`id`), unique key `id unique` (`id`), key `session id page event ids idx` (`session id`), constraint `session id page event ids` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; create table `session purchases` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `purchase id` varchar(255) not null, `purchase value` decimal(16,4) not null, primary key (`id`), unique key `id unique` (`id`), key `session id purchases idx` (`session id`), constraint `session id purchases` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; create table `session cart lines` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `product id` varchar(255) not null, `sku` varchar(255) not null, `quantity` int(11) not null, `time` datetime not null, primary key (`id`), unique key `id unique` (`id`), key `session id cart lines idx` (`session id`), constraint `session id cart lines` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; create table `session purchase lines` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `purchase id` varchar(255) not null, `time` datetime not null, `total` decimal(16,4) not null, primary key (`id`), unique key `id unique` (`id`), key `session id purchase lines idx` (`session id`), key `purchase id purchase lines idx` (`purchase id`), constraint `session id purchase lines` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; create table `session purchase line items` ( `id` int(11) not null auto increment, `purchase id` varchar(255) not null, `product id` varchar(255) not null, `sku` varchar(255) not null, `quantity` int(11) not null, `unit price` decimal(16,4) not null, primary key (`id`), unique key `id unique` (`id`), key `purchase id purchase line items idx` (`purchase id`), constraint `purchase id purchase line items` foreign key (`purchase id`) references `session purchase lines` (`purchase id`) on delete cascade on update cascade ) default charset=utf8; create table `session view lines` ( `id` int(11) not null auto increment, `session id` varchar(255) not null, `product id` varchar(255) not null, `time` datetime not null, primary key (`id`), unique key `id unique` (`id`), key `session id view lines idx` (`session id`), constraint `session id view lines` foreign key (`session id`) references `session sessions` (`session id`) on delete cascade on update cascade ) default charset=utf8; loading the data the following example script demonstrates how to load data into the tables created using the schema in the previous section while it's not a requirement for using session stream, you need mysql connector/python to run this example script example code to load monetate session stream data #!/usr/bin/env python from argparse import argumentparser import gzip import json import os import mysql connector def main() parser = argumentparser( description="insert data from monetate session stream files organized under specified path to a prepared mysql database " ) parser add argument("input path", help="the root directory where your files live") parser add argument( " u", " username", required=true, help="database connection username" ) parser add argument( " p", " password", required=true, help="database connection password" ) parser add argument( " d", " database", required=true, help="name of the schema where session stream data is held", ) parser add argument(" a", " address", default="127 0 0 1", help="database address") parser add argument( " v", " verbose", action="count", help="v for filenames, vv for sessions, vvv for everything", ) args = parser parse args() sessions sql = """ insert into session sessions ( session id, account id, browser, browser version, city, country code, customer id, customer link, device type, end time, guid, has cart, has new customer, has product view, has purchase, has stealth, is bounce, is closed, os, os version, page views, page views ss, product view count, purchase count, purchase value ss, region, screen height, screen width, session count, session value, session value ss, start time, time on site, time on site ss ) values (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) """ custom targets sql = """ insert into session custom targets ( session id, custom target id, custom target value ) values (%s, %s, %s) """ offers sql = """ insert into session offers ( session id, offer id, timestamp ) values (%s, %s, %s) """ page event ids sql = """ insert into session page event ids ( session id, event id, timestamp ) values (%s, %s, %s) """ purchases sql = """ insert into session purchases ( session id, purchase id, purchase value ) values (%s, %s, %s) """ cart lines sql = """ insert into session cart lines ( session id, product id, sku, quantity, time ) values (%s, %s, %s, %s, %s) """ view lines sql = """ insert into session view lines ( session id, product id, time ) values (%s, %s, %s) """ purchase lines sql = """ insert into session purchase lines ( session id, product id, time, total ) values (%s, %s, %s, %s) """ purchase line items sql = """ insert into session purchase line items ( purchase id, product id, sku, quantity, unit price ) values (%s, %s, %s, %s, %s) """ conn = mysql connector connect( user=args username, password=args password, host=args address, database=args database, ) for root, subfolders, files in os walk(args input path) for filename in files with gzip open(os path join(root, filename), "r") as infile if args verbose >= 1 print(">>> " + filename) raw data iter = (json loads(line) for line in infile) cursor = conn cursor() for raw datum in raw data iter sessions data = ( raw datum\["session id"], raw datum\["account id"], raw datum\["browser"], raw datum\["browser version"], raw datum\["city"], raw datum\["country code"], raw datum\["customer id"], raw datum\["customer link"], raw datum\["device type"], raw datum\["end time"], raw datum\["guid"], 1 if raw datum\["has cart"] == "t" else 0, 1 if raw datum\["has new customer"] == "t" else 0, 1 if raw datum\["has product view"] == "t" else 0, 1 if raw datum\["has purchase"] == "t" else 0, 1 if raw datum\["has stealth"] == "t" else 0, 1 if raw datum\["is bounce"] == "t" else 0, 1 if raw datum\["is closed"] == "t" else 0, raw datum\["os"], raw datum\["os version"], raw datum\["page views"], raw datum\["page views ss"], raw datum\["product view count"], raw datum\["purchase count"], raw datum\["purchase value ss"], raw datum\["region"], raw datum\["screen height"], raw datum\["screen width"], raw datum\["session count"], raw datum\["session value"], raw datum\["session value ss"], raw datum\["start time"], raw datum\["time on site"], raw datum\["time on site ss"], ) try cursor execute(sessions sql, sessions data) except mysql connector integrityerror as err \# ignore already inserted sessions print(" error {}" format(err)) continue if args verbose >= 2 print(" > sessions {}" format(sessions data)) for custom target id in raw datum\["custom targets"] custom target = ( raw datum\["session id"], custom target id, raw datum\["custom targets"]\[custom target id], ) cursor execute(custom targets sql, custom target) if args verbose >= 3 print(" > custom targets {}" format(custom target)) for offer id in raw datum\["offers"] offer = ( raw datum\["session id"], offer id, raw datum\["offers"]\[offer id], ) cursor execute(offers sql, offer) if args verbose >= 3 print(" > offers {}" format(offer)) for event id in raw datum\["page event ids"] page event id = ( raw datum\["session id"], event id, raw datum\["page event ids"]\[event id], ) cursor execute(page event ids sql, page event id) if args verbose >= 3 print(" > page event ids {}" format(page event id)) for purchase id in raw datum\["purchases"] purchase = ( raw datum\["session id"], purchase id, raw datum\["purchases"]\[purchase id], ) cursor execute(purchases sql, purchase) if args verbose >= 3 print(" > purchases {}" format(purchase)) for cart line in raw datum\["cart lines"] line = ( raw datum\["session id"], cart line\["product id"], cart line\["sku"], cart line\["quantity"], cart line\["time"], ) cursor execute(cart lines sql, line) if args verbose >= 3 print(" > cart lines {}" format(line)) for view line in raw datum\["view lines"] \# todo mguo mysql datetime format? view = ( raw datum\["session id"], view line\["product id"], view line\["time"], ) cursor execute(view lines sql, view) if args verbose >= 3 print(" > view lines {}" format(view)) for purchase id in raw datum\["purchase lines"] line = ( raw datum\["session id"], purchase id, raw datum\["purchase lines"]\[purchase id]\["time"], raw datum\["purchase lines"]\[purchase id]\["total"], ) cursor execute(purchase lines sql, line) if args verbose >= 3 print(" > purchase lines {}" format(line)) for purchase line item in raw datum\["purchase lines"]\[ purchase id ]\["items"] line item = ( purchase id, purchase line item\["product id"], purchase line item\["sku"], purchase line item\["quantity"], purchase line item\["unit price"], ) cursor execute(purchase line items sql, line item) if args verbose >= 3 print(" > purchase line items {}" format(line item)) if args verbose >= 2 print(" end file \n") conn commit() cursor close() conn close() if name == " main " main() if your site receives a large amount of traffic, your dataset may be too large for this type of script to process in a reasonable amount of time in that case, use specialized extract, transform, load (etl) tools for better throughput connecting with a bi platform once you store the data in a database, you can configure it as a data source in your platform consult the platform's documentation and support resources for how to do this the following example demonstrates how to use session stream data for analysis in tableau desktop first, create a new workbook in tableau select the mysql data source, and then fill in the connection details the mysql modal in the tableau desktop application in the database section on the left sidebar of the data source panel, select the database (synonymous with "schema" in mysql) that contains your session stream data tables the database selector expanded in tableau desktop drag the tables from the left sidebar to the main panel the tableau desktop application, with a list of tables in the left sidebar and the 'drag tables here' message in the main panel set up a left join between the main sessions table and the four auxiliary tables containing nested data the tableau desktop application, with multiple tables joined in the main panel build queries and analytics with your data the tableau desktop application, with a map of the us eastern seaboard dotted with a multitude of points representing data you may need to convert some measures to dimensions and vice versa in the data pane in the left sidebar to more accurately characterize the fields