
Getting the logs separately when calling a Python script from a shell script

I have a PySpark script, shown below, to which I pass table names from a file. The script executes successfully and I have no problems with the script itself.

Now I would like to collect this script's logs separately for each table. Is that possible?

PySpark script:

#!/usr/bin/env python 
import sys 
from pyspark import SparkContext, SparkConf 
from pyspark.sql import HiveContext 
conf = SparkConf() 
sc = SparkContext(conf=conf) 
sqlContext = HiveContext(sc) 

#Condition to specify exact number of arguments in the spark-submit command line 
if len(sys.argv) != 8: 
     print "Invalid number of args......" 
     print "Usage: spark-submit import.py Arguments" 
     exit() 
args_file = sys.argv[1] 
hivedb = sys.argv[2] 
domain = sys.argv[3] 
port=sys.argv[4] 
mysqldb=sys.argv[5] 
username=sys.argv[6] 
password=sys.argv[7] 

def mysql_spark(table, hivedb, domain, port, mysqldb, username, password): 

    print "*********************************************************table = {} ***************************".format(table) 

    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", table) \
        .option("user", username) \
        .option("password", password) \
        .load()

    df.registerTempTable("mytempTable") 

    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb,table)) 

input = sc.textFile('/user/XXXXXXXX/mysql_spark/%s' %args_file).collect() 

for table in input: 
    mysql_spark(table, hivedb, domain, port, mysqldb, username, password) 

sc.stop() 

Shell script that calls the PySpark script:

#!/bin/bash 

source /home/$USER/mysql_spark/source.sh 
[ $# -ne 1 ] && { echo "Usage : $0 args_file"; exit 1; } 

args_file=$1 

TIMESTAMP=`date "+%Y-%m-%d"` 
touch /home/$USER/logs/${TIMESTAMP}.success_log 
touch /home/$USER/logs/${TIMESTAMP}.fail_log 
success_logs=/home/$USER/logs/${TIMESTAMP}.success_log 
failed_logs=/home/$USER/logs/${TIMESTAMP}.fail_log 

#Function to get the status of the job creation 
function log_status 
{ 
     status=$1 
     message=$2 
     if [ "$status" -ne 0 ]; then 
       echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}" 
       #echo "Please find the attached log file for more details" 
       exit 1 
       else 
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}" 
       fi 
} 

spark-submit --name "${args_file}" --master "yarn-client" /home/$USER/mysql_spark/mysql_spark.py ${args_file} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} 


g_STATUS=$? 
log_status $g_STATUS "Spark job ${args_file} Execution" 

Questions

I want to get the logs for each table as separate files, rather than all tables in a single file. 

If possible, I would also like the `status` messages for each table separately, rather than a single status message for the whole file. 

Log file

*********************************************************table = table_1 *************************** 
17/07/26 12:47:36 INFO parquet.ParquetRelation: Listing hdfs://localhost/user/hive/warehouse/testing.db/table_1 on driver 
17/07/26 12:47:36 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2 
17/07/26 12:47:36 INFO scheduler.DAGScheduler: Got job 4 (sql at NativeMethodAccessorImpl.java:-2) with 2 output partitions 
17/07/26 12:47:36 INFO scheduler.DAGScheduler: Final stage: ResultStage 4 (sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:36 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/07/26 12:47:36 INFO scheduler.DAGScheduler: Missing parents: List() 
17/07/26 12:47:36 INFO scheduler.DAGScheduler: Submitting ResultStage 4 (MapPartitionsRDD[12] at sql at NativeMethodAccessorImpl.java:-2), which has no missing parents 
17/07/26 12:47:36 INFO storage.MemoryStore: Block broadcast_5 stored as values in memory (estimated size 71.6 KB, free 602.7 KB) 
17/07/26 12:47:36 INFO storage.MemoryStore: Block broadcast_5_piece0 stored as bytes in memory (estimated size 25.0 KB, free 627.7 KB) 
17/07/26 12:47:36 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on xxxxxxxxxxxxx:9612 (size: 25.0 KB, free: 530.2 MB) 
17/07/26 12:47:36 INFO spark.SparkContext: Created broadcast 5 from broadcast at DAGScheduler.scala:1006 
17/07/26 12:47:36 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 4 (MapPartitionsRDD[12] at sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:36 INFO cluster.YarnScheduler: Adding task set 4.0 with 2 tasks 
17/07/26 12:47:36 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 4.0 (TID 7, localhost, partition 0,PROCESS_LOCAL, 1975 bytes) 
17/07/26 12:47:36 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 4.0 (TID 8, localhost, partition 1,PROCESS_LOCAL, 1975 bytes) 
17/07/26 12:47:36 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:63339 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:36 INFO storage.BlockManagerInfo: Added broadcast_5_piece0 in memory on localhost:59298 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:37 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 4.0 (TID 8) in 121 ms on localhost (1/2) 
17/07/26 12:47:37 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 4.0 (TID 7) in 133 ms on localhost (2/2) 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: ResultStage 4 (sql at NativeMethodAccessorImpl.java:-2) finished in 0.133 s 
17/07/26 12:47:37 INFO cluster.YarnScheduler: Removed TaskSet 4.0, whose tasks have all completed, from pool 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Job 4 finished: sql at NativeMethodAccessorImpl.java:-2, took 0.160750 s 
17/07/26 12:47:37 INFO parquet.ParquetRelation: Using default output committer for Parquet: parquet.hadoop.ParquetOutputCommitter 
17/07/26 12:47:37 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 
17/07/26 12:47:37 INFO datasources.DefaultWriterContainer: Using user defined output committer class parquet.hadoop.ParquetOutputCommitter 
17/07/26 12:47:37 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 
17/07/26 12:47:37 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Got job 5 (sql at NativeMethodAccessorImpl.java:-2) with 1 output partitions 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Final stage: ResultStage 5 (sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Missing parents: List() 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Submitting ResultStage 5 (MapPartitionsRDD[15] at sql at NativeMethodAccessorImpl.java:-2), which has no missing parents 
17/07/26 12:47:37 INFO storage.MemoryStore: Block broadcast_6 stored as values in memory (estimated size 84.4 KB, free 712.0 KB) 
17/07/26 12:47:37 INFO storage.MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 30.9 KB, free 742.9 KB) 
17/07/26 12:47:37 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on xxxxxxxxxxxxx:9612 (size: 30.9 KB, free: 530.1 MB) 
17/07/26 12:47:37 INFO spark.SparkContext: Created broadcast 6 from broadcast at DAGScheduler.scala:1006 
17/07/26 12:47:37 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 5 (MapPartitionsRDD[15] at sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:37 INFO cluster.YarnScheduler: Adding task set 5.0 with 1 tasks 
17/07/26 12:47:37 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 5.0 (TID 9, localhost, partition 0,PROCESS_LOCAL, 1922 bytes) 
17/07/26 12:47:37 INFO storage.BlockManagerInfo: Added broadcast_6_piece0 in memory on localhost:63339 (size: 30.9 KB, free: 3.1 GB) 
17/07/26 12:47:39 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 5.0 (TID 9) in 2270 ms on localhost (1/1) 
17/07/26 12:47:39 INFO cluster.YarnScheduler: Removed TaskSet 5.0, whose tasks have all completed, from pool 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: ResultStage 5 (sql at NativeMethodAccessorImpl.java:-2) finished in 2.270 s 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Job 5 finished: sql at NativeMethodAccessorImpl.java:-2, took 2.302009 s 
17/07/26 12:47:39 INFO datasources.DefaultWriterContainer: Job job_201707261247_0000 committed. 
17/07/26 12:47:39 INFO parquet.ParquetRelation: Listing hdfs://localhost/user/hive/warehouse/testing.db/table_1 on driver 
17/07/26 12:47:39 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Got job 6 (sql at NativeMethodAccessorImpl.java:-2) with 2 output partitions 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Final stage: ResultStage 6 (sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Missing parents: List() 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Submitting ResultStage 6 (MapPartitionsRDD[17] at sql at NativeMethodAccessorImpl.java:-2), which has no missing parents 
17/07/26 12:47:39 INFO storage.MemoryStore: Block broadcast_7 stored as values in memory (estimated size 71.6 KB, free 814.5 KB) 
17/07/26 12:47:39 INFO storage.MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 25.0 KB, free 839.5 KB) 
17/07/26 12:47:39 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on xxxxxxxxxxxxx:9612 (size: 25.0 KB, free: 530.1 MB) 
17/07/26 12:47:39 INFO spark.SparkContext: Created broadcast 7 from broadcast at DAGScheduler.scala:1006 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 6 (MapPartitionsRDD[17] at sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:39 INFO cluster.YarnScheduler: Adding task set 6.0 with 2 tasks 
17/07/26 12:47:39 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 6.0 (TID 10, localhost, partition 0,PROCESS_LOCAL, 1975 bytes) 
17/07/26 12:47:39 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 6.0 (TID 11, localhost, partition 1,PROCESS_LOCAL, 2101 bytes) 
17/07/26 12:47:39 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:63339 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:39 INFO storage.BlockManagerInfo: Added broadcast_7_piece0 in memory on localhost:59298 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:39 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 6.0 (TID 10) in 142 ms on localhost (1/2) 
17/07/26 12:47:39 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 6.0 (TID 11) in 180 ms on localhost (2/2) 
17/07/26 12:47:39 INFO cluster.YarnScheduler: Removed TaskSet 6.0, whose tasks have all completed, from pool 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: ResultStage 6 (sql at NativeMethodAccessorImpl.java:-2) finished in 0.195 s 
17/07/26 12:47:39 INFO scheduler.DAGScheduler: Job 6 finished: sql at NativeMethodAccessorImpl.java:-2, took 0.219934 s 
*********************************************************table = table_2 *************************** 
17/07/26 12:47:40 INFO parquet.ParquetRelation: Listing hdfs://localhost/user/hive/warehouse/testing.db/table_2 on driver 
17/07/26 12:47:40 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Got job 7 (sql at NativeMethodAccessorImpl.java:-2) with 2 output partitions 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Final stage: ResultStage 7 (sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Missing parents: List() 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Submitting ResultStage 7 (MapPartitionsRDD[21] at sql at NativeMethodAccessorImpl.java:-2), which has no missing parents 
17/07/26 12:47:40 INFO storage.MemoryStore: Block broadcast_8 stored as values in memory (estimated size 71.6 KB, free 911.1 KB) 
17/07/26 12:47:40 INFO storage.MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 25.0 KB, free 936.1 KB) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on xxxxxxxxxxxxx:9612 (size: 25.0 KB, free: 530.1 MB) 
17/07/26 12:47:40 INFO spark.SparkContext: Created broadcast 8 from broadcast at DAGScheduler.scala:1006 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 7 (MapPartitionsRDD[21] at sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:40 INFO cluster.YarnScheduler: Adding task set 7.0 with 2 tasks 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 7.0 (TID 12, localhost, partition 0,PROCESS_LOCAL, 1975 bytes) 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 7.0 (TID 13, localhost, partition 1,PROCESS_LOCAL, 1975 bytes) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:63339 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_8_piece0 in memory on localhost:59298 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 7.0 (TID 13) in 69 ms on localhost (1/2) 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 7.0 (TID 12) in 137 ms on localhost (2/2) 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: ResultStage 7 (sql at NativeMethodAccessorImpl.java:-2) finished in 0.138 s 
17/07/26 12:47:40 INFO cluster.YarnScheduler: Removed TaskSet 7.0, whose tasks have all completed, from pool 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Job 7 finished: sql at NativeMethodAccessorImpl.java:-2, took 0.157692 s 
17/07/26 12:47:40 INFO parquet.ParquetRelation: Using default output committer for Parquet: parquet.hadoop.ParquetOutputCommitter 
17/07/26 12:47:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 
17/07/26 12:47:40 INFO datasources.DefaultWriterContainer: Using user defined output committer class parquet.hadoop.ParquetOutputCommitter 
17/07/26 12:47:40 INFO output.FileOutputCommitter: File Output Committer Algorithm version is 1 
17/07/26 12:47:40 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Got job 8 (sql at NativeMethodAccessorImpl.java:-2) with 1 output partitions 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Final stage: ResultStage 8 (sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Missing parents: List() 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Submitting ResultStage 8 (MapPartitionsRDD[24] at sql at NativeMethodAccessorImpl.java:-2), which has no missing parents 
17/07/26 12:47:40 INFO storage.MemoryStore: Block broadcast_9 stored as values in memory (estimated size 84.4 KB, free 1020.4 KB) 
17/07/26 12:47:40 INFO storage.MemoryStore: Block broadcast_9_piece0 stored as bytes in memory (estimated size 30.9 KB, free 1051.3 KB) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on xxxxxxxxxxxxx:9612 (size: 30.9 KB, free: 530.0 MB) 
17/07/26 12:47:40 INFO spark.SparkContext: Created broadcast 9 from broadcast at DAGScheduler.scala:1006 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 8 (MapPartitionsRDD[24] at sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:40 INFO cluster.YarnScheduler: Adding task set 8.0 with 1 tasks 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 8.0 (TID 14, localhost, partition 0,PROCESS_LOCAL, 1922 bytes) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_9_piece0 in memory on localhost:63339 (size: 30.9 KB, free: 3.1 GB) 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 8.0 (TID 14) in 194 ms on localhost (1/1) 
17/07/26 12:47:40 INFO cluster.YarnScheduler: Removed TaskSet 8.0, whose tasks have all completed, from pool 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: ResultStage 8 (sql at NativeMethodAccessorImpl.java:-2) finished in 0.195 s 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Job 8 finished: sql at NativeMethodAccessorImpl.java:-2, took 0.221049 s 
17/07/26 12:47:40 INFO datasources.DefaultWriterContainer: Job job_201707261247_0000 committed. 
17/07/26 12:47:40 INFO parquet.ParquetRelation: Listing hdfs://localhost/user/hive/warehouse/testing.db/table_2 on driver 
17/07/26 12:47:40 INFO spark.SparkContext: Starting job: sql at NativeMethodAccessorImpl.java:-2 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Got job 9 (sql at NativeMethodAccessorImpl.java:-2) with 2 output partitions 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Final stage: ResultStage 9 (sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Missing parents: List() 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Submitting ResultStage 9 (MapPartitionsRDD[26] at sql at NativeMethodAccessorImpl.java:-2), which has no missing parents 
17/07/26 12:47:40 INFO storage.MemoryStore: Block broadcast_10 stored as values in memory (estimated size 71.6 KB, free 1122.9 KB) 
17/07/26 12:47:40 INFO storage.MemoryStore: Block broadcast_10_piece0 stored as bytes in memory (estimated size 25.0 KB, free 1147.9 KB) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_10_piece0 in memory on xxxxxxxxxxxxx:9612 (size: 25.0 KB, free: 530.0 MB) 
17/07/26 12:47:40 INFO spark.SparkContext: Created broadcast 10 from broadcast at DAGScheduler.scala:1006 
17/07/26 12:47:40 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 9 (MapPartitionsRDD[26] at sql at NativeMethodAccessorImpl.java:-2) 
17/07/26 12:47:40 INFO cluster.YarnScheduler: Adding task set 9.0 with 2 tasks 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 9.0 (TID 15, localhost, partition 0,PROCESS_LOCAL, 1975 bytes) 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 9.0 (TID 16, localhost, partition 1,PROCESS_LOCAL, 2101 bytes) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:63339 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:40 INFO storage.BlockManagerInfo: Added broadcast_10_piece0 in memory on localhost:59298 (size: 25.0 KB, free: 3.1 GB) 
17/07/26 12:47:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 9.0 (TID 15) in 124 ms on localhost (1/2) 
17/07/26 12:47:41 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 9.0 (TID 16) in 151 ms on localhost (2/2) 
17/07/26 12:47:41 INFO scheduler.DAGScheduler: ResultStage 9 (sql at NativeMethodAccessorImpl.java:-2) finished in 0.158 s 
17/07/26 12:47:41 INFO cluster.YarnScheduler: Removed TaskSet 9.0, whose tasks have all completed, from pool 
17/07/26 12:47:41 INFO scheduler.DAGScheduler: Job 9 finished: sql at NativeMethodAccessorImpl.java:-2, took 0.181504 s 

You might want to consider simplifying your example code down to the bare essentials. There is a lot of detail here obscuring the important parts of the question. – timchap


Could you post the contents of the log file this script creates? – alvarez


@alvarez The script prints a log for table1, then table2, then table3, and so on. What I want is a separate log file for each table, like table1.txt, table2.txt, etc. –

Answers


As far as I can see, you should read the file containing the list of tables from the bash script, and then issue one spark-submit per table.

IFS=$'\n' read -d '' -r -a lines < "$args_file" 
for it in "${lines[@]}" 
do 
    TIMESTAMP=`date "+%Y-%m-%d"` 
    touch /home/$USER/logs/${it}_${TIMESTAMP}.success_log 
    touch /home/$USER/logs/${it}_${TIMESTAMP}.fail_log 
    success_logs=/home/$USER/logs/${it}_${TIMESTAMP}.success_log 
    failed_logs=/home/$USER/logs/${it}_${TIMESTAMP}.fail_log 

    #Function to get the status of the job creation 
    function log_status 
    { 
     status=$1 
     message=$2 
     if [ "$status" -ne 0 ]; then 
       echo "`date +\"%Y-%m-%d %H:%M:%S\"` [ERROR] $message [Status] $status : failed" | tee -a "${failed_logs}" 
       #echo "Please find the attached log file for more details" 
       exit 1 
       else 
        echo "`date +\"%Y-%m-%d %H:%M:%S\"` [INFO] $message [Status] $status : success" | tee -a "${success_logs}" 
       fi 
    } 

    spark-submit --name "${it}" --master "yarn-client" /home/$USER/mysql_spark/mysql_spark.py ${it} ${hivedb} ${domain} ${port} ${mysqldb} ${username} ${password} 

    g_STATUS=$? 
    log_status $g_STATUS "Spark job ${it} Execution" 
done 

The first argument of the Python script is now a single table name:

if len(sys.argv) != 8: 
     print "Invalid number of args......" 
     print "Usage: spark-submit import.py Arguments" 
     exit() 
table = sys.argv[1] 
hivedb = sys.argv[2] 
domain = sys.argv[3] 
port=sys.argv[4] 
mysqldb=sys.argv[5] 
username=sys.argv[6] 
password=sys.argv[7] 

def mysql_spark(table, hivedb, domain, port, mysqldb, username, password): 

    print "*********************************************************table = {} ***************************".format(table) 

    df = sqlContext.read.format("jdbc") \
        .option("url", "{}:{}/{}".format(domain, port, mysqldb)) \
        .option("driver", "com.mysql.jdbc.Driver") \
        .option("dbtable", table) \
        .option("user", username) \
        .option("password", password) \
        .load()

    df.registerTempTable("mytempTable") 

    sqlContext.sql("create table {}.{} stored as parquet as select * from mytempTable".format(hivedb,table)) 


mysql_spark(table, hivedb, domain, port, mysqldb, username, password) 

sc.stop() 

However, I am not sure how this will affect the execution of the tasks.


Actually, I already have a script like this myself. The reason I want to loop over the table names inside the `python` file is that I want to use only one Spark context, so that I can minimize the time the script takes to run for all the tables. –
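
To keep a single SparkContext and still get one log file per table, one possible approach is to attach and detach a log4j FileAppender around each table load inside the existing loop. This is only a minimal sketch, not code from the question or either answer: it assumes Spark 1.x with log4j 1.x on the driver, it captures driver-side log output only (executor logs still go to YARN), and log_dir is a hypothetical path.

log4j = sc._jvm.org.apache.log4j   # py4j handle to the driver's log4j 1.x 
root_logger = log4j.Logger.getRootLogger() 
log_dir = "/home/XXXXXXXX/logs"    # hypothetical path, adjust as needed 

for table in input: 
    # one fresh log file per table, e.g. /home/XXXXXXXX/logs/table_1.log 
    layout = log4j.PatternLayout("%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n") 
    appender = log4j.FileAppender(layout, "{}/{}.log".format(log_dir, table), False) 
    root_logger.addAppender(appender) 
    try: 
        mysql_spark(table, hivedb, domain, port, mysqldb, username, password) 
    finally: 
        # detach so the next table's logs do not leak into this file 
        root_logger.removeAppender(appender) 
        appender.close() 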


@alvarez If I do a `spark-submit` for every table, it takes 10-12 seconds each time just to create the SparkContext and run the script. –


@Ashwini I think this is not a very efficient approach. – alvarez


Simply running your script as

./script.sh >> log.txt 

should save your logs in the log.txt file: anything your script prints!
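
Since the PySpark script already prints a "table = ..." banner before each table, such a combined log could also be split into per-table files after the fact. A minimal sketch, assuming both stdout and stderr were redirected (e.g. `./script.sh > log.txt 2>&1`, so the Spark INFO lines are captured too) and hypothetical output names like table_1.log:

#!/usr/bin/env python 
# Split a combined run log into per-table files, keyed on the 
# "table = <name>" banner that mysql_spark() prints before each table. 
import re 

current = None   # file handle for the table currently being written 
with open('log.txt') as combined: 
    for line in combined: 
        match = re.search(r'\*+table = (\S+)', line) 
        if match: 
            if current: 
                current.close() 
            current = open('%s.log' % match.group(1), 'w')   # e.g. table_1.log 
        if current: 
            current.write(line) 
if current: 
    current.close() 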


As you said, it only captures the output of the print statements. What I want is a separate log file each time the script runs. –