
Deep Learning on Big Data with the TensorFlow Framework: An Example-Driven Tutorial

In recent years, the rapid development of the information age has produced massive volumes of data and given rise to countless cutting-edge big data technologies and applications. In today's big data era, industry increasingly makes business decisions based on data analysis. Once data grows past a certain scale, machine learning extracts real value from large, complex data sets, and deep learning is especially good at revealing the internal structure of data in big data scenarios. Taking big data as the setting, this article is a bottom-up tutorial on how to apply deep learning inside a big data architecture. The big data side uses a Hadoop cluster with Kerberos authentication; the deep learning side uses distributed TensorFlow. Hadoop solves the storage problem for big data, while distributed TensorFlow solves the problem of training on big data. This tutorial distills the experience our team gained while deploying the deep learning module of a real-time fraud early-warning service; readers who are interested are welcome to reach out for a deeper exchange.

Installing TensorFlow

We installed TensorFlow on CentOS 7; the prebuilt TensorFlow binaries require a newer GNU C library (glibc) than CentOS 6 provides, so CentOS 6 was ruled out. The official site has a fairly thorough guide to installing TensorFlow online, so this tutorial focuses on offline installation, for which far less material is available. When building the system, what matters most is keeping the versions of all components consistent; the steps below are the scheme we arrived at after resolving many version conflicts. Let's build the system from the ground up.

1. Install the Python 3.5 language runtime. Download the source from the official site, extract it, and run the following install commands:

./configure
make
make test
sudo make install

2. Install python-numpy, the scientific computing package for Python. Download it from the official site, extract it, and run the following install command:

python setup.py install

3. Install wheel, the Python package format tool. Download it from the official site and run the following install command:

pip install wheel-0.30.0a0-py2.py3-none-any.whl

4. Install setuptools, which automatically downloads, builds, installs, and manages Python modules. Download it from the official site and extract it; an install command is sketched below.
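
Assuming setuptools was downloaded as a source archive, the conventional install command is:

python setup.py install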

5. Install python-devel, the Python development package. Download it from the official site and run the following install command:

sudo rpm -i --nodeps python3-devel-3.5.2-4.fc25.x86_64.rpm

6. Install six, the Python 2/3 compatibility library. Download it from the official site and run the following install command:

sudo pip install six-1.10.0-py2.py3-none-any.whl

7. Install JDK 8, the Java development environment. Download it from the official site, extract it, and move it into place:

mv java1.8 /usr/local/software/jdk

Set the JDK environment variables by editing the .bashrc file and adding the following:

export JAVA_HOME=/usr/local/software/jdk
export JRE_HOME=${JAVA_HOME}/jre
export CLASSPATH=$CLASSPATH:${JAVA_HOME}/lib:${JRE_HOME}/lib
export PATH=$PATH:${JAVA_HOME}/bin

Switch the system's Java to the corresponding version:

sudo update-alternatives --config java
sudo update-alternatives --config javac

8. Install Bazel. Bazel is a build tool similar to Make that Google developed and tailored to its internal development practices; it is used to build the TensorFlow project. Download it from the official site and run the following install commands:

chmod +x bazel-0.4.3-installer-linux-x86_64.sh
./bazel-0.4.3-installer-linux-x86_64.sh --user

9. Install TensorFlow. Download the wheel from the official site and run the following install command:

pip install --upgrade tensorflow-0.12.1-cp35-cp35m-linux_x86_64.whl
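
As a quick sanity check that the offline installation succeeded (a minimal sketch; it just imports the library and prints its version):

python -c "import tensorflow as tf; print(tf.__version__)"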

Configuring TensorFlow to Access HDFS

1. First install the Hadoop client. Download it from the official site, then extract the archive and move the extracted directory into place:

tar zxvf hadoop-2.6.0.tar.gz
mv hadoop-2.6.0 /usr/local/software/hadoop

Configure the environment variables in /etc/profile by adding the following:

export PATH=$PATH:/usr/local/software/hadoop/bin
export LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$JAVA_HOME/jre/lib/amd64/server
export HADOOP_HOME=/usr/local/software/hadoop
export HADOOP_HDFS_HOME=/usr/local/software/hadoop

After editing, reload the configuration with source /etc/profile.

2. Next, with the client installed, configure the Hadoop environment files (core-site.xml and hdfs-site.xml) so the client points at your own cluster; a sketch of this step follows.
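
A minimal sketch of that step, assuming the cluster keeps its client configuration in /etc/hadoop/conf on a host named namenode (both are assumptions; substitute your own host and paths):

scp user@namenode:/etc/hadoop/conf/core-site.xml $HADOOP_HOME/etc/hadoop/
scp user@namenode:/etc/hadoop/conf/hdfs-site.xml $HADOOP_HOME/etc/hadoop/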

Configuring TensorFlow with Kerberos Authentication

TensorFlow 0.12 already supports Kerberos authentication, so the machine only needs a working Kerberos configuration. This article does not cover Kerberos configuration in detail, but the flow is as follows (a hedged sketch appears after the list):

  • First, configure the servers and the authentication policy in the /etc/krb5.conf file;

  • Then, generate a user credential file (keytab) on the Kerberos server and transfer it to this machine;

  • Finally, authenticate with the Kerberos client and set up a scheduled task to keep the ticket fresh.
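
A hedged sketch of the last two steps, assuming the keytab was saved as /home/bdusr01/user.keytab for a principal user@EXAMPLE.COM (both names are placeholders, not from the original):

# Authenticate with the keytab copied from the Kerberos server
kinit -kt /home/bdusr01/user.keytab user@EXAMPLE.COM

# Renew the ticket on a schedule, e.g. a crontab entry running every 8 hours
0 */8 * * * kinit -kt /home/bdusr01/user.keytab user@EXAMPLE.COM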

A Deep Learning Example with Distributed TensorFlow in a Big Data Setting

I. Converting the data format

Our example builds a recognition model on the MNIST data. To read the data efficiently and make better use of memory, we first convert the local GZ files into TFRecord, TensorFlow's native storage format, and then upload the converted files to HDFS (a usage sketch follows the code). In the real application we used Spark for the large-scale conversion job; the corresponding conversion code for local data is:

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import argparse
import os

import tensorflow as tf
from tensorflow.contrib.learn.python.learn.datasets import mnist

SOURCE_URL = 'http://yann.lecun.com/exdb/mnist/'
TRAIN_IMAGES = 'train-images-idx3-ubyte.gz'  # MNIST filenames
TRAIN_LABELS = 'train-labels-idx1-ubyte.gz'
TEST_IMAGES = 't10k-images-idx3-ubyte.gz'
TEST_LABELS = 't10k-labels-idx1-ubyte.gz'

FLAGS = None


def _int64_feature(value):
  return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))


def _bytes_feature(value):
  return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))


def convert_to(data_set, name):
  """Converts one split of the data set to a .tfrecords file."""
  images = data_set.images
  labels = data_set.labels
  num_examples = data_set.num_examples

  if images.shape[0] != num_examples:
    raise ValueError('Images size %d does not match label size %d.' %
                     (images.shape[0], num_examples))
  rows = images.shape[1]
  cols = images.shape[2]
  depth = images.shape[3]

  filename = os.path.join(FLAGS.directory, name + '.tfrecords')
  print('Writing', filename)
  writer = tf.python_io.TFRecordWriter(filename)
  for index in range(num_examples):
    image_raw = images[index].tostring()
    example = tf.train.Example(features=tf.train.Features(feature={
        'height': _int64_feature(rows),
        'width': _int64_feature(cols),
        'depth': _int64_feature(depth),
        'label': _int64_feature(int(labels[index])),
        'image_raw': _bytes_feature(image_raw)}))
    writer.write(example.SerializeToString())
  writer.close()


def main(argv):
  # Get the data.
  data_sets = mnist.read_data_sets(FLAGS.directory,
                                   dtype=tf.uint8,
                                   reshape=False,
                                   validation_size=FLAGS.validation_size)

  # Convert to Examples and write the result to TFRecords.
  convert_to(data_sets.train, 'train')
  convert_to(data_sets.validation, 'validation')
  convert_to(data_sets.test, 'test')


if __name__ == '__main__':
  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--directory',
      type=str,
      default='/tmp/data',
      help='Directory to download data files and write the converted result')
  parser.add_argument(
      '--validation_size',
      type=int,
      default=5000,
      help='Number of examples to separate from the training data for the '
           'validation set.')
  FLAGS = parser.parse_args()
  tf.app.run()
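
As a usage sketch, assuming the code above is saved as convert_to_records.py (a hypothetical file name) and that /tmp/data and hdfs://default/user/data/ are placeholder paths, the conversion and upload would look like:

python convert_to_records.py --directory /tmp/data
hadoop fs -put /tmp/data/*.tfrecords hdfs://default/user/data/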

II. Configuring TensorFlow to read data from HDFS

The previous sections covered configuring HDFS and storing the converted data there. Reading from HDFS in TensorFlow takes only two simple steps. First, launch the program with the Hadoop classpath prepended to the environment:

CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python example.py

Second, when reading data, prepend the HDFS scheme to the data path, for example:

hdfs://default/user/data/example.txt
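
Putting the two steps together, here is a minimal sketch using the TensorFlow 0.12 queue-based input API, assuming the train.tfrecords file produced earlier was uploaded under hdfs://default/user/data/ (a placeholder path):

import tensorflow as tf

# The path is a placeholder; point it at wherever the converted
# TFRecords were uploaded ('default' resolves via fs.defaultFS).
filename_queue = tf.train.string_input_producer(
    ['hdfs://default/user/data/train.tfrecords'])
reader = tf.TFRecordReader()
_, serialized_example = reader.read(filename_queue)
# serialized_example can now be decoded exactly as with a local file,
# e.g. with tf.parse_single_example (see the model code below).

Once the scheme prefix is in place, reading from HDFS looks exactly like reading a local file.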

III. Example code for the distributed model

The example code reads the MNIST data from HDFS, builds the corresponding parameter server and worker cluster, and constructs a three-layer deep network: two fully connected hidden layers followed by a softmax output layer. The code is as follows:

from __future__ import print_function

import math
import os

import tensorflow as tf

flags = tf.app.flags

# Flags for configuring the task
flags.DEFINE_string("job_name", None, "job name: worker or ps")
flags.DEFINE_integer("task_index", 0,
                     "Worker task index, should be >= 0. task_index=0 is "
                     "the chief worker task that performs the variable "
                     "initialization")
flags.DEFINE_string("ps_hosts", "",
                    "Comma-separated list of hostname:port pairs")
flags.DEFINE_string("worker_hosts", "",
                    "Comma-separated list of hostname:port pairs")

# Training related flags
flags.DEFINE_string("data_dir", None,
                    "Directory where the mnist data is stored")
flags.DEFINE_string("train_dir", None,
                    "Directory for storing the checkpoints")
flags.DEFINE_integer("hidden1", 128,
                     "Number of units in the 1st hidden layer of the NN")
flags.DEFINE_integer("hidden2", 128,
                     "Number of units in the 2nd hidden layer of the NN")
flags.DEFINE_integer("batch_size", 100, "Training batch size")
flags.DEFINE_float("learning_rate", 0.01, "Learning rate")

FLAGS = flags.FLAGS

TRAIN_FILE = "train.tfrecords"
NUM_CLASSES = 10
IMAGE_SIZE = 28
IMAGE_PIXELS = IMAGE_SIZE * IMAGE_SIZE


def inference(images, hidden1_units, hidden2_units):
  """Builds two fully connected hidden layers plus a softmax linear layer."""
  with tf.name_scope('hidden1'):
    weights = tf.Variable(
        tf.truncated_normal([IMAGE_PIXELS, hidden1_units],
                            stddev=1.0 / math.sqrt(float(IMAGE_PIXELS))),
        name='weights')
    biases = tf.Variable(tf.zeros([hidden1_units]), name='biases')
    hidden1 = tf.nn.relu(tf.matmul(images, weights) + biases)
  with tf.name_scope('hidden2'):
    weights = tf.Variable(
        tf.truncated_normal([hidden1_units, hidden2_units],
                            stddev=1.0 / math.sqrt(float(hidden1_units))),
        name='weights')
    biases = tf.Variable(tf.zeros([hidden2_units]), name='biases')
    hidden2 = tf.nn.relu(tf.matmul(hidden1, weights) + biases)
  with tf.name_scope('softmax_linear'):
    weights = tf.Variable(
        tf.truncated_normal([hidden2_units, NUM_CLASSES],
                            stddev=1.0 / math.sqrt(float(hidden2_units))),
        name='weights')
    biases = tf.Variable(tf.zeros([NUM_CLASSES]), name='biases')
    logits = tf.matmul(hidden2, weights) + biases
  return logits


def lossFunction(logits, labels):
  labels = tf.to_int64(labels)
  cross_entropy = tf.nn.sparse_softmax_cross_entropy_with_logits(
      logits, labels, name='xentropy')
  loss = tf.reduce_mean(cross_entropy, name='xentropy_mean')
  return loss


def training(loss, learning_rate):
  tf.summary.scalar(loss.op.name, loss)
  optimizer = tf.train.GradientDescentOptimizer(learning_rate)
  global_step = tf.Variable(0, name='global_step', trainable=False)
  train_op = optimizer.minimize(loss, global_step=global_step)
  return train_op


def read_and_decode(filename_queue):
  reader = tf.TFRecordReader()
  _, serialized_example = reader.read(filename_queue)
  features = tf.parse_single_example(
      serialized_example,
      # Defaults are not specified since both keys are required.
      features={
          'image_raw': tf.FixedLenFeature([], tf.string),
          'label': tf.FixedLenFeature([], tf.int64),
      })

  # Convert from a scalar string tensor (whose single string has
  # length mnist.IMAGE_PIXELS) to a uint8 tensor with shape
  # [mnist.IMAGE_PIXELS].
  image = tf.decode_raw(features['image_raw'], tf.uint8)
  image.set_shape([IMAGE_PIXELS])
  image = tf.cast(image, tf.float32) * (1. / 255) - 0.5

  # Convert label from a scalar uint8 tensor to an int32 scalar.
  label = tf.cast(features['label'], tf.int32)
  return image, label


def inputs(batch_size):
  """Reads input data.

  Args:
    batch_size: Number of examples per returned batch.

  Returns:
    A tuple (images, labels), where:
    * images is a float tensor with shape [batch_size, mnist.IMAGE_PIXELS]
      in the range [-0.5, 0.5].
    * labels is an int32 tensor with shape [batch_size] with the true label,
      a number in the range [0, mnist.NUM_CLASSES).
  """
  filename = os.path.join(FLAGS.data_dir, TRAIN_FILE)
  with tf.name_scope('input'):
    filename_queue = tf.train.string_input_producer([filename])
    # Even when reading in multiple threads, share the filename queue.
    image, label = read_and_decode(filename_queue)
    # Shuffle the examples and collect them into batch_size batches.
    # (Internally uses a RandomShuffleQueue.)
    # We run this in two threads to avoid being a bottleneck.
    images, sparse_labels = tf.train.shuffle_batch(
        [image, label], batch_size=batch_size, num_threads=2,
        capacity=1000 + 3 * batch_size,
        # Ensures a minimum amount of shuffling of examples.
        min_after_dequeue=1000)
    return images, sparse_labels


def device_and_target():
  # If FLAGS.job_name is not set, we're running single-machine TensorFlow.
  # Don't set a device.
  if FLAGS.job_name is None:
    raise ValueError("Must specify an explicit `job_name`")
  # Otherwise we're running distributed TensorFlow.
  print("Running distributed training")
  if FLAGS.task_index is None or FLAGS.task_index == "":
    raise ValueError("Must specify an explicit `task_index`")
  if FLAGS.ps_hosts is None or FLAGS.ps_hosts == "":
    raise ValueError("Must specify an explicit `ps_hosts`")
  if FLAGS.worker_hosts is None or FLAGS.worker_hosts == "":
    raise ValueError("Must specify an explicit `worker_hosts`")

  cluster_spec = tf.train.ClusterSpec({
      "ps": FLAGS.ps_hosts.split(","),
      "worker": FLAGS.worker_hosts.split(","),
  })
  server = tf.train.Server(
      cluster_spec, job_name=FLAGS.job_name, task_index=FLAGS.task_index)
  return (cluster_spec, server)


def main(unused_argv):
  if FLAGS.data_dir is None or FLAGS.data_dir == "":
    raise ValueError("Must specify an explicit `data_dir`")
  if FLAGS.train_dir is None or FLAGS.train_dir == "":
    raise ValueError("Must specify an explicit `train_dir`")

  cluster_spec, server = device_and_target()
  if FLAGS.job_name == "ps":
    server.join()
  elif FLAGS.job_name == "worker":
    with tf.device(tf.train.replica_device_setter(
        worker_device="/job:worker/task:{}".format(FLAGS.task_index),
        cluster=cluster_spec)):
      images, labels = inputs(FLAGS.batch_size)
      logits = inference(images, FLAGS.hidden1, FLAGS.hidden2)
      loss = lossFunction(logits, labels)
      train_op = training(loss, FLAGS.learning_rate)
    with tf.train.MonitoredTrainingSession(
        master=server.target,
        is_chief=(FLAGS.task_index == 0),
        checkpoint_dir=FLAGS.train_dir) as sess:
      while not sess.should_stop():
        sess.run(train_op)


if __name__ == "__main__":
  tf.app.run()

IV. Launching the distributed model

First, disable the firewall:

sudo iptables -F

Then start the services on the different machines:

# Run the parameter server on the 246.1 machine:
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=ps --task_index=0

# Run worker 0 on the 78.41 machine (same script and flags, different role):
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=0

# Run worker 1 on the 78.45 machine:
CLASSPATH=$($HADOOP_HDFS_HOME/bin/hadoop classpath --glob) python /home/bdusr01/tine/Distributed_Tensorflow_MNIST_Model_Used_NN_Read_TFRecords_On_HDFS_Support_Kerberos.py --ps_hosts=10.142.246.1:1120 --worker_hosts=10.142.78.41:1121,10.142.78.45:1122 --data_dir=hdfs://default/user/bdusr01/asy/MNIST_data --train_dir=/home/bdusr01/checkpoint/ --job_name=worker --task_index=1

# Run the monitor (TensorBoard) on the 78.41 machine:
tensorboard --logdir=/home/bdusr01/checkpoint/

V. Monitoring the model

Since we just launched TensorBoard on the 78.41 machine, the model can be monitored through its web address (TensorBoard serves on port 6006 by default), and the model's parameters can be observed dynamically while training runs.

Once distributed TensorFlow has finished training on the big data, Bazel can be used to build TensorFlow Serving, a flexible and highly available serving system that makes it straightforward to put deep learning models into production, removing the usual obstacle of a trained model that cannot serve requests. That concludes this walkthrough of one basic module of our project; the parts with the most substance (model design, engineering, and business logic) may be shared in more detail on another occasion.

About the author: Ding Tinghe studied data mining at the Shanghai Key Laboratory of Intelligent Information Processing, School of Computer Science, Fudan University, during his master's degree. He now works at the Shanghai headquarters of a centrally administered state-owned enterprise, developing and researching the Spark full stack, machine learning, and deep learning in the big data field.


