MongoDB千万级数据的分析 一、导入 清单1: 读取CSV文件,存储到数据库中 01 #-*- coding:UTF-8 -*- 02 03 Created on 2013-10-20 04 05 06 @author: tyk 07 08 09 10 11 from pymongo.connection import Connection 12 from time import time 13 import code
MongoDB千万级数据的分析一、导入
清单1:
读取CSV文件,存储到数据库中
01
#-*- coding:UTF-8 -*-
02
'''
03
Created on 2013-10-20
04
05
06
@author: tyk
07
08
09
10
'''
11
from pymongo.connection import Connection
12
from time import time
13
import codecs
14
import csv
15
import os
16
rootdir = "2000W/" # 指明被遍历的文件夹
17
'''
18
19
'''
20
def process_data():
21
conn = Connection('localhost', 27017) #获取一个连接
22
##conn.drop_database('guestHouse')
23
db = conn.TYK
24
guest = db.guestHouse
25
26
27
guest_info = []
28
for parent, dirnames, filenames in os.walk(rootdir): #三个参数:分别返回1.父目录 2.所有文件夹名字(不含路径) 3.所有文件名字
29
for filename in filenames:
30
ErrorLine = []
31
key_length = 0
32
fullname = os.path.join(parent,filename)
33
try:
34
#with codecs.open(fullname, encoding='utf_8') as file:
35
with codecs.open(fullname, encoding='utf_8_sig') as file:#忽略UTF-8文件前面的BOM
36
keys = file.readline().split(',')#先读掉第一行的注释
37
key_length = len(keys)
38
spamreader = csv.reader(file)#以CSV格式读取,返回的不再是str,而是list
39
for line in spamreader:
40
if key_length != len(line):#部分数据不完整,记录下来
41
ErrorLine.append(line)
42
else:
43
each_info = {}
44
for i in range(1, len(keys)):#过滤第一个字段Name,姓名将不再存到数据库中
45
each_info[keys[i]] = line[i]
46
47
guest_info.append(each_info)
48
if len(guest_info) == 10000:#每10000条进行一次存储操作
49
guest.insert(guest_info)
50
guest_info = []
51
52
except Exception, e:
53
print filename + "\t" + str(e)
54
55
#统一处理错误信息
56
with open('ERR/' + os.path.splitext(filename)[0] + '-ERR.csv', 'w') as log_file:
57
spamwriter = csv.writer(log_file)
58
for line in ErrorLine:
59
spamwriter.writerow(line)
60
#最后一批
61
guest.insert(guest_info)
62
63
if __name__ == '__main__':
64
start = time()
65
process_data()
66
stop = time()
67
print(str(stop-start) + "秒")
后来睡着了、关机了,耗时多久也不得而知了⊙﹏⊙b汗
总结:
1.文件编码为UTF-8,不能直接open()打开读取。
2.文件已CSV格式进行存储,读取时用CSV模块处理来读取。这是读出来的数据每行为一个list。注意,不能简单的以","拆分后进行读取。对于这种形状"a,b,c", d的数据是无法正确解析的。
3.对于UTF-8文件,如果有BOM的形式去读是要以'utf_8_sig'编码读取,这样会跳过开头的BOM。如果不处理掉BOM,BOM会随数据一同存到数据库中,造成类似" XXX"的现象(有一个空格的假象)。
如果真的已经存到库中了,那只有改key了
1
db.guestHouse.update({}, {"$rename" : {" Name" : "Name"}}, false, true)
另外,网上还有一种方法(尝试失败了,具体原因应该是把字符串转换成字节码然后再去比较。怎么转这个我还不会...)
1
#with codecs.open(fullname, encoding='utf-8') as file:
2
with codecs.open(fullname, encoding='utf_8_sig') as file:
3
keys = file.readline().split(',')
4
if keys[0][:3] == codecs.BOM_UTF8:#将keys[0]转化为字节码再去比较
5
keys[0] = keys[0][3:]
扩展:
今天发现MongoDB本身就带有导入功能mongoimport,可以直接导入CSV文件...
小试一把
1.不做错误数据过滤,直接导入。用专利引用数据做一下实验(《Hadoop权威指南》一书中的实验数据)
实验数据:
01
"PATENT","GYEAR","GDATE","APPYEAR","COUNTRY","POSTATE","ASSIGNEE","ASSCODE","CLAIMS","NCLASS","CAT","SUBCAT","CMADE","CRECEIVE","RATIOCIT","GENERAL","ORIGINAL","FWDAPLAG","BCKGTLAG","SELFCTUB","SELFCTLB","SECDUPBD","SECDLWBD"
02
3070801,1963,1096,,"BE","",,1,,269,6,69,,1,,0,,,,,,,
03
3070802,1963,1096,,"US","TX",,1,,2,6,63,,0,,,,,,,,,
04
3070803,1963,1096,,"US",
05
"IL",,1,,2,6,63,,9,,0.3704,,,,,,,
06
3070804,1963,1096,,"US","OH",,1,,2,6,63,,3,,0.6667,,,,,,,
07
3070805,1963,1096,,"US","CA",,1,,2,6,63,,1,,0,,,,,,,
08
3070806,1963,1096,,"US","PA",,1,,2,6,63,,0,,,,,,,,,
09
3070807,1963,1096,,"US","OH",,1,,623,3,39,,3,,0.4444,,,,,,,
10
3070808,1963,1096,,"US","IA",,1,,623,3,39,,4,,0.375,,,,,,,
11
3070809,1963,1096,,,,1,,4,6,65,,0,,,,,,,,,
1
mongoimport -d TYK -c guest --type csv --file d:\text.csv --headerline
一共11行。第一行注释,9条数据。第3条中间截断,第9条取出中间两个数值"US","AZ"。按照csv规定现在应该是10条数据
结果:
01
> db.guest.find({}, {"PATENT" : 1, "_id" : 1})
02
{ "_id" : ObjectId("52692c2a0b082a1bbb727d86"), "PATENT" : 3070801 }
03
{ "_id" : ObjectId("52692c2a0b082a1bbb727d87"), "PATENT" : 3070802 }
04
{ "_id" : ObjectId("52692c2a0b082a1bbb727d88"), "PATENT" : 3070803 }
05
{ "_id" : ObjectId("52692c2a0b082a1bbb727d89"), "PATENT" : "IL" }
06
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8a"), "PATENT" : 3070804 }
07
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8b"), "PATENT" : 3070805 }
08
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8c"), "PATENT" : 3070806 }
09
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8d"), "PATENT" : 3070807 }
10
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8e"), "PATENT" : 3070808 }
11
{ "_id" : ObjectId("52692c2a0b082a1bbb727d8f"), "PATENT" : 3070809 }
12
> db.guest.count()
13
10
14
刚好10条,可见此命令导入是不会过滤异常数据。
2.以UTF-8有BOM格式再试一次。实验数据同上
01
> db.guest.find({}, {"PATENT" : 1, "_id" : 1})
02
{ "_id" : ObjectId("52692d730b082a1bbb727d90"), "PATENT" : 3070801 }
03
{ "_id" : ObjectId("52692d730b082a1bbb727d91"), "PATENT" : 3070802 }
04
{ "_id" : ObjectId("52692d730b082a1bbb727d92"), "PATENT" : 3070803 }
05
{ "_id" : ObjectId("52692d730b082a1bbb727d93"), "PATENT" : "IL" }
06
{ "_id" : ObjectId("52692d730b082a1bbb727d94"), "PATENT" : 3070804 }
07
{ "_id" : ObjectId("52692d730b082a1bbb727d95"), "PATENT" : 3070805 }
08
{ "_id" : ObjectId("52692d730b082a1bbb727d96"), "PATENT" : 3070806 }
09
{ "_id" : ObjectId("52692d730b082a1bbb727d97"), "PATENT" : 3070807 }
10
{ "_id" : ObjectId("52692d730b082a1bbb727d98"), "PATENT" : 3070808 }
11
{ "_id" : ObjectId("52692d730b082a1bbb727d99"), "PATENT" : 3070809 }
12
> db.guest.count()
13
10
结果同上面一样,key"PATENT "中并没有因BOM引起的空格
3.mongoimport命令解释
1
mongoimport -d TYK -c guest --type csv --file d:\text.csv --headerline
2
-d 数据库
3
-c 集合
4
--type 数据格式
5
--file 文件路径
6
--headerline 貌似指定这个后以第一行为key,另 -f 可以指定key “-f Name, age”
二、统计分析
1.根据性别统计
由于数据不规范,先查询一下有多少种方式来表示性别的
1
db.runCommand({"distinct" : "guestHouse", "key" : "Gender"})
01
{
02
"values" : [
03
"M",
04
"F",
05
"0",
06
" ",
07
"1",
08
"",
09
"19790522",
10
"#0449",
11
"#M",
12
"
13 "N"
14 ],
15 "stats" : {
16 "n" : 20048891,
17 "nscanned" : 20048891,
18 "nscannedObjects" : 20048891,
19 "timems" : 377764,
20 "cursor" : "BasicCursor"
21 },
22 "ok" : 1
23 }
一共有11中方式表示性别的...那就以M、F做下统计吧
1 #总数据
2 db.guestHouse.count()
3 20048891
1 #男 M
2 db.guestHouse.count({"Gender":"M"})
3 12773070
4 64%
1 #女 F
2 db.guestHouse.count({"Gender":"F"})
3 6478745
4 32%
饼状图
总结:
1.带条件count时速度是非常慢的,猜测在count时可能先进行的查询操作,如果是查询加索引效果会好很多。对Gender加索引,效果明显提高了,但仍然是N秒级别的。显然在实时情况下还是不行的。另外随意加索引也会遇其它方面的问题。在用索引时能达到一个平衡点很重要的啊。
1 2013-10-24
2 查看count的js解释
3 > db.guestHouse.count
4 function ( x ){
5 return this.find( x ).count();
6 }
7 >
8 果然是先find,后count
2、根据身份证分析性别
从上面数据看,大约有4%的数据性别不详。
15位身份证号码:第7、8位为出生年份(两位数),第9、10位为出生月份,第11、12位代表出生日期,第15位代表性别,奇数为男,偶数为女。 18位身份证号码:第7、8、9、10位为出生年份(四位数),第11、第12位为出生月份,第13、14位代表出生日期,第17位代表性别,奇数为男,偶数为女。
要根据身份证来分析的话,明显不好直接处理分析了。那么就尝试一下编写MapReduce算一下吧,但是单机MapReduce速度会更慢。
先了解一下数据,看看有多少证件类型
01 > db.runCommand({"distinct" : "guestHouse", "key" : "CtfTp"})
02 {
03 "values" : [
04 "OTH",
05 "GID",
06 "ID",
07 "TBZ",
08 "VSA",
09 "TXZ",
10 "JID",
11 "JZ",
12 "HXZ",
13 "JLZ",
14 "#ID",
15 "hvz",
16 "待定",
17 "11",
18 "",
19 "SBZ",
20 "JGZ",
21 "HKB",
22 "TSZ",
23 "JZ1",
24 " ",
25 "Id",
26 "#GID",
27 "1"
28 ],
29 "stats" : {
30 "n" : 20048891,
31 "nscanned" : 20048891,
32 "nscannedObjects" : 20048891,
33 "timems" : 610036,
34 "cursor" : "BasicCursor"
35 },
36 "ok" : 1
37 }
38 >
数据依旧的乱,那就暂且以"ID"来统计一下吧
01 >map = function() {
02 if (this.CtfTp == "ID") {
03 if (this.CtfId.length == 18){
04 emit(parseInt(this.CtfId.charAt(16)) % 2, {count : 1}) //1为男,0为女
05 }else if (this.CtfId.length == 15) {
06 emit(parseInt(this.CtfId.charAt(14)) % 2, {count : 1}) //无法解析时为NaN
07 }
08 } else {
09 emit(-1, {count : 1})
10 }
11 }
12
13 >reduce = function(key, emits) {
14 total = 0;
15 for (var i in emits) {
16 total += emits[i].count;
17 }
18 return {"count" : total};
19 }
20
21 >mr = db.runCommand(
22 {
23 mapReduce: "guestHouse",
24 map: map,
25 reduce: reduce,
26 out: "TYK.guestHouse.output",
27 verbose: true
28 }
29 )
30 >{
31 "result" : "guestHouse.output",
32 "timeMillis" : 999097,
33 "timing" : {
34 "mapTime" : 777955,
35 "emitLoop" : 995248,
36 "reduceTime" : 111217,
37 "mode" : "mixed",
38 "total" : 999097
39 },
40 "counts" : {
41 "input" : 20048891,
42 "emit" : 19928098,
43 "reduce" : 594610,
44 "output" : 4
45 },
46 "ok" : 1
47 }
结果:
1 > db.guestHouse.output.find()
2 { "_id" : NaN, "value" : { "count" : 1360 } }
3 { "_id" : -1, "value" : { "count" : 1161164 } }
4 { "_id" : 0, "value" : { "count" : 6831007 } }
5 { "_id" : 1, "value" : { "count" : 11934567 } }
6 >