
I have a large dataset that is parsed by a Ruby script. The script creates a CSV and then uploads it to a Redshift database. Most rows in the log upload successfully, but many fail with an "Extra column(s) found" error. I've isolated the situation in which this happens: the data is being parsed into CSV incorrectly, which then surfaces as the extra-column error when the CSV is loaded into the database.

The log data looks like this:

2014-09-22 13:02:16-0400,238 {"Items":[{"PubEndDate":"2002/04/09","ItmId":"1280429264","SourceType":"Government & Official Publications","ReasonCode":"","MyResearchUser":"","ProjectCode":"","PublicationCode":"","PubStartDate":"2002/04/09","ItmFrmt":"KWIC","Subrole":"KWIC","PaymentType":"PrePaid","UsageInfo":"P-1008361-158946-STAFF-null-2195091","Role":"KWIC","RetailPrice":1.19,"EffectivePrice":0,"ParentItemId":"396489"},{"PubEndDate":"2012/04/05","ItmId":"1139461559","SourceType":"Government & Official Publications","ReasonCode":"","MyResearchUser":"","ProjectCode":"","PublicationCode":"","PubStartDate":"2012/04/05","ItmFrmt":"KWIC","Subrole":"KWIC","PaymentType":"PrePaid","UsageInfo":"P-1008365-158946-STAFF-null-2195099","Role":"KWIC","RetailPrice":0.75,"EffectivePrice":0,"ParentItemId":"396490"}]} 

I then create a CSV from it with a Ruby script that looks like this (forgive the large code block; it's a long script):

require 'json'
require 'csv' # needed for CSV.open below

# add methods to unnest ruby hashes for converting nested json into an array with reasonable values
class Hash
  def unnest
    new_hash = {}
    each do |key, val|
      if val.is_a?(Hash)
        new_hash.merge!(val.prefix_keys("#{key}-"))
      else
        new_hash[key] = val
      end
    end
    new_hash
  end

  def prefix_keys(prefix)
    Hash[map { |key, val| [prefix + key, val] }].unnest
  end
end

def parse(usage)
  # strip pipe characters (they would collide with the CSV delimiter) and newlines
  usage = usage.gsub(/\|/, '').gsub(/\n/, '')

  # array of all possible keys, to make sure all fields in the db are filled
  # regardless of how many params are passed into the usage log
  keys = ["UserAgent","IP","AppId","SessId","JSessionId","LangCd","UsageType","BreadCrumb","AuthType","UsageGroupId","SearchType","ResponseTime","EventType","LandedFirstPage","ReferringUrl","PubEndDate","ItmId","PubStartDate","ItmFrmt","OpenUrlRefId","OpenAccess","LinkSource","SourceType","Subrole","PremId","PaymentType","ObjectType","OrigSite","UsageInfo","Role","DeliveryMethod","ParentItemId","SearchAllProductsFlag","MarketSegment","SearchCount","SearchEngine","QryString","SubjectKey","SearchId","SearchHits","UserInfo-IP","UserInfo-AppId","UserInfo-SessId","UserInfo-UsageGroupId","SearchProductInfo","TurnAwayFlag","LinkOutTarget","LinkOutType","TranslationTime","TextSize","TextType","SourceLang","DestinationLang","ReasonCode","RetailPrice","EffectivePrice","MyResearchUser","ProjectCode","DocID","ListingType","MasterID","TerminatedSessionID","PublicationId","PublicationTitle","ItemTitle","AccessAgreementStatus"]

  items_keys = ["ReferringUrl","PubEndDate","ItmId","SourceType","PubStartDate","PublicationCode","ItmFrmt","PaymentType","ObjectType","OrigSite","UsageInfo","OpenUrlRefId","TurnAwayFlag","OpenAccess","ParentItemId","SearchId","SearchProductInfo","EventName","HistoryId","AlertId","ReasonCode","Origin","MyResearchUser","ProjectCode","Subrole","NumberOfCopies","Role","RetailPrice","EffectivePrice","Multiplier","PublicationId","PublicationTitle","ItemTitle"]

  # extract date and time from the log line, then parse the json payload into a ruby hash
  date = usage.scan(/\d{4}-\d\d-\d\d/).first
  time = usage.scan(/\d\d:\d\d:\d\d/).first
  json = usage.scan(/\{.*\}/).first
  parsed = JSON.parse(json).unnest

  # build the array of top-level values, substituting 'NA' for all missing attributes
  result = (0...keys.length).map { |i| parsed[keys[i]] || 'NA' }
  result.unshift date
  result.unshift time
  result.push "save_space" # usage
  items = JSON.parse(json)

  temp_result = result

  CSV.open("testing.csv", "a+", col_sep: "|") do |csv|
    begin
      items["Items"].each do |item|
        item_result = (0...items_keys.length).map { |i| item[items_keys[i]] || "NA" }
        temp_result = (temp_result << item_result).flatten!
        csv << temp_result
        temp_result = result.flatten
        item_result = []
      end
    rescue
      item_result = (0...items_keys.length).map { |i| "NA" }
      temp_result = (temp_result << item_result).flatten!
      csv << temp_result
      temp_result = result.flatten
      item_result = []
    end
  end
  nil
end

File.readlines("file.log").each do |line|
  parse(line)
end

# hand the generated CSV off to a separate upload script
`ruby upload_csv_to_redshift.rb usage_logs_testing`
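
As an aside, the Hash monkey-patch at the top flattens nested JSON objects by joining keys with a dash (via prefix_keys). A quick illustration of what unnest produces, using a made-up payload and assuming the patched Hash above is loaded:

# made-up nested payload, purely to illustrate the helper
nested = { "UserInfo" => { "IP" => "1.2.3.4", "AppId" => "app7" }, "Role" => "KWIC" }

p nested.unnest
# => {"UserInfo-IP"=>"1.2.3.4", "UserInfo-AppId"=>"app7", "Role"=>"KWIC"}

This is how a nested UserInfo object ends up matching flat entries like "UserInfo-IP" in the keys array.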

This script creates a CSV that looks like this:

13:02:16|2014-09-22|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|save_space|NA|2002/04/09|1280429264|Government & Official Publications|2002/04/09|""|KWIC|PrePaid|NA|NA|P-1008361-158946-STAFF-null-2195091|NA|NA|NA|396489|NA|NA|NA|NA|NA|""|NA|""|""|KWIC|NA|KWIC|1.19|0|NA|NA|NA|NA 
13:02:16|2014-09-22|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|NA|save_space|NA|2002/04/09|1280429264|Government & Official Publications|2002/04/09|""|KWIC|PrePaid|NA|NA|P-1008361-158946-STAFF-null-2195091|NA|NA|NA|396489|NA|NA|NA|NA|NA|""|NA|""|""|KWIC|NA|KWIC|1.19|0|NA|NA|NA|NA|NA|2012/04/05|1139461559|Government & Official Publications|2012/04/05|""|KWIC|PrePaid|NA|NA|P-1008365-158946-STAFF-null-2195099|NA|NA|NA|396490|NA|NA|NA|NA|NA|""|NA|""|""|KWIC|NA|KWIC|0.75|0|NA|NA|NA|NA 
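
The second row above is the problem: the first item's fields are carried over and the second item's fields are appended after them, so the row has more columns than the table expects, which is exactly what Redshift rejects. A minimal sanity check, assuming the pipe-delimited testing.csv produced above and treating the first row's width as the expected one, would flag such rows before the upload:

require 'csv'

rows = CSV.read("testing.csv", col_sep: "|")
expected = rows.first.length # assumes the first row is well formed

rows.each_with_index do |row, i|
  warn "row #{i + 1}: #{row.length} columns (expected #{expected})" if row.length != expected
end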

This is uploaded into a Redshift database structured like this:

CREATE TABLE usage_logs_test 
(
log_id bigint IDENTITY (0,1), 
log_time varchar(200), 
log_date varchar(200), 
UserAgent varchar(max), 
IP varchar(max), 
AppId varchar(max), 
SessId varchar(max), 
JSessionId varchar(max), 
LangCd varchar(max), 
UsageType varchar(max), 
BreadCrumb varchar(max), 
AuthType varchar(max), 
UsageGroupId varchar(max), 
SearchType varchar(max), 
ResponseTime varchar(max), 
EventType varchar(max), 
LandedFirstPage varchar(max), 
ReferringUrl varchar(max), 
PubEndDate varchar(max), 
ItmId varchar(max), 
PubStartDate varchar(max), 
ItmFrmt varchar(max), 
OpenUrlRefId varchar(max), 
OpenAccess varchar(max), 
LinkSource varchar(max), 
SourceType varchar(max), 
Subrole varchar(max), 
PremId varchar(max), 
PaymentType varchar(max), 
ObjectType varchar(max), 
OrigSite varchar(max), 
UsageInfo varchar(max), 
Role varchar(max), 
DeliveryMethod varchar(max), 
ParentItemId varchar(max), 
SearchAllProductsFlag varchar(max), 
MarketSegment varchar(max), 
SearchCount varchar(max), 
SearchEngine varchar(max), 
QryString varchar(max), 
SubjectKey varchar(max), 
SearchId varchar(max), 
SearchHits varchar(max), 
UserInfo_IP varchar(max), 
UserInfo_AppId varchar(max), 
UserInfo_SessId varchar(max), 
UserInfo_UsageGroupId varchar(max), 
SearchProductInfo varchar(max), 
TurnAwayFlag varchar(max), 
LinkOutTarget varchar(max), 
LinkOutType varchar(max), 
TranslationTime varchar(max), 
TextSize varchar(max), 
TextType varchar(max), 
SourceLang varchar(max), 
DestinationLang varchar(max), 
ReasonCode varchar(max), 
RetailPrice varchar(max), 
EffectivePrice varchar(max), 
MyResearchUser varchar(max), 
ProjectCode varchar(max), 
DocID varchar(max), 
ListingType varchar(max), 
MasterID varchar(max), 
TerminatedSessionID varchar(max), 
PublicationId varchar(max), 
PublicationTitle varchar(max), 
ItemTitle varchar(max), 
AccessAgreementStatus varchar(max), 
full_log varchar(max), 
ReferringUrl varchar(max), 
PubEndDate varchar(max), 
ItmId varchar(max), 
SourceType varchar(max), 
PubStartDate varchar(max), 
PublicationCode varchar(max), 
ItmFrmt varchar(max), 
PaymentType varchar(max), 
ObjectType varchar(max), 
OrigSite varchar(max), 
UsageInfo varchar(max), 
OpenUrlRefId varchar(max), 
TurnAwayFlag varchar(max), 
OpenAccess varchar(max), 
ParentItemId varchar(max), 
SearchId varchar(max), 
SearchProductInfo varchar(max), 
EventName varchar(max), 
HistoryId varchar(max), 
AlertId varchar(max), 
ReasonCode varchar(max), 
Origin varchar(max), 
MyResearchUser varchar(max), 
ProjectCode varchar(max), 
Subrole varchar(max), 
NumberOfCopies varchar(max), 
Role varchar(max), 
RetailPrice varchar(max), 
EffectivePrice varchar(max), 
Multiplier varchar(max), 
PublicationId varchar(max), 
PublicationTitle varchar(max), 
ItemTitle varchar(max), 
OrigId varchar(200) 
); 
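
The upload script invoked above (upload_csv_to_redshift.rb) isn't shown. For context, a minimal hypothetical version using the pg gem and a standard Redshift COPY might look like the following; the host, database, bucket, and credential variables are placeholders, not details from the original script:

require 'pg'

table = ARGV.fetch(0) # e.g. "usage_logs_testing", as passed in the backtick call above

# placeholder connection details -- substitute real values
conn = PG.connect(host: "example.redshift.amazonaws.com", port: 5439,
                  dbname: "mydb", user: "myuser", password: ENV["REDSHIFT_PASSWORD"])

# assumes the CSV has already been pushed to S3; DELIMITER matches the pipe used above
conn.exec(<<-SQL)
  COPY #{table}
  FROM 's3://my-bucket/testing.csv'
  CREDENTIALS 'aws_access_key_id=#{ENV["AWS_KEY"]};aws_secret_access_key=#{ENV["AWS_SECRET"]}'
  DELIMITER '|';
SQL
conn.close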

The problem seems to be that a large chunk of the data is duplicated, as if the temp_result array is not being reset back to the value of result at the end of the items["Items"].each block.

I realize this is a very large question, but I've put considerable effort into simplifying and generalizing it as much as possible while keeping the code example working.

Answer


You need to remember that arrays are passed by reference in Ruby. Try this:

a = ["YO"] 
b = a 
b << "HEY" 
puts a.inspect 
# => ["YO", "HEY"] 

Now consider this line from your script:

temp_result = result 

and later:

(temp_result << item_result).flatten! 

temp_result is never cleared back to the original value of result, because both names refer to the same array in memory. The original contents of result are gone, overwritten through the in-place append method <<.

The quickest fix is:

temp_result = result.clone 

Likewise, unless you know exactly why you're doing it, you don't want to use bang methods: they mutate in place (and flatten! returns nil when there is nothing to flatten), so where you assign the result of flatten!, you want flatten.
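
Putting the two points together, a small demonstration:

result = ["2014-09-22", "13:02:16"]

# aliasing: both names point at the same array, so << mutates result too
temp = result
temp << "item-A"
p result # => ["2014-09-22", "13:02:16", "item-A"]

# cloning: temp gets its own (shallow) copy, so result is untouched
result = ["2014-09-22", "13:02:16"]
temp = result.clone
temp << "item-A"
p result # => ["2014-09-22", "13:02:16"]

# the bang gotcha: flatten! returns nil when there is nothing to flatten
p [1, 2, 3].flatten! # => nil
p [1, 2, 3].flatten  # => [1, 2, 3]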


A better way to show that the variable holding an array and another variable assigned from it both point to the same array in memory is to use object_id: a = []; b = a. At that point a.object_id # => 70098065043200 and b.object_id # => 70098065043200 (or some other id, but they'll both be the same). – 2014-10-03 17:22:25
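
In code, the commenter's check looks like this, extended to show that clone yields a distinct object (the ids differ between runs, but match within one):

a = []
b = a
c = a.clone

puts a.object_id == b.object_id # => true  (same array)
puts a.object_id == c.object_id # => false (clone allocated a new array)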


I don't think that's better, because it relies on implicit knowledge. It's entirely possible to go through a Ruby career without knowing what #object_id represents. If your goal is to maximize the number of readers who will understand it, a demonstration beats a description. – SLD 2014-10-03 17:35:08


This community is a big part of why I love programming. Thank you all so much. – johncorser 2014-10-03 20:02:48
