Perl script to load a CSV file (derived from a SAS file) into MySQL

To run this, put (say) “dsi.csv.gz” and “dsi_schema.csv” in the working directory, then run “csv_to_mysql.pl dsi”, which executes the following script:

#!/usr/local/ActivePerl-5.12/bin/perl
use IO::Uncompress::Gunzip qw(gunzip $GunzipError);
use Text::CSV_XS;

# Time modules. Time::localtime overrides localtime() to return an object;
# it supplies the timestamps in the progress messages printed during loading.
use Time::Local;
use Time::localtime;
use warnings;
# Get the table name from the command line. It also determines the name of
# the gzipped CSV file that is loaded ("./<table>.csv.gz").
$table_name = $ARGV[0];

# Fail early with a usage message rather than carrying an undefined name
# into the file paths and SQL statements built from it.
die "Usage: $0 table_name\n"
  unless defined $table_name && length $table_name;

# The name is interpolated unquoted into SQL statements, so accept only
# characters that are valid in an unquoted MySQL identifier.
die "Invalid table name '$table_name': use only letters, digits, '_' and '\$'\n"
  unless $table_name =~ /\A[0-9A-Za-z_\$]+\z/;

print "$table_name\n";
$gz_file = "./$table_name.csv.gz";

use Net::MySQL;
# Set up the database connection.
# The database, user and password default to the values this script has
# always used, but each can be overridden through the environment
# (MYSQL_DATABASE, MYSQL_USER, MYSQL_PASSWORD) so that credentials do not
# have to be edited into the source.
# No hostname is passed (see the commented-out option below).
my $mysql = Net::MySQL->new(
      # hostname => 'mysql.example.jp',   # Default use UNIX socket
      database => (defined $ENV{MYSQL_DATABASE} ? $ENV{MYSQL_DATABASE} : 'crsp'),
      user     => (defined $ENV{MYSQL_USER}     ? $ENV{MYSQL_USER}     : 'iangow'),
      password => (defined $ENV{MYSQL_PASSWORD} ? $ENV{MYSQL_PASSWORD} : 'test'));

# Load the column definitions from "<table>_schema.csv".
# Each row supplies a column name followed by its type; the pairs are kept
# in %hrec (column name => type).
$schema = "${table_name}_schema.csv";
my $schema_csv = Text::CSV_XS->new({ binary => 1 });

my %hrec;

open my $io, "<", $schema or die "$schema: $!";
while (my $row = $schema_csv->getline($io)) {
  my ($field, $type) = @$row;

  # A column called "do" is stored under the name "do_".
  $field =~ s/^do$/do_/i;
  $hrec{$field} = $type;
}

# Report a parse error if reading stopped before end of file, then close.
$schema_csv->error_diag unless $schema_csv->eof;
close $io or die "$schema: $!";

# Open the gzipped CSV file and read its first row, which holds the column
# names. The handle stays open: the data rows are read from it afterwards.
my $csv = Text::CSV_XS->new({ binary => 1 });
my $fh = IO::Uncompress::Gunzip->new($gz_file)
        or die "IO::Uncompress::Gunzip failed: $GunzipError\n";

# An empty or unreadable file has no header row to build a table from.
my $header = $csv->getline($fh)
        or die "$gz_file: could not read a header row\n";

# Flags recording which key columns are present; they are used after the
# load to decide which index to create.
$has_date = 0;
$has_gvkey = 0;   # NOTE: despite its name, this flags a PERMNO column

# Build one "name type" fragment per column for the CREATE TABLE statement.
my @column_defs;
foreach my $field (@$header) {

  # Flag some key fields for indexing. Probably should use the schema file
  # to indicate what fields should be used for indexing. Not sure if
  # WRDS SAS file contains useful information in this regard.
  if ($field =~ /^PERMNO$/i) { $has_gvkey = 1; }
  if ($field =~ /^DATE$/i) { $has_date = 1; }

  # Same renaming as applied to the schema names, so the lookup matches.
  $field =~ s/^do$/do_/i;

  # Every column needs a type from the schema. Stop here, before the
  # existing table is dropped, instead of sending invalid SQL to the server.
  my $type = $hrec{$field};
  die "$gz_file: column '$field' has no type in the schema file\n"
    unless defined $type && length $type;

  # Dates are stored as integers initially, then converted to
  # dates after the data has been loaded.
  $type = "integer" if $type eq "date";

  push @column_defs, "$field $type";
}
$sql = "CREATE TABLE $table_name (" . join(", ", @column_defs) . ");";
print "$sql\n";

# Drop the table if it exists already, then create the new table.
$mysql->query("DROP TABLE IF EXISTS $table_name;");
$mysql->query($sql);

# Without the table every later INSERT would fail, so stop now.
die "CREATE TABLE failed: " . $mysql->get_error_message . "\n"
  if $mysql->is_error;

# Load the data rows into the MySQL table, one INSERT statement per CSV row.
# The inserts run inside explicit transactions that are committed in batches.
my $max_rows       = 100000000;  # upper limit on the number of rows loaded
my $commit_every   = 10000;      # rows per transaction
my $progress_every = 100000;     # rows between progress messages
my $failed         = 0;          # number of INSERT statements that failed

$i = 0;
$mysql->query("BEGIN");

while ((my $row = $csv->getline($fh)) && $i < $max_rows) {
  $i++;

  # Turn each field into an SQL literal: empty fields become NULL, anything
  # else a quoted string.
  my @values;
  foreach my $val (@$row) {
    if (defined $val && $val ne '') {
      # Escape backslashes as well as quotes: MySQL treats a backslash
      # inside a string literal as an escape character by default, so a
      # value ending in "\" would otherwise swallow the closing quote.
      (my $quoted = $val) =~ s/\\/\\\\/g;
      $quoted =~ s/'/''/g;
      push @values, "'$quoted'";
    } else {
      push @values, "NULL";
    }
  }

  $mysql->query("INSERT INTO $table_name VALUES(" . join(",", @values) . ");");

  # Surface failed inserts instead of silently losing rows; only the first
  # few are reported individually to keep the output readable.
  if ($mysql->is_error) {
    $failed++;
    warn "Row $i was not inserted: " . $mysql->get_error_message . "\n"
      if $failed <= 10;
  }

  # Commit the current batch and start the next one.
  if ($i % $commit_every == 0) {
    $mysql->query("COMMIT");
    $mysql->query("BEGIN");
  }

  # Show progress at regular intervals.
  if ($i % $progress_every == 0) {
    my $now = localtime;
    printf "Processing row: %d at %02d:%02d:%02d\n",
      $i, $now->hour, $now->min, $now->sec;
  }
}

# Commit the final, partially filled batch. (Re-running the last INSERT
# here, as was done before, would store the final row twice.)
$mysql->query("COMMIT");
warn "$failed row(s) could not be inserted\n" if $failed;

# Clean up, etc.
$csv->eof or $csv->error_diag;
close $fh or die "$gz_file: $!";

# Convert every column the schema declares as "date": the values were loaded
# as integers and are turned into DATE values by adding them, as a number of
# days, to 1960-01-01.
foreach my $column (grep { $hrec{$_} eq 'date' } keys %hrec) {
  print "Converting $column from integer to date\n";

  # In order: move the integers aside into "olddate", add a DATE column
  # under the original name, fill it, then drop the integer column.
  my @steps = (
    "ALTER TABLE $table_name CHANGE `$column` `olddate` INTEGER;",
    "ALTER TABLE $table_name ADD COLUMN `$column` DATE;",
    "UPDATE $table_name SET `$column`=ADDDATE('1960-01-01', olddate);",
    "ALTER TABLE $table_name DROP COLUMN olddate;",
  );

  foreach my $statement (@steps) {
    print "$statement\n";
    $mysql->query($statement);
  }

  # Reports the count for the most recent query, i.e. the DROP COLUMN.
  printf "Affected row: %d\n", $mysql->get_affected_rows_length;
}

# Create indices for performance.

# The table "ccmxpf_linktable" gets one index on lpermno and one on gvkey.
# Each index has its own name: MySQL rejects a second index whose name is
# already in use on the table, so sharing "<table>_idx" made the gvkey
# index fail.
if ($table_name eq "ccmxpf_linktable") {
  foreach my $column (qw(lpermno gvkey)) {
    $sql = "ALTER TABLE $table_name ADD INDEX ${table_name}_${column}_idx ($column);";
    print "$sql\n";
    $mysql->query($sql);
  }
}

# Index whichever key columns were flagged while reading the CSV header:
# (PERMNO, DATE), (DATE) or (PERMNO). $has_gvkey is the PERMNO flag.
my @index_columns;
push @index_columns, "PERMNO" if $has_gvkey;
push @index_columns, "DATE"   if $has_date;

if (@index_columns) {
  $sql = "ALTER TABLE $table_name ADD INDEX ${table_name}_idx ("
       . join(", ", @index_columns) . ");";
  print "$sql\n";
  $mysql->query($sql);
}

$mysql->close;
Advertisements
This entry was posted in Uncategorized. Bookmark the permalink.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s