hadoop怎么读取csv文件
Hadoop本身并不直接支持CSV文件的读取，但可以通过Hadoop的MapReduce框架或Hive等工具来读取CSV文件。
-
使用MapReduce框架读取CSV文件：可以编写一个MapReduce程序来读取CSV文件。在Mapper阶段，将CSV文件中的每一行作为输入，并将其拆分为字段；在Reducer阶段，将处理过的数据写入HDFS或其他存储中。
-
使用Hive读取CSV文件：Hive是建立在Hadoop之上的数据仓库工具，可以通过Hive的SQL语言来查询和处理数据。可以创建一个外部表来读取CSV文件，并使用Hive的查询语句来操作这些数据。
示例代码：
使用MapReduce框架读取CSV文件的示例代码：
public class CSVReader {
public static class CSVMapper extends Mapper<LongWritable, Text, Text, Text> {
public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String line = value.toString();
String[] fields = line.split(",");
// 澶勭悊CSV鏂囦欢涓殑姣忎竴琛屾暟鎹?/span>
context.write(new Text(fields[0]), new Text(fields[1]));
}
}
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
Job job = Job.getInstance(conf, "CSVReader");
job.setJarByClass(CSVReader.class);
job.setMapperClass(CSVMapper.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
FileInputFormat.addInputPath(job, new Path("input.csv"));
FileOutputFormat.setOutputPath(job, new Path("output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
使用Hive读取CSV文件的示例代码：
-- External table over an existing CSV dataset: dropping the table leaves the
-- underlying files in place (EXTERNAL), and rows are parsed as comma-delimited
-- text. NOTE(review): Hive's LOCATION must point to a directory containing the
-- CSV file(s), not to a single file — verify '/path/to/csv/file' is a directory.
-- NOTE(review): a plain comma delimiter does not handle quoted fields with
-- embedded commas, and any header row will be read as data.
CREATE EXTERNAL TABLE my_table (
col1 STRING,
col2 STRING,
col3 INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
LOCATION '/path/to/csv/file';
-- Query the CSV contents through the table definition above.
SELECT * FROM my_table;
通过以上两种方法，可以在Hadoop上读取CSV文件并进行相应的数据处理操作。