search
尋找貓咪~QQ 地點 桃園市桃園區 Taoyuan , Taoyuan

大數據實時處理實戰

作者:武智暉,北京移動網路運行維護中心大數據系統架構師,北京郵電大學軟體工程碩士,高級工程師。多年從事系統架構設計,軟體開發,運營商大數據分析挖掘工作。

隨著互聯網時代的發展,運營商作為內容傳送的管道服務商,在數據領域具有巨大的優勢,如何將這些數據轉化為價值,越來越被運營商所重視。

運營商的大數據具有體量大,種類多的特點,如各類話單、信令等,通常一種話單每天的數據量就有上百億條。隨著業務分析需求對數據處理實時性的要求越來越高,也給我們的大數據處理架構帶來了巨大的挑戰,參照網路上可查的例子,運用到實際處理架構上,經常會因為實時數據流量大,造成系統運行不穩定及各種異常。從大數據實時處理架構開發到上線,耗時近2個月時間,經過大量優化,我們的系統才趨於穩定。最終我們使用10台伺服器的集群,實時處理每天上百億條的數據,這裡每條數據的欄位數量有100個,最長的欄位內容超過1000位元組。

下面就來分享一下我們在實時大數據處理大體量數據的過程中,總結出來的酸甜苦辣。

  • 項目目標

在有限伺服器集群數量的基礎上,實現對每天超過百億條、體量超過20T的某話單進行實時處理。具體需求是FTP收集多台話單伺服器上的詳單,進行實時處理后將數據存儲到Hbase資料庫供用戶即時詳單查詢,同時將話單存儲到Hdfs供離線分析使用。

  • 硬體資源

10台x86伺服器,單機配置16盒CPU,128G內存,2T硬碟*10,300G硬碟*2(系統盤)。

  • 系統架構

10台伺服器組成hadoop集群,其中NameNode節點同時作為採集機安裝FTP和Flume,選取其他5台伺服器安裝Kafka,Zookeeper和Storm實現大數據實時流處理架構,為了充分利用集群計算資源,這5台伺服器也配置了少量的Yarn計算資源,參與日常的離線數據分析需求。剩下的4台伺服器我們安裝了Hbase滿足大數據下的秒級查詢需求,系統拓撲圖如下:

圖一 系統拓撲圖

1.使用的相關技術

我們先來回顧一下相關的大數據架構和開源技術,大數據處理分離線分析架構和實時處理架構。離線分析架構(如Hive,Map/Reduce,Spark Sql等)可以滿足數據後分析,數據挖掘的應用需求。對於實時性要求高的應用,如用戶即時詳單查詢,業務量監控等,需要應用實時處理架構。目前大數據開源實時處理架構最常見的是Storm和Spark Streaming,相比Spark Streaming准實時批處理系統,Strom是更純粹的實時處理系統,即來一條事件就處理一條,具有更高的實時性。

Flume是Cloudera提供的一個高可用的,高可靠的,分散式的海量日誌採集、聚合和傳輸的系統。Flume支持單機也支持集群,支持多種數據源,如不斷寫入的文件、Socket、不斷生成新文件的文件夾等,支持多種輸出,如Hdfs、Kafka、Mysql資料庫等。Flume使用時僅需實現簡單配置,無需開發程序。

Kafka是一種高吞吐量的分散式發布訂閱消息系統,類似一個大數據量的緩存池,支持一份數據多用戶消費。ZooKeeper是一個分散式的,開源的分散式應用程序協調服務,負責存儲集群間部分組件的狀態同步信息。Storm分散式實時計算系統,包含Nimbus主節點和Supervisor從節點(從storm1.0以後,增加了Nimbus備份節點),節點之間需要依靠Zookeeper做狀態同步。Storm集群組件:

  • Nimbus:是Storm集群的master節點,負責資源分配和任務調度。

  • Supervisor:是Storm集群的slave節點,負責接受nimbus分配的任務,啟動和停止屬於自己管理的worker進程,是真正意義上的分散式計算節點。

圖二 Storm集群組件

Storm應用涉及到Java程序的開發,編程模型中涉及的概念:

  • Topology:Storm中運行的一個實時應用程序,各個組件間的消息流動形成邏輯上的一個拓撲結構,Topology一旦啟動,就會常駐內存並佔用worker資源。

  • Spout:在一個Topology中產生源數據流的組件。通常情況下Spout會從外部數據源中讀取數據,然後轉換為Topology內部的源數據。

  • Bolt:在一個Topology中接受數據然後執行處理的組件。Bolt可以執行過濾、函數操作、合併、寫資料庫等任何操作。

  • Tuple:一次消息傳遞的基本單元。

2.開源組件安裝及配置

a)Flume安裝及配置

從http://flume.apache.org/下載flume的安裝包,解壓縮;如果使用Cloudera Manager或者Ambari安裝,僅需通過相應的管理頁面安裝配置。我們僅安裝了單機的Flume,未安裝Flume集群,單機Flume處理效率非常高,完全能夠滿足我們每天處理上百億條數據的需求,但需要說明一點的是Flume魯棒性非常差,經常出現進程在、但數據不處理的進程卡死狀態,使用Flume時要注意以下幾點:

  • flume監控目錄中不能含有目錄;

  • flume正在處理的文件,其他進程不能更改(如FTP正在傳送中的文件,需要設置過濾條件,避免flume處理)。建議flume監控目錄與FTP實時傳送目錄分開,避免flume處理FTP傳送中的文件,導致異常,也可以設置正則表達式忽略正在傳送的文件:

a1.sources.r1.ignorePattern= ^(.)*\\.tmp$
  • flume處理的文件中可能含有特殊字元,導致flume進程卡死。設置遇到不能識別的字元忽略跳過:

a1.sources.r1.decodeErrorPolicy= IGNORE
  • flume運行過程中出現GC over的內存溢出錯誤,配置flume-env.sh中內存配置(默認值很小);

exportJAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"
  • flume啟動時-c後面要給全到詳細flume配置文件目錄,否則flume-env.sh中的配置不會載入,會使用默認配置,例如下面啟動命令給全配置文件目錄:

/hadoop/apache-flume-1.6.0-bin/bin/flume-ngagent-c/hadoop/apache-flume-1.6.0-bin/conf/
  • 如果使用內存隊列,請注意內存隊列消息數的配置,設置transactionCapacity隊列大小必須大於等於batchSize;

a1.channels.c1.transactionCapacity=2000a1.sinks.k1.batchSize=2000

增加batchSize可以提升flume處理速度,原理是flume處理的event都保存在transaction隊列中,直到滿足了batchSize的數量條件,才一次性批量向sink發送。但是要注意實際數據量的大小,如果實際數據量很小,batchSize就不能配置過大,否則數據達不到batchSize的數量條件,會長時間積壓在transaction隊列中,後面的實時處理程序反而得不到數據,導致實時性變差;

flume中讀取的一條記錄長度超過2048字元,也就是4096位元組就會被截斷,可以在配置文件中增加如下配置項解決:

producer.sources.s.deserializer.maxLineLength=65535
  • flume字元轉換異常問題,java.nio.charset.MalformedInputException: Input length = 1,可以在配置文件中增加如下配置項解決:

a1.sources.r1.inputCharset= ISO8859-1
  • flume遇到亂碼停止,報異常:java.nio.charset.MalformedInputException,可以在配置文件中增加如下配置,忽略錯誤數據(默認是FAIL,拋異常報錯,flume會停止)解決;

producer.sources.s.decodeErrorPolicy=IGNORE
  • 默認情況下,Flume處理完成的文件會增加.completed後綴,在數據量很大的情況下,會很快撐滿採集機硬碟,可以在配置文件中增加如下配置,讓flume處理完后自動刪除該數據文件解決。

a1.sources.r1.deletePolicy= immediate

a1

.sources

=

r1

a1

.sinks

= k1 a1

.channels

= c1

# Describe/configure the source

a1

.sources.r

1

.type

= spooldir a1

.sources.r

1

.channels

= c1 a1

.sources.r

1

.spoolDir

= /ftpdata/xdr/HTTP_tmp a1

.sources.r

1

.ignorePattern

= ^(.)*\\

.tmp

$ a1

.sources.r

1

.fileHeader

= false a1

.sources.r

1

.deletePolicy

= immediate a1

.sources.r

1

.inputCharset

= ISO8859-

1

a1

.sources.r

1

.deserializer.maxLineLength

=

8192

a1

.sources.r

1

.decodeErrorPolicy

= IGNORE

# Describe the sink

a1

.sinks.k

1

.channel

= c1 a1

.sinks.k

1

.type

= org

.apache.flume.sink.kafka.KafkaSink

a1

.sinks.k

1

.batchSize

=

10000

a1

.sinks.k

1

.brokerList

= stormmaster:

9092

,storm01:

9092

,storm02:

9092

,storm03:

9092

,storm04:

9092

a1

.sinks.k

1

.serializer.class

= kafka

.serializer.StringEncoder

a1

.sinks.k

1

.requiredAcks

=

0

a1

.sinks.k

1

.producer.type

= async a1

.sinks.k

1

.topic

= sighttpnew

# Use a channel which buffers events in memory

a1

.channels.c

1

.type

= memory a1

.channels.c

1

.capacity

=

80000

a1

.channels.c

1

.transactionCapacity

=

10000

a1

.channels.c

1

.keep

-alive =

30

Flume-env.sh配置:

# Enviroment variables can be set here.

exportJAVA_HOME=/usr/java/jdk1.7.0_80

exportFLUME_HOME=/hadoop/apache-flume-1.6.0-bin

# Give Flume more memory and pre-allocate, enable remote monitoring via JMX

exportJAVA_OPTS="-Xms1024m -Xmx2048m -Dcom.sun.management.jmxremote"

# Note that the Flume conf directory is always included in the classpath.

exportFLUME_CLASSPATH="/hadoop/apache-flume-1.6.0-bin/lib"

Flume啟動命令:

/hadoop/apache-flume-1.6.0-bin/bin/flume-ngagent-c/hadoop/apache-flume-1.6.0-bin/conf/-f/hadoop/apache-flume-1.6.0-bin/conf/viewdata.conf-nproducer –Dflume.root.logger=ERROR&

注意一定要給全Flume配置文件的路徑,否則啟動Flume不能正確載入Flume-env.sh的配置。

b)Kafka集群安裝及配置

從下載kafka安裝包:kafka_*.tgz,解壓后,配置server.properties文件。

server.properties配置:

#本機在kafka集群中的id

broker.id=48

#服務埠

port=9092

#主機名

host.name=storm01

# The number of threads handling network requests

num.network.threads=3

# The number of threads doing disk I/O

num.io.threads=8

# The send buffer (SO_SNDBUF) used by the socket server

socket.send.buffer.bytes=102400

# The receive buffer (SO_RCVBUF) used by the socket server

socket.receive.buffer.bytes=102400

# The maximum size of a request that the socket server will accept (protection against OOM)

socket.request.max.bytes=104857600

#kafka數據存儲位置(數據量大時,需要存儲的目錄大小也要充分)

log.dirs=/data1/kafka-logs

#默認topic創建partition的數量

num.partitions=1

# This value is recommended to be increased for installations with data dirs located in RAID array.

num.recovery.threads.per.data.dir=1

#kafka事件只有flash到硬碟才能被後續消費者消費,因此要配置flash時間參數,避免小數據量情況下數據刷新時間過久

log.flush.interval.messages=10000

log.flush.interval.ms=1000

# 數據在kafka中保存的時間,單位小時,超時的數據kafka會自動刪除

log.retention.hours=48

# The maximum size of a log segment file. When this size is reached a new log segment will be created.

log.segment.bytes=1073741824

# The interval at which log segments are checked to see if they can be deleted according

to the retention policies

log.retention.check.interval.ms=300000

# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs can then be marked for log compaction.

log.cleaner.enable=false

# zookeeper集群配置

zookeeper.connect=master:2181,storm01:2181,storm02:2181,storm03:2181,storm04:2181

# Timeout in ms for connecting to zookeeper

zookeeper.connection.timeout.ms=6000

#是否能夠刪除topic的配置,默認false不能刪除topic

delete.topic.enable=true

Kafka服務啟動:jps命令可以看到kafka的進程名,說明kafka已經成功啟動。

nohup kafka-server-start.sh /home/hadoop/kafka_2.9.1-0.8.2.1/config/server.properties &

創建topic:創建複製因子2,有24個partition的topic,創建多個partition的目的是增加并行性,複製因子的目的是數據安全冗餘。

kafka-topics.sh--create--zookeepermaster:2181storm01:2181storm02:2181storm03:2181storm04:2181--replication-factor2--partitions24--topicsighttp

kafka數據存儲方式:在kafka數據存儲目錄下,可以看到以每個-方式命名的文件夾,例如sighttp-19表示topic:sighttp,partition:19,如下圖所示:

圖三

進入topic-partition目錄,可以看到很多.index和.log結尾的文件。其中.log是數據文件,其中存儲的是kafka緩存池中的數據,.index是索引文件,數據文件和索引文件成對出現,文件名為一串數字,標識了該文件中存儲數據的起始序列號,如下:

圖四

kafka數據消費狀態查詢:消費者從kafka消費數據狀態是記錄在zookeeper中的,使用zkCli.sh命令可以查看,如下圖查詢了消費topic:sighttp,partition:0的狀態,offset表明已經處理到49259227840行,如下圖所示:

圖五

經驗:通過消費到的行數與存儲到的行數,可以判斷數據處理程序的速度是否滿足數據生成速度的需求。

kafka消費典型異常:

[2016-10-2716:15:42536] ERROR [Replica ManageronBroker51]: Error when processing fetch requestforpartition [sighttp,3]offset6535061966fromconsumerwithcorrelationid0.Possible cause: Requestforoffset6535061966butwe only havelogsegmentsintherange6580106664to6797636149.(kafka.server.ReplicaManager)

異常原因:kafka中由於消息過期已經把序號是6535061966的消息刪除了,目前kafka中只有範圍是6580106664到6797636149的日誌,但是消費者還要處理過期刪除的消息,那就會出現此異常消息(通常是由於數據處理速度慢,無法滿足數據生成速度的要求,導致消息積壓,積壓的消息到達kafka配置的過期時間,被kafka刪除)。

c)Storm集群安裝及配置

在下載Storm安裝包,建議使用Storm 0.10.0 released以上版本,因為最新版本修正了很多bug,特別是STORM-935的問題(拓撲啟動後會佔用大量系統資源,導致Topology運行不穩定)。

storm.yaml文件配置:

#zookeeper集群伺服器配置

storm.zookeeper.servers: -"master" -"storm01" -"storm02" -"storm03" -"storm04"

#storm主節點

nimbus.host:"master"

#strom管理頁面服務埠

ui.port:8081

#storm從節點服務埠配置,默認6700-6703共4個埠,意味著每台伺服器可以提供4個worker插槽,這裡增加了6704和6705埠,即為單台伺服器增加了2個worker插槽,worker數增加意味著storm集群可以提供更多的計算資源。

supervisor.slots.ports:-6700-6701-6702-6703-6704-6705

#狀態信息存儲位置,避免使用/tmp

storm.local.dir:"/home/hadoop/apache-storm-0.10.0/workdir"

#主節點的內存

nimbus.childopts:"-Xmx3072m"

#從節點的內存

supervisor.childopts:"-Xmx3072m"

#worker的內存,增加內存可以減少GC overload的問題

worker.childopts:"-Xmx3072m"

#默認為30,增加netty超時時長等參數,降低因Netty通信問題,造成worker不穩定

storm.messaging.netty.max_retries:60

#增加storm.messaging.netty.max_wait_ms設置,默認為1000

storm.messaging.netty.max_wait_ms:2000

Storm管理頁面:

瀏覽器輸入Storm UI所在伺服器地址+8081埠號,打開Strom管理頁面如下圖:

圖六

從圖六Cluster Summary中可以看出Storm集群共有4個Supervisor節點,因每台Supervisor提供6個slot(如果在storm.yaml配置文件中不配置supervisor.slots.ports屬性,則每個Supervisor默認提供4個slot),因此共有4*6=24個slot,已使用22個,還有2個空閑。需要注意的是每個拓撲一旦發布,將長久佔用slot,如果沒有足夠的slot,最新發布的拓撲只會佔用空閑的slot,不會搶佔其他已經被佔用的slot資源;如果沒有slot,將無法發布新的拓撲,此時需要挖潛Storm集群伺服器,通過配置文件增加slot資源或增加新的伺服器。

從圖六Topology Summary中可以看出,集群上已經發布了7個Topology,每個Topology佔用的worker資源,啟動的executor線程數,具體資源佔用多少是在Storm Topology開發程序中指定的。

d)Kafka+Storm+Hdfs+Hbase拓撲開發

我們使用Eclipse創建MAVEN工程,在pom.xml配置文件中添加Storm及Hdfs的相關依賴,本例是Storm從Kafka中消費數據,經過ETL處理后存儲到Hdfs和Hbase中,因此需要添加Storm-Kafka、Storm-Hdfs、Storm-Hbase等依賴,注意依賴包版本要與集群一致。

抽取過程繼承BaseRichBolt類:

publicclasssplitBoltextendsBaseRichBolt{ privatestaticfinalString TAB =",";

privateOutputCollector collector;

publicvoidprepare(Map config,TopologyContext context,OutputCollector collector){

this.collector=collector; }

publicvoidexecute(Tuple input){ String line=input.getString(0); String words=line.split(TAB);

if(words.length>74) { String Account;

if(words[0].length>0) Account=words[0];

elseAccount="NULL"; String LocalIPv4;

if(words[1].length>0) LocalIPv4=words[1];

elseLocalIPv4="NULL"; String RemoteIPv4;

if(words[2].length>0) RemoteIPv4=words[2];

elseRemoteIPv4="NULL"; String newline=Account+"|"+LocalIPv4+"|"+RemoteIPv4; collector.emit(input,newValues(newline)); } collector.ack(input); }

publicvoiddeclareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(newFields("newline")); } }

寫Hbase需要實現HBaseMapper類:

publicclassmyHbaseMapperimplementsHBaseMapper{ publicColumnListcolumns(Tuple tuple) { String line=tuple.getString(0); String words=line.split("\\|"); ColumnList cols =newColumnList;

//參數依次是列族名,列名,值 if(words[1].length>0) cols.addColumn("content".getBytes,"LocalIPv4".getBytes, words[1].getBytes);

if(words[2].length>0) cols.addColumn("content".getBytes,"RemoteIPv4".getBytes, words[2].getBytes);

returncols; }

publicbyterowKey0"\\|"); String key;

//rowkey設置成Account的反字元串,便於hbase表內分區的數據均衡 key=newStringBuilder(words[0]).reverse.toString;

returnkey.getBytes; } }

main函數:

public staticvoidmain(String args) { Stringzks ="master:2181,storm01:2181,storm02:2181 ";//zookeeper集群 Stringtopic ="topicname";//kafka中topic名稱 StringzkRoot ="/storm";//zookeeper中存儲狀態信息的根目錄 Stringid ="kafkatopicname";//zookeeper中存儲本拓撲狀態信息的子目錄 FileNameFormat fileNameFormat =newDefaultFileNameFormat .withPath("/storm/tmp/").withPrefix("tmp_").withExtension(".dat"); RecordFormat format =newDelimitedRecordFormat .withFieldDelimiter("|");//寫到hdfs的目錄文件名以』tmp_』開頭,』.dat』結尾 //每10分鐘重寫一個hdfs的新文件 FileRotationPolicy rotationPolicy =newTimedRotationPolicy(10.0f, TimeUnit.MINUTES); BrokerHosts brokerHosts =newZkHosts(zks);

//配置storm拓撲的spout SpoutConfig spoutConf =newSpoutConfig(brokerHosts, topic, zkRoot, id); spoutConf.scheme =newSchemeAsMultiScheme(newMessageScheme); spoutConf.zkServers = Arrays.asList(newString {"master""storm01""storm02"}); spoutConf.zkPort =2181; spoutConf.ignoreZkOffsets =false;//重啟拓撲時,需要從zookeeper中讀取偏移量 //如果偏移量中的數據已經從kafka中刪除,則從kafka中保存的最早數據開始處理。 spoutConf.startOffsetTime = kafka.api.OffsetRequest.EarliestTime; spoutConf.useStartOffsetTimeIfOffsetOutOfRange =true; //配置hdfs bolt HdfsBolt hdfsBolt =newHdfsBolt .withFsUrl("hdfs://hdfsmaster:9000") .withFileNameFormat(fileNameFormat) .withRecordFormat(format) .withRotationPolicy(rotationPolicy)

//hdfs數據文件寫完后,move到新目錄 .addRotationAction(newMoveFileAction.toDestination("/storm/http/")); //實例化HBaseMapper HBaseMapper mapper =newmyHbaseMapper;

//實例化HBaseBolt,指定hbase中的表名 HBaseBolt hBolt =newHBaseBolt("hbasetable", mapper).withConfigKey("hbase.conf"); TopologyBuilder builder =newTopologyBuilder; //配置spout線程數為24,此數要與kafka中topic的partition數一致,partition數越多,則spout讀取數據的并行性越高,處理速度越快 builder.setSpout("kafka-reader"newKafkaSpout(spoutConf),24); //配置bolt,此bolt開發處理邏輯,bolt可以串接多個 builder.setBolt("etl"newsplitBolt,24).shuffleGrouping("kafka-reader"); builder.setBolt("hdfs-bolt", hdfsBolt,24).shuffleGrouping("etl"); builder.setBolt("hbase-bolt", hBolt,24).shuffleGrouping("etl"); Config conf =newConfig;

//增加hbase配置,指定hbase在hdfs集群上的目錄,zookeeper伺服器集群 Map<StringObject> hbConf =newHashMap<StringObject>; hbConf.put("hbase.rootdir""hdfs://hdfsmaster:9000/hbase"); hbConf.put("hbase.zookeeper.quorum""master,storm01,storm02"); conf.put("hbase.conf", hbConf); Stringname = sighttphdfs.class.getSimpleName; if(args !=null&& args.length >0) { conf.put(Config.NIMBUS_HOST, args[0]); conf.put(Config.TOPOLOGY_ACKER_EXECUTORS,0); //設置拓撲佔用worker數為4,根據實時處理數據量大小按需配置 conf.setNumWorkers(4); StormSubmitter.submitTopologyWithProgressBar(name, conf, builder.createTopology); } }

上面程序實現了Storm讀Kafka寫Hdfs和Hbase的例子,抽取類中可以根據不同的業務需求,通過Java代碼實現不同的邏輯。編譯后的jar包上傳到集群,使用storm命令行提交Topology:

storm jar ./kafkastream.jarsighdfs.sighttphdfsstormmaster

總結

經過幾個月的實際運行,我們的大數據實時處理架構能夠始終保持穩定,話單處理速度高於話單生成速度,有效的支撐了運營商大數據的各種分析查詢需求。開發和優化過程充滿挑戰,經過各種研究和嘗試,問題逐漸解決,在此我們也積累了大量的開發和優化經驗。

最後再分享2個我們實際遇到的問題:

  • Zookeeper配置造成Storm拓撲運行不穩定

因Storm集群需要Zookeeper集群作狀態同步,因此所有是Storm伺服器worker進程都會不停連接Zookeeper節點,Zookeeper節點的默認連接數是60,當Storm計算拓撲數量較多時,需要修改Zookeeper配置maxClientCnxns=1000,增加Zookeeper連接數。

  • Hdfs節點磁碟I/O高造成Storm拓撲運行不穩定

由於Storm是實時計算,每個環節的擁塞都將引起Storm拓撲的不穩定,在開發中我們遇到Hdfs某個節點磁碟I/O高,導致Storm寫Hdfs超時,最終引發Supervisor殺掉worker,造成拓撲不穩定的問題。究其原因是在某個Hdfs節點上,Yarn任務正在進行Reduce操作,用iostat -x 1 10命令查看,Yarn的中間盤I/O長時間被100%佔用,同時Yarn的中間盤也是Hdfs的數據盤,導致寫入請求無法響應,最終導致Storm寫Hdfs的worker超時,引發拓撲運行不穩定。此處建議配置Yarn的中間盤時,不要使用操作系統根盤,不要使用Hdfs的數據盤,可以有效避免Storm寫Hdfs超時的問題。



熱門推薦

本文由 yidianzixun 提供 原文連結

寵物協尋 相信 終究能找到回家的路
寫了7763篇文章,獲得2次喜歡
留言回覆
回覆
精彩推薦