
To repeat, I have a PySpark script like the one below. In this script I loop over the table names read from an input file and run the code for each one. I want to capture the function's logs separately for each iteration, in Python.

Now I want to collect the logs separately for each iteration of the mysql_spark function.

For example:

input file

table1 
table2 
table3 

Right now, when I run the PySpark script, the logs for all three tables end up in a single file.

What I want is three separate log files, one for each table.

PySpark script:

#!/usr/bin/env python 
import sys 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

#Condition to specify exact number of arguments in the spark-submit command line 
if len(sys.argv) != 5: 
    print "Invalid number of args......" 
    print "Usage: spark-submit import.py Arguments" 
    exit() 
args_file = sys.argv[1] 
hivedb = sys.argv[2] 
mysqldb = sys.argv[3] 
mysqltable = sys.argv[4] 

def mysql_spark(table, hivedb, mysqldb, mysqltable): 

    print "*********************************************************table = {} ***************************".format(table) 

    df = sqlContext.table("{}.{}".format(mysqldb, mysqltable)) 

    df.registerTempTable("mytempTable") 

    sqlContext.sql("create table {}.{} as select * from mytempTable".format(hivedb,table)) 

input = sc.textFile('/user/XXXXXXXX/mysql_spark/%s' %args_file).collect() 

for table in input: 
    mysql_spark(table, hivedb, mysqldb, mysqltable) 

sc.stop() 

Shell script that calls and runs the PySpark script:

#!/bin/bash 

source /home/$USER/mysql_spark/source.sh 
[ $# -ne 1 ] && { echo "Usage : $0 table ";exit 1; } 

args_file=$1 

TIMESTAMP=`date "+%Y-%m-%d"` 
touch /home/$USER/logs/${TIMESTAMP}.success_log 
touch /home/$USER/logs/${TIMESTAMP}.fail_log 
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log 
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log 

#Function to get the status of the job creation 
function log_status 
{ 
     status=$1 
     message=$2 
     if [ "$status" -ne 0 ]; then 
       echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}" 
       exit 1 
       else 
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}" 
       fi 
} 

spark-submit --name "${args_file}" --master "yarn-client" /home/$USER/mysql_spark/mysql_spark.py ${args_file} ${hivedb} ${mysqldb} ${mysqltable} 

g_STATUS=$? 
log_status $g_STATUS "Spark job ${args_file} Execution" 

Sample log file:

Connection to spark 
***************************table = table 1 ******************************** 
created dataframe 
created table 
delete temp directory 
***************************table = table 2 ******************************** 
created dataframe 
created table 
delete temp directory 
***************************table = table 3 ******************************** 
created dataframe 
created table 
delete temp directory 

Expected output

table1.logfile

Connection to spark 
***************************table = table 1 ******************************** 
created dataframe 
created table 
delete temp directory 

table2.logfile

***************************table = table 2 ******************************** 
created dataframe 
created table 
delete temp directory 

table3.logfile

***************************table = table 3 ******************************** 
created dataframe 
created table 
delete temp directory 
shutdown sparkContext 

How can I achieve this?

Is it even possible to do this?

Answer


You can create a new file and write the data to it on each iteration.

Here is a simple example:

lis =['table1','table2'] 

for table in lis: 
    logfile = open(str(table)+".logfile",'w') 
    logfile.write(str(table)) 
    logfile.close() 

If you apply the same idea in your code and pass the file object into the mysql_spark function on each iteration, it should work.

for table in input: 
    logfile = open(str(table)+".logfile",'w') 
    mysql_spark(table, hivedb, mysqldb, mysqltable, logfile) 
    logfile.close() 

Using your code I was able to create the log files, but the files are empty, with no log messages.


Yes, but you have the file object, so you can check your conditions and write whatever you need into that file object.


For example:

df = sqlContext.table("{}.{}".format(mysqldb, mysqltable)) 
if df: 
    logfile.write("SQL context created.") 
else: 
    logfile.write("SQL context not created.") 

Something like that.
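As an alternative to writing to raw file objects, the same per-table split can be done with Python's standard logging module, which also timestamps each line. A minimal sketch, assuming the <table>.logfile naming used above:

import logging 

def get_table_logger(table): 
    # one logger per table, each writing to its own <table>.logfile 
    logger = logging.getLogger(table) 
    logger.setLevel(logging.INFO) 
    if not logger.handlers:  # avoid attaching duplicate handlers on repeat calls 
        handler = logging.FileHandler("{}.logfile".format(table)) 
        handler.setFormatter(logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")) 
        logger.addHandler(handler) 
    return logger 

for table in input: 
    log = get_table_logger(table) 
    log.info("table = %s", table) 
    # the logger could also be passed into mysql_spark, the same way the answer passes the file object 
    mysql_spark(table, hivedb, mysqldb, mysqltable) 
    log.info("finished %s", table) 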