通过Rfc4180CsvParser导入数据时,是否有排除标题行的方法? COPY
命令具有SKIP
选项,但在使用Vertica SDK中提供的CSV分析器时,该选项似乎不起作用。使用Rfc4180CsvParser将CSV导入到Vertica并排除标题行
背景
作为背景,COPY
命令本身不读的CSV文件。对于简单的CSV文件,可以说COPY schema.table FROM '/data/myfile.csv' DELIMITER ',' ENCLOSED BY '"';
,但是这将失败,数据文件中包含带有嵌入引号的字符串值。
添加ESCAPE AS '"'
将产生错误3210。这是一个问题,因为CSV值被封闭并被"
转义。
Vertica的SDK CsvParser扩展救援
Vertica的/opt/vertica/sdk/examples
下提供了一个SDK与可编译成扩展的C++程序。其中之一是/opt/vertica/sdk/examples/ParserFunctions/Rfc4180CsvParser.cpp
。
这个伟大的工程如下:
cd /opt/vertica/sdk/examples
make clean
vsql
==> CREATE LIBRARY Rfc4180CsvParserLib AS '/opt/vertica/sdk/examples/build/Rfc4180CsvParser.so';
==> COPY myschema.mytable FROM '/data/myfile.csv' WITH PARSER Rfc4180CsvParser();
问题
除了它导入数据文件的第一行作为行上述的伟大工程。 COPY
命令有一个SKIP 1
选项,但这不适用于解析器。
问题
是否possble编辑Rfc4180CsvParser.cpp
跳过第一行,或更好,但需要一定的参数指定的行数跳过?
该方案只是135线,但我没有看到在哪里/如何做这个切口。提示?
复制下面,我没有看到一个公开回购链接到整个程序...
Rfc4180CsvParser.cpp
/* Copyright (c) 2005 - 2012 Vertica, an HP company -*- C++ -*- */
#include "Vertica.h"
#include "StringParsers.h"
#include "csv.h"
using namespace Vertica;
// Note, the class template is mostly for demonstration purposes,
// so that the same class can use each of two string-parsers.
// Custom parsers can also just pick a string-parser to use.
/**
* A parser that parses something approximating the "official" CSV format
* as defined in IETF RFC-4180: <http://tools.ietf.org/html/rfc4180>
* Oddly enough, many "CSV" files don't actually conform to this standard
* for one reason or another. But for sources that do, this parser should
* be able to handle the data.
* Note that the CSV format does not specify how to handle different
* data types; it is entirely a string-based format.
* So we just use standard parsers based on the corresponding column type.
*/
template <class StringParsersImpl>
class LibCSVParser : public UDParser {
public:
LibCSVParser() : colNum(0) {}
// Keep a copy of the information about each column.
// Note that Vertica doesn't let us safely keep a reference to
// the internal copy of this data structure that it shows us.
// But keeping a copy is fine.
SizedColumnTypes colInfo;
// An instance of the class containing the methods that we're
// using to parse strings to the various relevant data types
StringParsersImpl sp;
/// Current column index
size_t colNum;
/// Parsing state for libcsv
struct csv_parser parser;
// Format strings
std::vector<std::string> formatStrings;
/**
* Given a field in string form (a pointer to the first character and
* a length), submit that field to Vertica.
* `colNum` is the column number from the input file; how many fields
* it is into the current record.
*/
bool handleField(size_t colNum, char* start, size_t len) {
if (colNum >= colInfo.getColumnCount()) {
// Ignore column overflow
return false;
}
// Empty colums are null.
if (len==0) {
writer->setNull(colNum);
return true;
} else {
return parseStringToType(start, len, colNum, colInfo.getColumnType(c
olNum), writer, sp);
}
}
static void handle_record(void *data, size_t len, void *p) {
static_cast<LibCSVParser*>(p)->handleField(static_cast<LibCSVParser*>(p)
->colNum++, (char*)data, len);
}
static void handle_end_of_row(int c, void *p) {
// Ignore 'c' (the terminating character); trust that it's correct
static_cast<LibCSVParser*>(p)->colNum = 0;
static_cast<LibCSVParser*>(p)->writer->next();
}
virtual StreamState process(ServerInterface &srvInterface, DataBuffer &input
, InputState input_state) {
size_t processed;
while ((processed = csv_parse(&parser, input.buf + input.offset, input.s
ize - input.offset,
handle_record, handle_end_of_row, this)) > 0) {
input.offset += processed;
}
if (input_state == END_OF_FILE && input.size == input.offset) {
csv_fini(&parser, handle_record, handle_end_of_row, this);
return DONE;
}
return INPUT_NEEDED;
}
virtual void setup(ServerInterface &srvInterface, SizedColumnTypes &returnTy
pe);
virtual void destroy(ServerInterface &srvInterface, SizedColumnTypes &return
Type) {
csv_free(&parser);
}
};
template <class StringParsersImpl>
void LibCSVParser<StringParsersImpl>::setup(ServerInterface &srvInterface, Sized
ColumnTypes &returnType) {
csv_init(&parser, CSV_APPEND_NULL);
colInfo = returnType;
}
template <>
void LibCSVParser<FormattedStringParsers>::setup(ServerInterface &srvInterface,
SizedColumnTypes &returnType) {
csv_init(&parser, CSV_APPEND_NULL);
colInfo = returnType;
if (formatStrings.size() != returnType.getColumnCount()) {
formatStrings.resize(returnType.getColumnCount(), "");
}
sp.setFormats(formatStrings);
}
template <class StringParsersImpl>
class LibCSVParserFactoryTmpl : public ParserFactory {
public:
virtual void plan(ServerInterface &srvInterface,
PerColumnParamReader &perColumnParamReader,
PlanContext &planCtxt) {}
virtual UDParser* prepare(ServerInterface &srvInterface,
PerColumnParamReader &perColumnParamReader,
PlanContext &planCtxt,
const SizedColumnTypes &returnType)
{
return vt_createFuncObj(srvInterface.allocator,
LibCSVParser<StringParsersImpl>);
}
};
typedef LibCSVParserFactoryTmpl<StringParsers> LibCSVParserFactory;
RegisterFactory(LibCSVParserFactory);
typedef LibCSVParserFactoryTmpl<FormattedStringParsers> FormattedLibCSVParserFac
tory;
RegisterFactory(FormattedLibCSVParserFactory);
一个快速评论,我一定会使用多个文件来测试这个。当涉及多个文件时,文档并不清楚输入缓冲区是如何工作的。我会尝试1个文件,2个文件,然后n + 1个文件,其中n =节点数。我会尝试这个有和没有'任何节点'。 – woot
这太棒了,谢谢! – prototype
@ user645715 NP。让我知道它是如何解决你的。正如我在我的评论中提到的那样,我对输入缓冲区的工作原理并不积极,但如果结果不是我认为的那样工作,另一个想法可能是创建一个UDSource。 – woot