Efficient training of deep recommenders on cloud.

Overview

HybridBackend

Introduction

HybridBackend is a training framework for deep recommenders that bridges the gap between evolving cloud infrastructure and the complex training process. See the documentation for more information.

Installation

Install the latest CPU version:

pip install hybridbackend-cpu  # tensorflow/tensorflow:1.15.5-py3

Contributing

We appreciate all contributions to improve HybridBackend. Please follow the steps below to contribute:

1. Clone the repository and check out a new branch.

git clone <git_repo_addr>
git pull -r
git checkout -b features/my_feature

2. Commit changes, check code style and test.

git commit
cibuild/run cibuild/format
cibuild/run cibuild/lint
cibuild/run make -j8
cibuild/run make test

3. Create a pull request for code review.

Comments
  • Using shuffle or rebatch may cause OOM problem

    1. Current behavior

    Using shuffle or rebatch may cause OOM problem.

    1.1 Small-file test record

    total parquet file count: 15780

    total parquet file size: 126G

    total sample count: 6 million

    No. | Test Scenarios | RAM Usage
    --- | --- | ---
    1 | no shuffle no rebatch | use 50G ram stable
    2 | shuffle,buffer_size=2048 rebatch,batch_size=8192 | start at 53G ram, and increasingly use more ram rapidly, exceed 94G ram limit within 2 minutes
    3 | shuffle,buffer_size=8 rebatch,batch_size=8192 | increasingly use more ram slowly, exceed 94G ram limit after 2 hours
    4 | no shuffle rebatch,batch_size=8192 | increasingly use more ram slowly, exceed 94G ram limit after 2 hours
    5 | shuffle,buffer_size=8 no rebatch | use 50G ram stable at first, but increasingly use more ram after 1 hour

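    1.2 Large-file test record

    total parquet file count: 240

    total parquet file size: 126G

    size of a single parquet file: 500MB

    total sample count: 6 million

    Observation: at the end of each training epoch there is a visible memory reclamation phase, but the reclamation is incomplete, so the peak memory used after each epoch keeps growing and the job eventually runs out of memory. If training finishes within 1 or 2 epochs, however, memory does not blow up.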

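    1.3 Comparison of training modes

    | | New SDK | Old SDK |
    | ------------ | ------------- | ------------------------------------------------------------ |
    | Out of memory? | Yes | No |
    | Environment | T4, single machine, single GPU | T4, single machine, single GPU |
    | Training mode | session.run() | tf.estimator.train_and_evaluate(estimator, train_spec, eval_spec) |
    | Call chain | see Figure 1 below | see Figure 2 below |

    Figure 1: call chain of the new SDK (image 微信图片_20220323124545, not available)

    Figure 2: call chain of the old SDK (image 微信图片_20220323124542, not available)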

    2. Expected behavior

    During training, TensorFlow should use a stable amount of RAM rather than steadily increasing amounts.

    3. System information

    • OS Platform and Distribution: Ubuntu 18.04.5 LTS
    • TensorFlow version: 1.15.0
    • Python version: 3.6
    • CUDA/cuDNN version: 10.1
    • RAM: 94G
    • GPU model and memory: Tesla T4, 16G

    4. Code to reproduce

    import tensorflow as tf
    import hybridbackend.tensorflow as hb

    BATCH_SIZE = 8192

    parquet_file_list = ['some_parquet_file1.snappy.parquet', 'some_parquet_file2.snappy.parquet', ...]
    filenames_ds = tf.data.Dataset.from_tensor_slices(parquet_file_list)
    hb_fields = []
    hb_fields.append(hb.data.DataFrame.Field('int_field', tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field('float_field', tf.float32, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field('array_field', tf.float32, ragged_rank=1))   # ... and some other fields


    # 1. no shuffle, no rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))

    # 2. big shuffle and rebatch (dataset transformations return a new dataset, so reassign ds)
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.shuffle(2048)
    ds = ds.apply(hb.data.rebatch(BATCH_SIZE, fields=hb_fields))

    # 3. small shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.shuffle(8)
    ds = ds.apply(hb.data.rebatch(BATCH_SIZE, fields=hb_fields))

    # 4. no shuffle and rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.apply(hb.data.rebatch(BATCH_SIZE, fields=hb_fields))

    # 5. small shuffle and no rebatch
    ds = filenames_ds.apply(hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.shuffle(8)
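
A rough way to reason about the shuffle scenarios is to count how many samples a full shuffle buffer can hold. The sketch below assumes that `shuffle(buffer_size)` buffers whole dataset elements and that each element produced by `read_parquet(BATCH_SIZE, ...)` is one batch of `BATCH_SIZE` rows. The helper names and the on-disk bytes-per-sample figure are illustrative assumptions, not measurements from the report.

```python
# Back-of-the-envelope estimate of shuffle buffer capacity.
# Assumption: every dataset element is a full batch of `batch_size` rows.

TOTAL_SAMPLES = 6_000_000      # "600w" in the report
TOTAL_BYTES = 126 * 10**9      # "126G", read here as decimal gigabytes (assumption)


def buffered_samples(buffer_size, batch_size):
    """Samples held when the buffer is full of whole batches."""
    return buffer_size * batch_size


def approx_buffer_bytes(buffer_size, batch_size):
    """On-disk-equivalent size of a full buffer (ignores decompression)."""
    bytes_per_sample = TOTAL_BYTES / TOTAL_SAMPLES
    return buffered_samples(buffer_size, batch_size) * bytes_per_sample


for buffer_size in (2048, 8):
    samples = buffered_samples(buffer_size, 8192)
    gigabytes = approx_buffer_bytes(buffer_size, 8192) / 10**9
    print(buffer_size, samples, samples > TOTAL_SAMPLES, round(gigabytes, 2))
```

Under these assumptions, a buffer of 2048 batches could hold more samples than the whole dataset contains, which would be consistent with scenario 2 hitting the limit quickly. It does not by itself explain the slow growth in scenarios 3 to 5.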
    

    Willing to contribute

    Yes

    opened by liurcme 5
  • Question: Data Loading Performance with 150G Byte/s

    Hi, thanks for open-sourcing this project, it's great work! 🥂 🍻

    I read the Data Loading doc here; ParquetDataset is meant to solve IO performance issues on the cloud.

    According to the doc, ParquetDataset reads and decodes at about 150 GB/s (3346.10 MB / 21.67 ms), which equals the maximum throughput of twelve 100 Gbit/s NICs. That is nearly impossible on cloud storage (HDFS/OSS/S3).

    File Format | Size (MB) | Framework | #Threads | Elapsed (ms)
    ----------- | --------- | ------------- | -------- | ------------
    CSV | 11062.61 | Tensorflow | 1 | 8558.38
    Parquet | 3346.10 | Tensorflow IO | 1 | 103056.71
    Parquet | 3346.10 | HybridBackend | 1 | 397.88
    Parquet | 3346.10 | HybridBackend | 20 | 21.67
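
The throughput figure in the question can be re-derived from the table. The sketch below is only arithmetic on the numbers quoted above; the function name and the use of decimal units (1 GB = 1000 MB) are assumptions made for illustration.

```python
# (file format, framework, threads, size in MB, elapsed in ms), copied from the table
BENCHMARKS = [
    ("CSV", "Tensorflow", 1, 11062.61, 8558.38),
    ("Parquet", "Tensorflow IO", 1, 3346.10, 103056.71),
    ("Parquet", "HybridBackend", 1, 3346.10, 397.88),
    ("Parquet", "HybridBackend", 20, 3346.10, 21.67),
]

NIC_GBYTES_PER_S = 100 / 8  # one 100 Gbit/s NIC moves 12.5 GB/s


def throughput_gb_per_s(size_mb, elapsed_ms):
    """MB per millisecond is numerically equal to GB per second (decimal units)."""
    return size_mb / elapsed_ms


for file_format, framework, threads, size_mb, elapsed_ms in BENCHMARKS:
    rate = throughput_gb_per_s(size_mb, elapsed_ms)
    nics = rate / NIC_GBYTES_PER_S
    print(f"{file_format:8s} {framework:14s} {threads:2d} threads: "
          f"{rate:8.2f} GB/s (about {nics:.2f} x 100 Gbit/s NICs)")
```

The last row works out to roughly 154 GB/s, a little over twelve 100 Gbit/s links, which matches the estimate in the question.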

    Could you share the details of the test environment? Apart from the code of the Dataset module, will the HybridBackend engine code be released in the future?

    Thanks 🥂 🍻

    opened by neuzxy 4
  • Feature Request: Supports prefetching data to GPU

    User Story

    As a recommender system engineer, I want to read large batches of tabular data on GPU efficiently, so that the training performance of large deep recommenders can be improved.

    Detailed requirements

    • It should be easy to use with TensorFlow Dataset API

    API Compatibility

    • Only new APIs should be introduced.

    Willing to contribute

    Yes

    enhancement 
    opened by 2sin18 3
  • What version of snappy should I install for building HB from source?

    Summary

    I am trying to build HB from source. When I run make -j8 from the working directory, I get the following error:

    (base) [email protected]:/HybridBackend# make -j8
    mkdir -p /root/projects/tmp/HybridBackend/arrow/build/
    ARROW_INSTALL=/root/projects/tmp/HybridBackend/arrow/dist \
    ARROW_BUILD=/root/projects/tmp/HybridBackend/arrow/build \
    ARROW_OSX_TARGET= \
    USE_CXX11_ABI=0 \
    WITH_ARROW_HDFS=ON \
    WITH_ARROW_S3=ON \
    SIMD_LEVEL=AVX2 \
    OS=Linux \
    bash arrow/build.sh
    -- Building using CMake version: 3.16.3
    -- Arrow version: 5.0.0 (full: '5.0.0')
    -- Arrow SO version: 500 (full: 500.0.0)
    -- clang-tidy not found
    -- clang-format not found
    -- Could NOT find ClangTools (missing: CLANG_FORMAT_BIN CLANG_TIDY_BIN)
    -- infer not found
    -- Found cpplint executable at /root/projects/tmp/HybridBackend/arrow/src/cpp/build-support/cpplint.py
    -- System processor: x86_64
    -- Arrow build warning level: PRODUCTION
    Using ld linker
    Configured for RELEASE build (set with cmake -DCMAKE_BUILD_TYPE={release,debug,...})
    -- Build Type: RELEASE
    -- Using CONDA approach to find dependencies
    -- Using CONDA_PREFIX for ARROW_PACKAGE_PREFIX: /root/miniconda3
    -- Setting (unset) dependency *_ROOT variables: /root/miniconda3
    -- ARROW_ABSL_BUILD_VERSION: 0f3bb466b868b523cf1dc9b2aaaed65c77b28862
    -- ARROW_AWSSDK_BUILD_VERSION: 1.8.133
    -- ARROW_AWS_CHECKSUMS_BUILD_VERSION: v0.1.10
    -- ARROW_AWS_C_COMMON_BUILD_VERSION: v0.5.10
    -- ARROW_AWS_C_EVENT_STREAM_BUILD_VERSION: v0.1.5
    -- ARROW_BOOST_BUILD_VERSION: 1.75.0
    -- ARROW_BROTLI_BUILD_VERSION: v1.0.9
    -- ARROW_BZIP2_BUILD_VERSION: 1.0.8
    -- ARROW_CARES_BUILD_VERSION: 1.17.1
    -- ARROW_GBENCHMARK_BUILD_VERSION: v1.5.2
    -- ARROW_GFLAGS_BUILD_VERSION: v2.2.2
    -- ARROW_GLOG_BUILD_VERSION: v0.4.0
    -- ARROW_GRPC_BUILD_VERSION: v1.35.0
    -- ARROW_GTEST_BUILD_VERSION: 1.10.0
    -- ARROW_JEMALLOC_BUILD_VERSION: 5.2.1
    -- ARROW_LZ4_BUILD_VERSION: v1.9.3
    -- ARROW_MIMALLOC_BUILD_VERSION: v1.7.2
    -- ARROW_ORC_BUILD_VERSION: 1.6.6
    -- ARROW_PROTOBUF_BUILD_VERSION: v3.14.0
    -- ARROW_RAPIDJSON_BUILD_VERSION: 1a803826f1197b5e30703afe4b9c0e7dd48074f5
    -- ARROW_RE2_BUILD_VERSION: 2021-02-02
    -- ARROW_SNAPPY_BUILD_VERSION: 1.1.8
    -- ARROW_THRIFT_BUILD_VERSION: 0.13.0
    -- ARROW_THRIFT_BUILD_MD5_CHECKSUM: 38a27d391a2b03214b444cb13d5664f1
    -- ARROW_UTF8PROC_BUILD_VERSION: v2.6.1
    -- ARROW_XSIMD_BUILD_VERSION: e9234cd6e6f4428fc260073b2c34ffe86fda1f34
    -- ARROW_ZLIB_BUILD_VERSION: 1.2.11
    -- ARROW_ZSTD_BUILD_VERSION: v1.5.0
    -- Boost include dir: /usr/include
    -- Boost libraries: Boost::system;Boost::filesystem
    CMake Error at /usr/share/cmake-3.16/Modules/FindPackageHandleStandardArgs.cmake:146 (message):
      Could NOT find Snappy (missing: Snappy_LIB Snappy_INCLUDE_DIR)
    Call Stack (most recent call first):
      /usr/share/cmake-3.16/Modules/FindPackageHandleStandardArgs.cmake:393 (_FPHSA_FAILURE_MESSAGE)
      cmake_modules/FindSnappy.cmake:55 (find_package_handle_standard_args)
      cmake_modules/ThirdpartyToolchain.cmake:235 (find_package)
      cmake_modules/ThirdpartyToolchain.cmake:948 (resolve_dependency)
      CMakeLists.txt:515 (include)
    
    
    -- Configuring incomplete, errors occurred!
    See also "/root/projects/tmp/HybridBackend/arrow/build/CMakeFiles/CMakeOutput.log".
    See also "/root/projects/tmp/HybridBackend/arrow/build/CMakeFiles/CMakeError.log".
    make: *** [arrow/Makefile:8: /root/projects/tmp/HybridBackend/arrow/build/install_manifest.txt] Error 1
    
    

    I have installed libsnappy-dev and can find it at /usr/local/include/snappy.h and /usr/local/lib/libsnappy.a, but the error persists. How should I install the correct snappy version for building HB?

    I also tried the Docker image registry.cn-shanghai.aliyuncs.com/pai-dlc/hybridbackend:developer-tensorflow1.15-manylinux_2_27-py3.6-cu114, and the same error occurs.

    Installation environment

    • GPU model and memory:
    • OS Platform: "20.04.3 LTS (Focal Fossa)"
    • Docker version:
    • GCC/CUDA/cuDNN version: 11.4
    • Python/conda version: Python 3.8.10/ conda 4.10.3
    • TensorFlow/PyTorch version: 1.15.5+deeprec2201

    Willing to contribute

    Yes

    opened by fuhailin 3
  • to_sparse failed for Value with ragged_rank > 1 read from parquet file

    Current behavior

    When hb reads nested lists with ragged_rank > 1, the resulting Value cannot be transformed to a SparseTensor by hb.data.to_sparse.

    For example, dense_feature is one of the features read by hb.data.ParquetDataset, and to_sparse does not work for it.

    Moreover, if I swap the order of the two nested_row_splits, to_sparse succeeds.

    So maybe the order of the nested_row_splits is incorrect when reading a parquet file?
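
The ordering question can be checked without TensorFlow. The sketch below assumes the convention used by TensorFlow's RaggedTensor, where nested_row_splits are listed outermost dimension first; the helper names are hypothetical and this is not HybridBackend's implementation. It uses the values and splits from the report's small demo.

```python
def is_outermost_first(values, nested_row_splits):
    """Check that each level's splits index into the next level (or the flat values)."""
    # Sizes each level must cover: row count of the next level, then the flat values.
    targets = [len(s) - 1 for s in nested_row_splits[1:]] + [len(values)]
    return all(
        splits[0] == 0 and splits[-1] == target
        for splits, target in zip(nested_row_splits, targets)
    )


def to_nested_lists(values, nested_row_splits):
    """Rebuild nested Python lists, applying the innermost splits first."""
    rows = list(values)
    for splits in reversed(nested_row_splits):
        rows = [rows[a:b] for a, b in zip(splits[:-1], splits[1:])]
    return rows


values = [1, 2, 3, 4, 5]
as_reported = ([0, 1, 3, 4, 5], [0, 2, 4])  # order used in the report's demo
swapped = ([0, 2, 4], [0, 1, 3, 4, 5])      # order for which to_sparse succeeds

print(is_outermost_first(values, as_reported))
print(is_outermost_first(values, swapped))
print(to_nested_lists(values, swapped))
```

Under that convention only the swapped order is consistent: the outer splits end at 4 (the number of inner rows) and the inner splits end at 5 (the number of values).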

    Expected behavior

    The Value read from a parquet file can be transformed to a SparseTensor.

    System information

    • GPU model and memory: No
    • OS Platform: Ubuntu
    • Docker version: No
    • GCC/CUDA/cuDNN version: 7.4/No/No
    • Python/conda version:3.6.13/4.13.0
    • TensorFlow/PyTorch version:1.14.0

    Code to reproduce

    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    dataset = hb.data.ParquetDataset("test2.zstd.parquet", batch_size=1)
    dataset = dataset.apply(hb.data.to_sparse())
    iterator = dataset.make_one_shot_iterator()
    next_element = iterator.get_next()
    sess = tf.Session()
    vals = sess.run(next_element)
    
    # One more simple demo:
    import numpy as np
    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    val = hb.data.dataframe.DataFrame.Value(values = np.array([1,2,3,4,5]), nested_row_splits=(np.array([0,1,3,4,5]), np.array([0,2,4])))
    sess = tf.Session()
    sess.run(val.to_sparse())
    

    Willing to contribute

    Yes

    bug 
    opened by SamJia 2
  • rebatch api produce an Check failed: limit <= dim0_size error

    Current behavior

    After rebatch(), the data iterator's get_next() produces an error:

    F tensorflow/core/framework/tensor.cc:833] Check failed: limit <= dim0_size (8194 vs. 8193)
    

    Expected behavior

    no error

    System information

    • OS Platform and Distribution: Ubuntu 18.04.5 LTS
    • TensorFlow version: 1.15.0
    • Python version: 3.6
    • CUDA/cuDNN version: 10.1
    • RAM: 94G
    • GPU model and memory: Tesla T4, 16G

    Code to reproduce

    Step 1: Generate a parquet file by running the following code

    import numpy as np
    import pandas as pd
    import random
    
    data_list = []
    for i in range(1, 10000):
        int_feature = random.randint(1, 100)
        # float_feature = random.random()
        array_feature = [random.randint(1, 10) for x in range(0, 4)]
        data_list.append([int_feature, array_feature])
    
    df = pd.DataFrame(data_list, columns=["int_feature", "array_feature"])
    df.to_parquet("parquet_sample_file.parquet")
    

    Step 2: Load the parquet file generated in step 1 with HybridBackend

    import tensorflow as tf
    import hybridbackend.tensorflow as hb


    # File and column names match the sample file written in step 1.
    filenames_ds = tf.data.Dataset.from_tensor_slices(['parquet_sample_file.parquet'])


    hb_fields = []
    hb_fields.append(hb.data.DataFrame.Field("int_feature", tf.int64, ragged_rank=0))
    hb_fields.append(hb.data.DataFrame.Field("array_feature", tf.int64, ragged_rank=1))

    ds = filenames_ds.apply(hb.data.read_parquet(8192, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
    ds = ds.apply(hb.data.rebatch(8192, fields=hb_fields))

    it = ds.make_one_shot_iterator()
    item = it.get_next()

    # Count how many batches of each size are produced.
    batch_size_dict = {}
    with tf.Session() as sess:
        print("======  start ======")
        while True:
            try:
                batch = sess.run(item)
                batch_size = len(batch['int_feature'])
                batch_size_dict[batch_size] = batch_size_dict.get(batch_size, 0) + 1
            except tf.errors.OutOfRangeError:
                break
    
    

    Running the above code in a python3 shell throws an error:

    F tensorflow/core/framework/tensor.cc:833] Check failed: limit <= dim0_size (8194 vs. 8193)
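
For reference, the behavior expected from rebatching can be sketched in plain Python. This is only an illustration of the intended semantics under the assumption that rebatch emits batches of exactly batch_size rows, with a smaller final batch; it is not how hb.data.rebatch is implemented, and the function name is hypothetical.

```python
def rebatch(batches, batch_size):
    """Regroup a stream of variable-size batches into batches of `batch_size` rows."""
    buffer = []
    for batch in batches:
        buffer.extend(batch)
        # Emit as many full batches as the buffer currently allows.
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    if buffer:
        # The remainder forms a final, smaller batch.
        yield buffer


# 9999 rows, as generated in step 1, arriving in two uneven input batches.
incoming = [list(range(5000)), list(range(5000, 9999))]
sizes = [len(b) for b in rebatch(incoming, 8192)]
print(sizes)
```

With 9999 rows and a batch size of 8192, the expected output is one full batch followed by one batch of 1807 rows, with no row lost or duplicated.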
    

    Willing to contribute

    Yes

    bug 
    opened by liurcme 2
  • Bump tensorflow from 1.15.5 to 2.5.3 in /docs

    Bumps tensorflow from 1.15.5 to 2.5.3.

    Release notes

    Sourced from tensorflow's releases.

    TensorFlow 2.5.3

    Release 2.5.3

    Note: This is the last release in the 2.5 series.

    This release introduces several vulnerability fixes:

    • Fixes a floating point division by 0 when executing convolution operators (CVE-2022-21725)
    • Fixes a heap OOB read in shape inference for ReverseSequence (CVE-2022-21728)
    • Fixes a heap OOB access in Dequantize (CVE-2022-21726)
    • Fixes an integer overflow in shape inference for Dequantize (CVE-2022-21727)
    • Fixes a heap OOB access in FractionalAvgPoolGrad (CVE-2022-21730)
    • Fixes an overflow and divide by zero in UnravelIndex (CVE-2022-21729)
    • Fixes a type confusion in shape inference for ConcatV2 (CVE-2022-21731)
    • Fixes an OOM in ThreadPoolHandle (CVE-2022-21732)
    • Fixes an OOM due to integer overflow in StringNGrams (CVE-2022-21733)
    • Fixes more issues caused by incomplete validation in boosted trees code (CVE-2021-41208)
    • Fixes integer overflows in most sparse component-wise ops (CVE-2022-23567)
    • Fixes integer overflows in AddManySparseToTensorsMap (CVE-2022-23568)
    • Fixes a number of CHECK-failures in MapStage (CVE-2022-21734)
    • Fixes a division by zero in FractionalMaxPool (CVE-2022-21735)
    • Fixes a number of CHECK-fails when building invalid/overflowing tensor shapes (CVE-2022-23569)
    • Fixes an undefined behavior in SparseTensorSliceDataset (CVE-2022-21736)
    • Fixes an assertion failure based denial of service via faulty bin count operations (CVE-2022-21737)
    • Fixes a reference binding to null pointer in QuantizedMaxPool (CVE-2022-21739)
    • Fixes an integer overflow leading to crash in SparseCountSparseOutput (CVE-2022-21738)
    • Fixes a heap overflow in SparseCountSparseOutput (CVE-2022-21740)
    • Fixes an FPE in BiasAndClamp in TFLite (CVE-2022-23557)
    • Fixes an FPE in depthwise convolutions in TFLite (CVE-2022-21741)
    • Fixes an integer overflow in TFLite array creation (CVE-2022-23558)
    • Fixes an integer overflow in TFLite (CVE-2022-23559)
    • Fixes a dangerous OOB write in TFLite (CVE-2022-23561)
    • Fixes a vulnerability leading to read and write outside of bounds in TFLite (CVE-2022-23560)
    • Fixes a set of vulnerabilities caused by using insecure temporary files (CVE-2022-23563)
    • Fixes an integer overflow in Range resulting in undefined behavior and OOM (CVE-2022-23562)
    • Fixes a vulnerability where missing validation causes tf.sparse.split to crash when axis is a tuple (CVE-2021-41206)
    • Fixes a CHECK-fail when decoding resource handles from proto (CVE-2022-23564)
    • Fixes a CHECK-fail with repeated AttrDef (CVE-2022-23565)
    • Fixes a heap OOB write in Grappler (CVE-2022-23566)
    • Fixes a CHECK-fail when decoding invalid tensors from proto (CVE-2022-23571)
    • Fixes an uninitialized variable access in AssignOp (CVE-2022-23573)
    • Fixes an integer overflow in OpLevelCostEstimator::CalculateTensorSize (CVE-2022-23575)
    • Fixes an integer overflow in OpLevelCostEstimator::CalculateOutputSize (CVE-2022-23576)
    • Fixes a null dereference in GetInitOp (CVE-2022-23577)
    • Fixes a memory leak when a graph node is invalid (CVE-2022-23578)
    • Fixes an abort caused by allocating a vector that is too large (CVE-2022-23580)
    • Fixes multiple CHECK-failures during Grappler's IsSimplifiableReshape (CVE-2022-23581)
    • Fixes multiple CHECK-failures during Grappler's SafeToRemoveIdentity (CVE-2022-23579)
    • Fixes multiple CHECK-failures in TensorByteSize (CVE-2022-23582)
    • Fixes multiple CHECK-failures in binary ops due to type confusion (CVE-2022-23583)

    ... (truncated)

    Commits
    • 959e9b2 Merge pull request #54213 from tensorflow/fix-sanity-on-r2.5
    • d05fcbc Fix sanity build
    • f2526a0 Merge pull request #54205 from tensorflow/disable-flaky-tests-on-r2.5
    • a5f94df Disable flaky test
    • 7babe52 Merge pull request #54201 from tensorflow/cherrypick-510ae18200d0a4fad797c0bf...
    • 0e5d378 Set Env Variable to override Setuptools new behavior
    • fdd4195 Merge pull request #54176 from tensorflow-jenkins/relnotes-2.5.3-6805
    • 4083165 Update RELEASE.md
    • a2bb7f1 Merge pull request #54185 from tensorflow/cherrypick-d437dec4d549fc30f9b85c75...
    • 5777ea3 Update third_party/icu/workspace.bzl
    • Additional commits viewable in compare view

    dependencies 
    opened by dependabot[bot] 2
  • Question: When to release the code?

    Question: When to release the code?

    Hi, according to the architecture doc at https://hybridbackend.readthedocs.io/en/latest/architecture.html, the code is not public.

    Do you plan to open-source it, or is the project focused only on data IO?

    I'd appreciate it if anyone could help me.

    Thanks 🥂 🍻

    opened by gaocegege 2
  • tf.keras.layers.DenseFeatures  api as the candidate of hb.feature_column.DenseFeatures can not work with tf.feature_column.shared_embedding_columns

    Current behavior

    HB version: HybridBackend 0.7.0-e277c15f3843f98901f0795bc9b7d0768056d5a3; tf1.15.5-v1.15.5+nv22.06-0-g55be3962f8; g++ 7.5.0; CUDA 11.4 (70,75,80,86)

    The new hb package removes the hb.feature_column.DenseFeatures API. However, the tf.keras.layers.DenseFeatures API cannot handle tf.feature_column.shared_embedding_columns.

    Expected behavior

    I want a new way to run the code successfully without hb.feature_column.DenseFeatures.

    System information

    • GPU model and memory:
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version:
    • TensorFlow/PyTorch version:

    Code to reproduce

    Willing to contribute

    Yes

    opened by taoyun951753 1
  • hb.keras.Model's fit() func support dataset multiple labels

    User Story

    I want to train a model on data with multiple labels, but the fit function throws an exception like: Error when checking model target: expected no data....

    When the dataset contains only one label, it works.

    Detailed requirements

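    • The dataset looks like this; the labels may be a tuple or a dict:
    ds = hb.data.ParquetDataset(XXX)
    def map_fn(batch):
        # label_names: list of label column names, defined elsewhere
        # (a separate name avoids shadowing the local `labels`)
        labels = tuple(batch[name] for name in label_names)
        features = {}
        #pass
        return features, labels
    ds = ds.map(map_fn)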
    
    • The fit() call looks like this:
    m.fit(
        x=train_ds,
        validation_data=valid_ds,
        #XXX
        verbose=0)
    

    I would like fit() to support datasets like the one above.
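
The requested input shape can be illustrated without HybridBackend. The sketch below splits one batch, represented as a plain dict of columns, into features and multiple labels, returned either as a tuple or as a dict. The function name, column names, and sample values are hypothetical and only show the structure that fit() would need to accept.

```python
def split_features_and_labels(batch, label_names, as_dict=False):
    """Split a dict-of-columns batch into (features, labels)."""
    features = {name: col for name, col in batch.items() if name not in label_names}
    if as_dict:
        labels = {name: batch[name] for name in label_names}
    else:
        labels = tuple(batch[name] for name in label_names)
    return features, labels


# Hypothetical batch with two feature columns and two label columns.
batch = {
    "user_id": [1, 2, 3],
    "item_id": [10, 20, 30],
    "click": [0, 1, 0],
    "purchase": [0, 0, 1],
}

features, labels = split_features_and_labels(batch, ["click", "purchase"])
print(sorted(features), labels)

_, labels_by_name = split_features_and_labels(batch, ["click", "purchase"], as_dict=True)
print(labels_by_name)
```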

    API Compatibility

    hb.keras.Model's fit()

    Willing to contribute

    Yes

    opened by karterotte 1
  • support keras fit history in estimator's train_and_evaluate

    User Story

    I want to keep a record of the loss values and metric values during training, like the Keras History object: https://www.tensorflow.org/api_docs/python/tf/keras/callbacks/History https://keras.io/guides/training_with_built_in_methods/

    Detailed requirements

    I need to decide whether to save a model based on its metrics (perhaps the latest ones).
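
A minimal sketch of the kind of record being requested, written in plain Python. The class and method names are hypothetical; this is neither the Keras History object nor an existing HybridBackend API, only an illustration of recording per-evaluation metrics and using the latest value to decide whether to save.

```python
class MetricHistory:
    """Keeps every reported value per metric, in the order it was reported."""

    def __init__(self):
        self.history = {}

    def record(self, logs):
        """Append one evaluation's metrics, e.g. {"loss": 0.6, "val_loss": 0.7}."""
        for name, value in logs.items():
            self.history.setdefault(name, []).append(value)

    def latest_is_best(self, metric, mode="min"):
        """True if the most recent value is the best seen so far for `metric`."""
        values = self.history.get(metric, [])
        if not values:
            return False
        best = min(values) if mode == "min" else max(values)
        return values[-1] == best


history = MetricHistory()
for logs in (
    {"loss": 0.9, "val_loss": 0.8},
    {"loss": 0.6, "val_loss": 0.7},
    {"loss": 0.5, "val_loss": 0.75},
):
    history.record(logs)
    # A caller could save the model only when this prints True.
    print(history.latest_is_best("val_loss"))
```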

    API Compatibility

    hb.estimator.train_and_evaluate

    Willing to contribute

    Yes

    enhancement 
    opened by karterotte 1
  • How to realize gradient truncation function in HB Pkg

    Current behavior

    Recently, the loss became NaN while I was training a model, so I want to clip gradients. However, HB can only use the tf.train.XxxOptimizer API, where clipping is done with compute_gradients together with tf.clip_by_value() or tf.clip_by_norm(), while HB now trains in the tf.keras compile/fit mode. The community tf.keras.optimizers.Adam() supports the clipvalue and clipnorm parameters.

    Expected behavior

    I want the HB package to support gradient clipping.
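
For readers unfamiliar with the two clipping modes mentioned above, here is a plain-Python sketch of what clipping by value and clipping by norm compute on a single gradient vector. It illustrates the arithmetic only and is not the TensorFlow or HybridBackend implementation; the function names simply mirror the operations named in the issue.

```python
import math


def clip_by_value(grads, clip_min, clip_max):
    """Clamp every gradient component into [clip_min, clip_max]."""
    return [min(max(g, clip_min), clip_max) for g in grads]


def clip_by_norm(grads, clip_norm):
    """Rescale the vector so its L2 norm is at most clip_norm."""
    norm = math.sqrt(sum(g * g for g in grads))
    if norm <= clip_norm:
        return list(grads)
    scale = clip_norm / norm
    return [g * scale for g in grads]


print(clip_by_value([-5.0, 0.5, 7.0], -1.0, 1.0))
print(clip_by_norm([3.0, 4.0], 1.0))
print(clip_by_norm([0.3, 0.4], 1.0))
```

Clipping by value changes the direction of the gradient when only some components are clamped, while clipping by norm preserves the direction and only shrinks the magnitude.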

    System information

    • GPU model and memory:
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version:
    • TensorFlow/PyTorch version:

    Code to reproduce

    Willing to contribute

    Yes

    opened by taoyun951753 0
  • feature_column bucket_size is 6, use 8 gpus,  then worker-5 and worker-6 'save/RestoreV2' failed

    feature_column bucket_size is 6, use 8 gpus, then worker-5 and worker-6 'save/RestoreV2' failed; backtrace:

    Traceback (most recent call last):
      File "neg_feedback_multi.py", line 1252, in <module>
        tf.app.run()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/platform/app.py", line 40, in run
        _run(main=main, argv=argv, flags_parser=_parse_flags_tolerate_undef)
      File "/home/pai/lib/python3.6/site-packages/absl/app.py", line 308, in run
        _run_main(main, args)
      File "/home/pai/lib/python3.6/site-packages/absl/app.py", line 254, in _run_main
        sys.exit(main(argv))
      File "neg_feedback_multi.py", line 1235, in main
        model.run()
      File "neg_feedback_multi.py", line 1227, in run
        classifier.train_and_evaluate(train_spec, eval_spec)
      File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/estimator/estimator.py", line 276, in train_and_evaluate
        return executor.run()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/training.py", line 640, in run
        getattr(self, task_to_run)()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/training.py", line 650, in run_worker
        return self._start_distributed_training()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/training.py", line 796, in _start_distributed_training
        saving_listeners=saving_listeners)
      File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/estimator/estimator.py", line 188, in train
        saving_listeners=saving_listeners)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 370, in train
        loss = self._train_model(input_fn, hooks, saving_listeners)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1161, in _train_model
        return self._train_model_default(input_fn, hooks, saving_listeners)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1195, in _train_model_default
        saving_listeners)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_estimator/python/estimator/estimator.py", line 1490, in _train_with_estimator_spec
        log_step_count_steps=log_step_count_steps) as mon_sess:
      File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/session.py", line 131, in HybridBackendMonitoredTrainingSession
        sess = fn(*args, **kwargs)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 678, in MonitoredTrainingSession
        stop_grace_period_secs=stop_grace_period_secs)
      File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/session.py", line 64, in __init__
        session_creator, hooks, should_recover=True, **kwargs)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 827, in __init__
        self._sess = _RecoverableSession(self._coordinated_creator)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 1309, in __init__
        _WrappedSession.__init__(self, self._create_session())
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 1314, in _create_session
        return self._sess_creator.create_session()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 980, in create_session
        self.tf_sess = self._session_creator.create_session()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 733, in create_session
        self._scaffold.finalize()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/monitored_session.py", line 252, in finalize
        self._saver.build()
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1059, in build
        self._build(self._filename, build_save=True, build_restore=True)
      File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/saver.py", line 258, in _build
        super()._build(*args, **kwargs)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 1137, in _build
        build_restore=build_restore)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 660, in _build_internal
        restore_sequentially, reshape)
      File "/home/pai/lib/python3.6/site-packages/hybridbackend/tensorflow/training/saver.py", line 200, in _AddShardedRestoreOps
        filename_tensor, per_device, restore_sequentially, reshape)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 536, in _AddShardedRestoreOps
        name="restore_shard"))
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 476, in _AddRestoreOps
        restore_sequentially)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/training/saver.py", line 744, in bulk_restore
        return io_ops.restore_v2(filename_tensor, names, slices, dtypes)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/ops/gen_io_ops.py", line 2380, in restore_v2
        name=name)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/op_def_library.py", line 794, in _apply_op_helper
        op_def=op_def)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/util/deprecation.py", line 507, in new_func
        return func(*args, **kwargs)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 3360, in create_op
        attrs, op_def, compute_device)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 3429, in _create_op_internal
        op_def=op_def)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 1773, in __init__
        control_input_ops)
      File "/home/pai/lib/python3.6/site-packages/tensorflow_core/python/framework/ops.py", line 1613, in _create_c_op
        raise ValueError(str(e))
    ValueError: Expected non-negative start and positive length but got start = 6, length = 0: string = 6,0:0,10 for 'save/RestoreV2' (op: 'RestoreV2') with input shapes: [], [382], [382] and with computed input tensors: input[2] = <144150 23 108114,18018:0,23 144150 23 108114,18018:0,23 195

    opened by zhbhhb 0
  • EarlyStopping callback does not work in multi-worker distributed training jobs

    EarlyStopping callback does not work in multi-worker distributed training jobs

    Current behavior

    With a single worker, training with the EarlyStopping callback works. With multiple workers in a distributed training job, all workers hang while waiting for synchronization.

    Expected behavior

    The EarlyStopping callback should work in multi-worker distributed training jobs as well as in single-worker jobs.

    System information

    • GPU model and memory:
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version:
    • TensorFlow/PyTorch version:

    Code to reproduce

    ....
    callbacks_list.append(
        EarlyStopping(
            monitor="val_loss",
            min_delta=self.ctx.min_delta,
            patience=self.ctx.patience,
            verbose=verbose,
            mode="min",
            baseline=None,
            restore_best_weights=True)
    )

    ....

    keras_model.fit(
        x=None,
        y=None,
        validation_data=valid_ds,
        steps_per_epoch=self.ctx.steps_per_epoch,
        validation_steps=self.ctx.valid_steps_per_epoch,
        epochs=self.ctx.callback_num,
        callbacks=callbacks_list,
        checkpoint_dir=self.ctx.model_save_path,
        keep_checkpoint_max=1,
        verbose=0)
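
    One plausible cause, not confirmed in this report, is that each worker evaluates val_loss on its own data and therefore reaches the stopping condition at a different epoch; the workers that continue then wait on a collective operation that the stopped worker never joins. The pure-Python sketch below only simulates that reasoning. It uses neither Keras nor HybridBackend, the loss values are made up, and `local_stop_epoch` and `synchronized_stop_epoch` are hypothetical helper names.

```python
def local_stop_epoch(val_losses, patience, min_delta=0.0):
    """Epoch at which a patience-based rule (mode="min") would stop, or None."""
    best = float("inf")
    wait = 0
    for epoch, loss in enumerate(val_losses):
        if loss < best - min_delta:
            # Improvement: remember the new best loss and reset the counter.
            best, wait = loss, 0
        else:
            wait += 1
            if wait >= patience:
                return epoch
    return None


def synchronized_stop_epoch(per_worker_losses, patience, min_delta=0.0):
    """First epoch at which ANY worker wants to stop, or None.

    This emulates combining the per-worker stop flags with a logical OR,
    so that every worker leaves the training loop at the same epoch.
    """
    stops = [local_stop_epoch(losses, patience, min_delta)
             for losses in per_worker_losses]
    stops = [s for s in stops if s is not None]
    return min(stops) if stops else None


# Made-up validation losses for two workers.
losses = [
    [1.00, 0.90, 0.95, 0.96, 0.97],  # worker 0
    [1.00, 0.90, 0.80, 0.85, 0.86],  # worker 1
]
local = [local_stop_epoch(l, patience=2) for l in losses]
together = synchronized_stop_epoch(losses, patience=2)
print("local stop epochs:", local)
print("synchronized stop epoch:", together)
```

    With these values the two workers would stop at different epochs if each decided alone, which is the situation in which the remaining worker could block. In a real job the stop flag would have to be combined across workers (for example with an all-reduce) before each worker decides whether to continue; how to do that with HybridBackend's Keras integration is not covered here.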

    Willing to contribute

    Yes

    opened by taoyun951753 0
  • Dataset iterator can't be wrapped in the HybridBackend scope

    Dataset iterator can't be wrapped in the HybridBackend scope

    Current behavior

    I am using HybridBackend for data parallelism. I create a dataset and make an iterator from it. When I wrap the whole pipeline in a HybridBackend scope, an exception is raised when the iterator's get_next() is called. Here is the error log:

    Traceback (most recent call last):
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 324, in _AssertCompatible
        fn(values)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 276, in _check_not_tensor
        _ = [_check_failed(v) for v in nest.flatten(values)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 277, in <listcomp>
        if isinstance(v, ops.Tensor)]
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 248, in _check_failed
        raise ValueError(v)
    ValueError: Tensor("Iterator_1/Identity:0", shape=(?,), dtype=int64, device=/job:chief/task:0/device:GPU:0)
    
    During handling of the above exception, another exception occurred:
    
    Traceback (most recent call last):
      File "demo.py", line 332, in <module>
        app.run(runner)
      File "/usr/local/lib/python3.6/dist-packages/absl/app.py", line 308, in run
        _run_main(main, args)
      File "/usr/local/lib/python3.6/dist-packages/absl/app.py", line 254, in _run_main
        sys.exit(main(argv))
      File "demo.py", line 213, in runner
        features, labels = datasource.iter.get_next()
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/data/iterators.py", line 120, in get_next
        DataSyncRewriting.accept(should_stop)
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/data/iterators.py", line 169, in accept
        should_stop = math_ops.cast(should_stop, dtypes.int32)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/util/dispatch.py", line 180, in wrapper
        return target(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/ops/math_ops.py", line 702, in cast
        x = ops.convert_to_tensor(x, name="x")
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/ops.py", line 1184, in convert_to_tensor
        return convert_to_tensor_v2(value, dtype, preferred_dtype, name)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/ops.py", line 1242, in convert_to_tensor_v2
        as_ref=False)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/ops.py", line 1297, in internal_convert_to_tensor
        ret = conversion_func(value, dtype=dtype, name=name, as_ref=as_ref)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/constant_op.py", line 286, in _constant_tensor_conversion_function
        return constant(v, dtype=dtype, name=name)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/constant_op.py", line 227, in constant
        allow_broadcast=True)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/constant_op.py", line 265, in _constant_impl
        allow_broadcast=allow_broadcast))
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 449, in make_tensor_proto
        _AssertCompatible(values, dtype)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/framework/tensor_util.py", line 328, in _AssertCompatible
        raise TypeError("List of Tensors when single Tensor expected")
    TypeError: List of Tensors when single Tensor expected
    

    Expected behavior

    System information

    • GPU model and memory: Tesla P100
    • OS Platform: Ubuntu 18.04
    • Docker version: Docker Engine - Community Version: 20.10.14
    • GCC/CUDA/cuDNN version:
    • Python/conda version: Python 3.6.9
    • TensorFlow/PyTorch version: TensorFlow:DeepRec2208

    Code to reproduce

    import numpy as np
    import pandas as pd
    
    new_dtypes = {"uid": np.int64, "packagename": np.int64, "label_play": np.float64}
    
    train_df = pd.DataFrame(np.random.randint(0, 100, (5, 3)), columns=['uid', 'packagename', 'label_play'])
    train_df = train_df.astype(new_dtypes)
    train_df.to_parquet('train.parquet')
    
    import tensorflow as tf
    import hybridbackend.tensorflow as hb
    from hybridbackend.tensorflow.data import ParquetDataset
    from tensorflow.python.data.ops import dataset_ops
    from tensorflow.python.data.experimental.ops.dataframe import to_sparse
    
    
    
    def parquet_map(record):
        for key in record:
            record[key] = tf.reshape(record[key], [-1])
        label = record.pop("label_play")
        return record, label
    
    
    # Create model
    def neural_net(features):
        with tf.device("/CPU:0"):
            var = tf.get_embedding_variable(
                "var_0",
                embedding_dim=3,
                initializer=tf.ones_initializer(tf.float32),
                partitioner=tf.fixed_size_partitioner(num_shards=4),
            )
    
        emb = tf.nn.embedding_lookup(var, features["uid"])
        fun = tf.multiply(emb, 2.0, name="multiply")
        loss = tf.reduce_sum(fun, name="reduce_sum")
        opt = tf.train.AdagradOptimizer(0.1)
    
        g_v = opt.compute_gradients(loss)
        train_op = opt.apply_gradients(g_v)
        return train_op, loss
    
    
    with hb.scope():
        with tf.device("/cpu:0"):
            dataset = tf.data.Dataset.list_files(["train.parquet"])
            dataset = dataset.apply(
                tf.data.experimental.parallel_interleave(
                    lambda tmp_file: ParquetDataset(
                        tmp_file,
                        drop_remainder=True,
                        batch_size=2,
                        num_parallel_reads=1,
                        fields=[
                            hb.data.DataFrame.Field("uid", tf.int64, ragged_rank=0),
                            hb.data.DataFrame.Field("packagename", tf.int64, ragged_rank=0),
                            hb.data.DataFrame.Field("label_play", tf.float64, ragged_rank=0),
                        ],
                    ).apply(
                        to_sparse()
                    ),
                    cycle_length=1,
                    block_length=1,
                )
            )
            dataset = dataset.batch(2, drop_remainder=True,).map(
                map_func=parquet_map,
                num_parallel_calls=dataset_ops.AUTOTUNE,
            )
        
        iterator = dataset.make_one_shot_iterator()
        # iterator = tf.data.make_one_shot_iterator(dataset)
        features, labels = iterator.get_next()
    
        train_op, loss = neural_net(features)
    
        scaffold = tf.train.Scaffold(
            init_op=tf.group(
                tf.global_variables_initializer(),
            ),
        )
    
        with tf.train.MonitoredTrainingSession(
            master="", scaffold=scaffold) as mon_sess:
            while not mon_sess.should_stop():
                _, ev = mon_sess.run([train_op, loss])
                print(ev)
    
    

    Willing to contribute

    Yes

    opened by fuhailin 0
  • error: Variables not initialized: communicator/1/HbNcclCommHandleOp

    error: Variables not initialized: communicator/1/HbNcclCommHandleOp

    Current behavior

    2022-10-19 12:39:39.948019: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    2022-10-19 12:39:39.948020: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    INFO:tensorflow:Parsing ../data//train.csv
    INFO:tensorflow:Parsing ../data//train.csv
    WARNING:tensorflow:The default value of combiner will change from "mean" to "sqrtn" after 2016/11/01.
    INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
    INFO:tensorflow:Aggregate 12 dense gradients (33.35MB) and 0 sparse gradients (0.00MB), skip 26 aggregated gradients
    2022-10-19 12:39:43.135528: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
    2022-10-19 12:39:43.136209: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x53d08e0 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.136227: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
    2022-10-19 12:39:43.137613: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
    2022-10-19 12:39:43.147796: I tensorflow/core/platform/profile_utils/cpu_utils.cc:94] CPU Frequency: 3792945000 Hz
    2022-10-19 12:39:43.148600: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x4190950 initialized for platform Host (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.148638: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): Host, Default Version
    2022-10-19 12:39:43.150144: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
    2022-10-19 12:39:43.263791: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x6a91880
    2022-10-19 12:39:43.263977: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.264217: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x6a644c0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.264241: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
    2022-10-19 12:39:43.264400: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.264809: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
    name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
    pciBusID: 0000:08:00.0
    2022-10-19 12:39:43.264837: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    2022-10-19 12:39:43.267560: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
    2022-10-19 12:39:43.267587: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
    2022-10-19 12:39:43.272210: I tensorflow/stream_executor/cuda/cuda_driver.cc:404] Cuda add device primary context 0x51f3c70
    2022-10-19 12:39:43.272373: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.272643: I tensorflow/compiler/xla/service/service.cc:168] XLA service 0x51bf3f0 initialized for platform CUDA (this does not guarantee that XLA will be used). Devices:
    2022-10-19 12:39:43.272666: I tensorflow/compiler/xla/service/service.cc:176]   StreamExecutor device (0): NVIDIA GeForce RTX 2080 Ti, Compute Capability 7.5
    2022-10-19 12:39:43.272784: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.272986: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1687] Found device 0 with properties: 
    name: NVIDIA GeForce RTX 2080 Ti major: 7 minor: 5 memoryClockRate(GHz): 1.635
    pciBusID: 0000:07:00.0
    2022-10-19 12:39:43.273007: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
    2022-10-19 12:39:43.275711: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublas.so.11
    2022-10-19 12:39:43.275737: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcublasLt.so.11
    2022-10-19 12:39:43.288994: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
    2022-10-19 12:39:43.289162: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
    2022-10-19 12:39:43.289498: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
    2022-10-19 12:39:43.290069: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
    2022-10-19 12:39:43.290150: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
    2022-10-19 12:39:43.290231: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.290471: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.290643: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
    2022-10-19 12:39:43.292542: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
    2022-10-19 12:39:43.292558: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
    2022-10-19 12:39:43.292564: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
    2022-10-19 12:39:43.292640: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.292843: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.293058: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:worker/replica:0/task:0/device:GPU:0 with 9793 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:08:00.0, compute capability: 7.5)
    2022-10-19 12:39:43.294188: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> 127.0.0.1:20001}
    2022-10-19 12:39:43.294199: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> localhost:20002}
    2022-10-19 12:39:43.294986: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20002
    2022-10-19 12:39:43.297217: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcufft.so.10
    2022-10-19 12:39:43.297453: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcurand.so.10
    2022-10-19 12:39:43.297814: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusolver.so.11
    2022-10-19 12:39:43.298428: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcusparse.so.11
    2022-10-19 12:39:43.298520: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudnn.so.8
    2022-10-19 12:39:43.298607: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.298845: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.299025: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1815] Adding visible gpu devices: 0
    2022-10-19 12:39:43.301007: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1170] Device interconnect StreamExecutor with strength 1 edge matrix:
    2022-10-19 12:39:43.301024: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1176]      0 
    2022-10-19 12:39:43.301031: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1189] 0:   N 
    2022-10-19 12:39:43.301114: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.301327: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:1092] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
    2022-10-19 12:39:43.301552: I tensorflow/core/common_runtime/gpu/gpu_device.cc:1372] Created TensorFlow device (/job:chief/replica:0/task:0/device:GPU:0 with 9729 MB memory) -> physical GPU (device: 0, name: NVIDIA GeForce RTX 2080 Ti, pci bus id: 0000:07:00.0, compute capability: 7.5)
    2022-10-19 12:39:43.302687: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job chief -> {0 -> localhost:20001}
    2022-10-19 12:39:43.302705: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:258] Initialize GrpcChannelCache for job worker -> {0 -> 127.0.0.1:20002}
    2022-10-19 12:39:43.303434: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:374] Started server with target: grpc://localhost:20001
    INFO:tensorflow:Graph was finalized.
    INFO:tensorflow:run without loading checkpoint
    INFO:tensorflow:Graph was finalized.
    INFO:tensorflow:run without loading checkpoint
    INFO:tensorflow:Running local_init_op.
    INFO:tensorflow:Running local_init_op.
    INFO:tensorflow:Done running local_init_op.
    INFO:tensorflow:Done running local_init_op.
    Using TensorFlow version 1.15.5
    Checking dataset...
    Numbers of training dataset is 8000000
    The training steps is 100
    Traceback (most recent call last):
      File "benchmark_hb.py", line 405, in <module>
        main()
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
        return fn(*args, **kwargs)
      File "benchmark_hb.py", line 339, in main
        config=sess_config) as sess:
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
        sess = fn(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
        stop_grace_period_secs=stop_grace_period_secs)
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
        session_creator, hooks, should_recover=True, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
        self._sess = _RecoverableSession(self._coordinated_creator)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
        _WrappedSession.__init__(self, self._create_session())
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
        return self._sess_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
        self.tf_sess = self._session_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
        init_fn=self._scaffold.init_fn)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
        (_maybe_name(init_op), init_fn, self._local_init_op, msg))
    RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
    op: "NoOp"
    input: "^group_deps_1/NoOp"
    input: "^group_deps_1/NoOp_1"
    device: "/job:chief/task:0/device:GPU:0"
    , error: Variables not initialized: communicator/0/HbNcclCommHandleOp
    Using TensorFlow version 1.15.5
    Checking dataset...
    Numbers of training dataset is 8000000
    The training steps is 100
    Traceback (most recent call last):
      File "benchmark_hb.py", line 405, in <module>
        main()
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/function.py", line 144, in wrapped_fn
        return fn(*args, **kwargs)
      File "benchmark_hb.py", line 339, in main
        config=sess_config) as sess:
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 174, in HybridBackendMonitoredTrainingSession
        sess = fn(*args, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 633, in MonitoredTrainingSession
        stop_grace_period_secs=stop_grace_period_secs)
      File "/usr/local/lib/python3.6/dist-packages/hybridbackend/tensorflow/training/session.py", line 69, in __init__
        session_creator, hooks, should_recover=True, **kwargs)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 775, in __init__
        self._sess = _RecoverableSession(self._coordinated_creator)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1257, in __init__
        _WrappedSession.__init__(self, self._create_session())
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 1262, in _create_session
        return self._sess_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 928, in create_session
        self.tf_sess = self._session_creator.create_session()
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/monitored_session.py", line 697, in create_session
        init_fn=self._scaffold.init_fn)
      File "/usr/local/lib/python3.6/dist-packages/tensorflow_core/python/training/session_manager.py", line 323, in prepare_session
        (_maybe_name(init_op), init_fn, self._local_init_op, msg))
    RuntimeError: Init operations did not make model ready.  Init op: group_deps_2, init fn: None, local_init_op: name: "group_deps_1"
    op: "NoOp"
    input: "^group_deps_1/NoOp"
    input: "^group_deps_1/NoOp_1"
    device: "/job:worker/task:0/device:GPU:0"
    , error: Variables not initialized: communicator/1/HbNcclCommHandleOp
    

    Expected behavior

    The code should run without errors.

    System information

    • GPU model and memory: 2080Ti
    • OS Platform:
    • Docker version:
    • GCC/CUDA/cuDNN version: CUDA 11.4
    • Python/conda version:
    • TensorFlow/PyTorch version: DeepRec, commit: 6bca2cc4e6acaca3766e0425b53bdd
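
    The gRPC lines in the log above show a two-process cluster on one machine: a chief task on port 20001 and a worker task on port 20002. The issue does not say how the processes were launched. Assuming the standard TensorFlow TF_CONFIG environment variable was used, the per-process values could be built as in the sketch below; `make_tf_config` is a hypothetical helper, and the addresses are copied from the log.

```python
import json


def make_tf_config(task_type, task_index,
                   chief_addr="127.0.0.1:20001",
                   worker_addrs=("127.0.0.1:20002",)):
    """Build a TF_CONFIG JSON string for one process of the cluster.

    Every process must see the same "cluster" section; only "task" differs.
    """
    cluster = {"chief": [chief_addr], "worker": list(worker_addrs)}
    task = {"type": task_type, "index": task_index}
    return json.dumps({"cluster": cluster, "task": task})


# One value per process; each would be exported as TF_CONFIG before
# starting the corresponding training process.
chief_tf_config = make_tf_config("chief", 0)
worker_tf_config = make_tf_config("worker", 0)
print(chief_tf_config)
print(worker_tf_config)
```

    Comparing the two values is a quick way to check that both processes agree on the cluster layout, which both need in order to join the same communicator.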

    Code to reproduce

    1. Download the training dataset (in CSV format) from https://storage.googleapis.com/dataset-uploader/criteo-kaggle/large_version/train.csv
    2. Run the following training script:
    # Copyright (c) 2022 Intel Corporation
    #
    # Licensed under the Apache License, Version 2.0 (the "License");
    # you may not use this file except in compliance with the License.
    # You may obtain a copy of the License at
    #
    #    http://www.apache.org/licenses/LICENSE-2.0
    #
    # Unless required by applicable law or agreed to in writing, software
    # distributed under the License is distributed on an "AS IS" BASIS,
    # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    # See the License for the specific language governing permissions and
    # limitations under the License.
    # ==============================================================================
    
    from tensorflow.python.framework import dtypes
    import numpy as np
    import time
    import argparse
    import tensorflow as tf
    import os
    import sys
    import math
    import collections
    from tensorflow.python.client import timeline
    import json
    
    from tensorflow.python.framework import sparse_tensor
    from tensorflow.python.feature_column import feature_column_v2 as fc
    from tensorflow.python.ops import partitioned_variables
    from tensorflow.python.framework import ops
    os.environ["TF_GPU_THREAD_MODE"] = "global"
    import hybridbackend.tensorflow as hb
    
    # Verbosity: INFO tracks training progress, WARN is the default, ERROR prints the fewest messages.
    tf.logging.set_verbosity(tf.logging.INFO)
    print("Using TensorFlow version %s" % (tf.__version__))
    
    # Definition of some constants
    CONTINUOUS_COLUMNS = ['I' + str(i) for i in range(1, 14)]  # 1-13 inclusive
    CATEGORICAL_COLUMNS = ['C' + str(i) for i in range(1, 27)]  # 1-26 inclusive
    LABEL_COLUMN = ['clicked']
    TRAIN_DATA_COLUMNS = LABEL_COLUMN + CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
    FEATURE_COLUMNS = CONTINUOUS_COLUMNS + CATEGORICAL_COLUMNS
    HASH_BUCKET_SIZES = {
        'C1': 2500,
        'C2': 2000,
        'C3': 300000,
        'C4': 250000,
        'C5': 1000,
        'C6': 100,
        'C7': 20000,
        'C8': 4000,
        'C9': 20,
        'C10': 100000,
        'C11': 10000,
        'C12': 250000,
        'C13': 40000,
        'C14': 100,
        'C15': 100,
        'C16': 200000,
        'C17': 50,
        'C18': 10000,
        'C19': 4000,
        'C20': 20,
        'C21': 250000,
        'C22': 100,
        'C23': 100,
        'C24': 250000,
        'C25': 400,
        'C26': 100000
    }
    
    EMBEDDING_DIMENSIONS = {
        'C1': 64,
        'C2': 64,
        'C3': 128,
        'C4': 128,
        'C5': 64,
        'C6': 64,
        'C7': 64,
        'C8': 64,
        'C9': 64,
        'C10': 128,
        'C11': 64,
        'C12': 128,
        'C13': 64,
        'C14': 64,
        'C15': 64,
        'C16': 128,
        'C17': 64,
        'C18': 64,
        'C19': 64,
        'C20': 64,
        'C21': 128,
        'C22': 64,
        'C23': 64,
        'C24': 128,
        'C25': 64,
        'C26': 128
    }
    
    
    def transform_numeric(feature):
        r'''Transform numeric features.
        '''
        # Note: statistics of Kaggle's Criteo dataset were computed in advance to save time.
        mins_list = [
            0.0, -3.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0
        ]
        range_list = [
            1539.0, 22069.0, 65535.0, 561.0, 2655388.0, 233523.0, 26297.0, 5106.0,
            24376.0, 9.0, 181.0, 1807.0, 6879.0
        ]
    
        def make_minmaxscaler(min, range):
            def minmaxscaler(col):
                return (col - min) / range
    
            return minmaxscaler
    
        numeric_list = []
    
        for column_name in CONTINUOUS_COLUMNS:
            normalizer_fn = None
            i = CONTINUOUS_COLUMNS.index(column_name)
            normalizer_fn = make_minmaxscaler(mins_list[i], range_list[i])
            numeric = normalizer_fn(feature[column_name])
            numeric_list.append(tf.reshape(numeric, shape=[-1, 1]))
        return numeric_list
    
    
    def transform_categorical(feature):
        r'''Transform categorical features.
        '''
        deep_features = []
        max_value = np.iinfo(dtypes.int64.as_numpy_dtype).max
    
        variables = []
        indices = []
        for column_name in CATEGORICAL_COLUMNS:
            ev_opt = tf.EmbeddingVariableOption(
                evict_option=None, filter_option=None)
            device_str = '/gpu'
            with tf.device(device_str), hb.scope(sharding=True):
                embedding_weights = tf.get_embedding_variable(
                    f'{column_name}_weight',
                    initializer=tf.random_normal_initializer(
                        mean=0.0, stddev=0.05
                    ),
                    embedding_dim=EMBEDDING_DIMENSIONS[column_name],
                    ev_option=ev_opt
                )
    
            category = tf.strings.to_hash_bucket_fast(
                feature[column_name], max_value)
            sparse_tensor = fc._to_sparse_input_and_drop_ignore_values(category)
            sparse_tensor = tf.sparse.reshape(sparse_tensor, (-1, 1))
            
            deep_features.append(tf.nn.embedding_lookup_sparse(
                embedding_weights, sparse_tensor, None))
            
            variables.append(embedding_weights)
            indices.append(sparse_tensor)
        return deep_features
    
    
    def stacked_dcn_v2(features, mlp_dims):
        r'''Stacked DCNv2.
    
        DCNv2: Improved Deep & Cross Network and Practical Lessons for Web-scale
        Learning to Rank Systems.
    
        See https://arxiv.org/abs/2008.13535 for more information.
        '''
        with tf.name_scope('cross'):
            cross_input = tf.concat(features, axis=-1)
            cross_input_shape = [-1, sum([f.shape[-1] for f in features])]
            cross_input = tf.reshape(cross_input, cross_input_shape)
            cross_input_sq = tf.layers.dense(
                cross_input, cross_input.shape[-1],
                activation=tf.nn.relu,
                kernel_initializer=tf.truncated_normal_initializer(),
                bias_initializer=tf.zeros_initializer())
            cross_output = cross_input * cross_input_sq + cross_input
            cross_output = tf.reshape(cross_output, [-1, cross_input.shape[1]])
            cross_output_dim = (len(features) * (len(features) + 1)) / 2
    
        with tf.name_scope('mlp'):
            prev_layer = cross_output
            prev_dim = cross_output_dim
            for i, d in enumerate(mlp_dims[:-1]):
                prev_layer = tf.layers.dense(
                    prev_layer, d,
                    activation=tf.nn.relu,
                    kernel_initializer=tf.random_normal_initializer(
                        mean=0.0,
                        stddev=math.sqrt(2.0 / (prev_dim + d))),
                    bias_initializer=tf.random_normal_initializer(
                        mean=0.0,
                        stddev=math.sqrt(1.0 / d)),
                    name=f'mlp_{i}')
                prev_dim = d
            return tf.layers.dense(
                prev_layer, mlp_dims[-1],
                activation=tf.nn.sigmoid,
                kernel_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(2.0 / (prev_dim + mlp_dims[-1]))),
                bias_initializer=tf.random_normal_initializer(
                    mean=0.0,
                    stddev=math.sqrt(1.0 / mlp_dims[-1])),
                name=f'mlp_{len(mlp_dims) - 1}')
    
    
    # Build the dataset pipeline
    def build_model_input(filename, batch_size, num_epochs):
        def parse_csv(value):
            tf.logging.info('Parsing {}'.format(filename))
            cont_defaults = [[0.0] for i in range(1, 14)]
            cate_defaults = [[' '] for i in range(1, 27)]
            label_defaults = [[0]]
            column_headers = TRAIN_DATA_COLUMNS
            record_defaults = label_defaults + cont_defaults + cate_defaults
            columns = tf.io.decode_csv(value, record_defaults=record_defaults)
            all_columns = collections.OrderedDict(zip(column_headers, columns))
            labels = all_columns.pop(LABEL_COLUMN[0])
            features = all_columns
            return features, labels
    
        '''Work Queue Feature'''
        if args.workqueue:
            from tensorflow.python.ops.work_queue import WorkQueue
            work_queue = WorkQueue([filename])
            # For multiple files:
            # work_queue = WorkQueue([filename, filename1,filename2,filename3])
            files = work_queue.input_dataset()
        else:
            files = filename
        # Extract lines from input files using the Dataset API.
        dataset = tf.data.TextLineDataset(files)
        dataset = dataset.shuffle(buffer_size=20000,
                                  seed=args.seed)  # fix seed for reproducing
        dataset = dataset.repeat(num_epochs)
        dataset = dataset.batch(batch_size)
        dataset = dataset.map(parse_csv, num_parallel_calls=28)
        dataset = dataset.prefetch(2)
        return dataset
    
    @hb.function()
    def main():
        
        # check dataset and count data set size
        print("Checking dataset...")
        train_file = args.data_location + '/train.csv'
        if (not os.path.exists(train_file)):
            print("Dataset does not exist in the given data_location.")
            sys.exit()
        no_of_training_examples = sum(1 for line in open(train_file))
        print("Numbers of training dataset is {}".format(no_of_training_examples))
    
        # set batch size, epochs & steps
        batch_size = args.batch_size
    
        if args.steps == 0:
            no_of_epochs = 1
            train_steps = math.ceil(
                (float(no_of_epochs) * no_of_training_examples) / batch_size)
        else:
            no_of_epochs = math.ceil(
                (float(batch_size) * args.steps) / no_of_training_examples)
            train_steps = args.steps
        print("The training steps is {}".format(train_steps))
    
        # set fixed random seed
        tf.set_random_seed(args.seed)
    
        # create the data pipeline for the training dataset
        with tf.device('/cpu:0'):
            train_dataset = build_model_input(train_file, batch_size, no_of_epochs)
    
            iterator = tf.data.Iterator.from_structure(train_dataset.output_types,
                                                    train_dataset.output_shapes)
            next_element = iterator.get_next()
    
        train_init_op = iterator.make_initializer(train_dataset)
    
        # create feature column
        feature, labels = next_element[0], next_element[1]
    
        deep_features = transform_categorical(feature)
        wide_features = transform_numeric(feature)
        logits = stacked_dcn_v2(features=deep_features + wide_features,
                                mlp_dims=[1024, 1024, 512, 256, 1]
                                )
        loss = tf.reduce_mean(tf.keras.losses.binary_crossentropy(tf.reshape(labels, (-1, 1)), logits))
    
        step = tf.train.get_or_create_global_step()
        opt = tf.train.AdagradOptimizer(learning_rate=0.01)
        train_op = opt.minimize(loss, global_step=step)
    
        # Session config
        sess_config = tf.ConfigProto()
    
        # # Session hooks
        hooks = []
    
        # if args.smartstaged and not args.tf:
        #     '''Smart staged Feature'''
        #     next_element = tf.staged(next_element, num_threads=4, capacity=40)
        #     sess_config.graph_options.optimizer_options.do_smart_stage = True
        #     hooks.append(tf.make_prefetch_hook())
        # if args.op_fusion and not args.tf:
        #     '''Auto Graph Fusion'''
        #     sess_config.graph_options.optimizer_options.do_op_fusion = True
        # if args.micro_batch and not args.tf:
        #     '''Auto Micro Batch'''
        #     sess_config.graph_options.optimizer_options.micro_batch_num = args.micro_batch
    
        scaffold = tf.train.Scaffold(
            local_init_op=tf.group(
                tf.local_variables_initializer(), train_init_op),
        )
    
        stop_hook = tf.train.StopAtStepHook(last_step=train_steps)
        log_hook = tf.train.LoggingTensorHook(
            {
                'steps': step,
                'loss': loss,
            }, every_n_iter=1)
        hooks.append(stop_hook)
        hooks.append(log_hook)
    
        with tf.train.MonitoredTrainingSession(
                master='',
                hooks=hooks,
                scaffold=scaffold,
                config=sess_config) as sess:
            while not sess.should_stop():
                print(sess.run([feature]))
                sess.run([loss, train_op])
        print("Training completed.")
    
    
    def boolean_string(string):
        low_string = string.lower()
        if low_string not in {'false', 'true'}:
            raise ValueError('Not a valid boolean string')
        return low_string == 'true'
    
    
    # Build the argument parser
    def get_arg_parser():
        parser = argparse.ArgumentParser()
        parser.add_argument('--data_location',
                            help='Full path of train data',
                            required=False,
                            default='./data')
        parser.add_argument('--steps',
                            help='set the number of steps on train dataset',
                            type=int,
                            default=0)
        parser.add_argument('--batch_size',
                            help='Batch size to train. Default is 512',
                            type=int,
                            default=512)
        parser.add_argument('--seed',
                            help='set the random seed for tensorflow',
                            type=int,
                            default=2021)
        parser.add_argument('--workqueue',
                            help='Whether to enable Work Queue. Default to False.',
                            type=boolean_string,
                            default=False)
        return parser
    
    
    # Some DeepRec features are enabled through environment variables.
    # This function sets those variables to turn the features on.
    # The triple-quoted docstring below describes each feature.
    def set_env_for_DeepRec():
        '''
        Set the environment variables that enable certain DeepRec features.
        More details are available at https://deeprec.readthedocs.io/zh/latest/index.html.
        START_STATISTIC_STEP & STOP_STATISTIC_STEP: On CPU platforms, DeepRec supports memory
            optimization in both stand-alone and distributed training. It is enabled by default,
            and statistics collection starts at step 1000 and stops at step 1100 by default.
            The settings below shorten the initial cold-start time.
        MALLOC_CONF: On CPU platforms, DeepRec can use memory optimization with the jemalloc library.
            Please preload libjemalloc.so with `LD_PRELOAD=./libjemalloc.so.2 python ...`
        '''
        os.environ['START_STATISTIC_STEP'] = '100'
        os.environ['STOP_STATISTIC_STEP'] = '110'
        os.environ['MALLOC_CONF'] = \
            'background_thread:true,metadata_thp:auto,dirty_decay_ms:20000,muzzy_decay_ms:20000'
    
    
    if __name__ == '__main__':
        parser = get_arg_parser()
        args = parser.parse_args()
    
        set_env_for_DeepRec()
    
        main()
    
    3. Training command:
    python -m hybridbackend.run python benchmark_hb.py --data_location ../data/ --steps 100
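
For readers who want to check how many epochs and steps the script above schedules for a given `--steps` value, the arithmetic in `main()` can be run in isolation. The sketch below mirrors that logic only; the function name `plan_training` and the example count of 1,000,000 rows are illustrative assumptions, not values taken from the report.

```python
import math


def plan_training(num_examples, batch_size, steps):
    """Return (epochs, train_steps) using the same rules as main() in the script.

    `plan_training` is a hypothetical helper name used only for this sketch.
    """
    if steps == 0:
        # No explicit step count: train for exactly one epoch.
        epochs = 1
        train_steps = math.ceil(epochs * num_examples / batch_size)
    else:
        # Explicit step count: repeat the data often enough to cover all steps.
        epochs = math.ceil(batch_size * steps / num_examples)
        train_steps = steps
    return epochs, train_steps


# Assumed dataset size for illustration; the real train.csv row count differs.
print(plan_training(num_examples=1_000_000, batch_size=512, steps=100))  # (1, 100)
print(plan_training(num_examples=1_000_000, batch_size=512, steps=0))    # (1, 1954)
```

With `--steps 100` and the default batch size of 512, a single pass over any dataset larger than 51,200 rows is enough, so the reported failure occurs during session creation, before the first step runs.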
    

    Willing to contribute

    Yes

    opened by shijieliu 0
  • Error when drop_remainder=True using rebatch API

    Error when drop_remainder=True using rebatch API

    Current behavior

    Using the rebatch API with drop_remainder=True makes the program exit with a segmentation fault.

    Expected behavior

    No error

    System information

    • GPU model and memory:
    • OS Platform: Ubuntu 18
    • Docker version:
    • GCC/CUDA/cuDNN version:
    • Python/conda version: Python 3.6
    • TensorFlow/PyTorch version: 1.5.0
    • HybridBackend version: 0.6.0a0

    Code to reproduce

    (1) First generate a random parquet file.

    import pandas as pd
    import random
    
    data_list = []
    for i in range(1, 100000):
        int_feature = random.randint(1, 1000)
        array_feature = [random.randint(1, 1000) for x in range(0, 50)]
        data_list.append([int_feature, array_feature, 0.8])
    
    df = pd.DataFrame(data_list, columns=["int_feature", "array_feature", "label"])
    df['label'] = pd.to_numeric(df["label"], downcast="float")
    df.to_parquet("parquet_sample_file.parquet")
    

    (2) Then read data

    import tensorflow as tf
    import tensorflow.keras as keras
    import hybridbackend.tensorflow as hb
    
    BATCH_SIZE = 1000
    
    
    def get_parquet_ds():
        filenames_ds = tf.data.Dataset.from_tensor_slices([
            'parquet_sample_file.parquet'
        ]*1)
        hb_fields = []
    
        def _map(elem):
            features = {
                "int_feature": tf.cast(tf.reshape(elem["int_feature"], [-1, 1]), dtype=tf.float32),
                "array_feature": tf.cast(tf.reshape(elem["array_feature"].values, [-1, 50]),
                                                  dtype=tf.float32)
            }
            labels = tf.reshape(elem["label"], [-1, 1])
            return features, labels
    
        hb_fields.append(hb.data.DataFrame.Field("int_feature", tf.int64, ragged_rank=0))
        hb_fields.append(hb.data.DataFrame.Field("array_feature", tf.int64, ragged_rank=1))
        hb_fields.append(hb.data.DataFrame.Field("label", tf.float32, ragged_rank=0))
        iterator = filenames_ds.apply(
            hb.data.read_parquet(BATCH_SIZE, hb_fields, num_parallel_reads=tf.data.experimental.AUTOTUNE))
        iterator = iterator.apply(hb.data.rebatch(BATCH_SIZE*2, fields=hb_fields, drop_remainder=True)).map(_map)
    
        return iterator
    
    
    def train():
        global_init_op = tf.compat.v1.global_variables_initializer()
    
        ds = get_parquet_ds()
        iterator = ds.make_one_shot_iterator()
        get_data_op = iterator.get_next()
    
        with tf.compat.v1.Session() as sess:
            a = sess.run([global_init_op])
            i = 1
            while True:
                try:
                    sample = sess.run([get_data_op])
    
                    f_category = sample[0][0]["int_feature"]
                    f_list = sample[0][0]["array_feature"]
                    labels_ = sample[0][1]
    
                    if i % 100 == 0:
                        print(f"step={i}")
                    i += 1
    
                except tf.errors.OutOfRangeError:
                    break
    
    
    if __name__ == '__main__':
        train()
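
To make the expected result of the reproduction concrete, the following is a minimal pure-Python sketch of what rebatching with and without a dropped remainder should produce. It does not use HybridBackend and is not its implementation; `rebatch` and `read_in_batches` here are hypothetical stand-ins that only model the batch counts.

```python
from typing import Iterable, Iterator, List


def rebatch(batches: Iterable[List[int]], batch_size: int,
            drop_remainder: bool = False) -> Iterator[List[int]]:
    """Regroup incoming batches into batches of exactly `batch_size` rows."""
    buffer: List[int] = []
    for batch in batches:
        buffer.extend(batch)
        # Emit as many full batches as the buffer currently holds.
        while len(buffer) >= batch_size:
            yield buffer[:batch_size]
            buffer = buffer[batch_size:]
    # The leftover rows form a short final batch unless they are dropped.
    if buffer and not drop_remainder:
        yield buffer


def read_in_batches(num_rows: int, batch_size: int) -> Iterator[List[int]]:
    """Stand-in for a file reader that yields row indices in fixed-size chunks."""
    for start in range(0, num_rows, batch_size):
        yield list(range(start, min(start + batch_size, num_rows)))


NUM_ROWS = 99_999  # range(1, 100000) in the generator script yields 99,999 rows
kept = list(rebatch(read_in_batches(NUM_ROWS, 1000), 2000, drop_remainder=True))
full = list(rebatch(read_in_batches(NUM_ROWS, 1000), 2000, drop_remainder=False))
print(len(kept), len(full), len(full[-1]))  # 49 50 1999
```

Under these assumptions the reproduction should yield 49 batches of 2,000 rows and silently discard the last 1,999 rows, rather than crash.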
    

    Willing to contribute

    Yes

    opened by liurcme 0
Releases(v0.7.0)
  • v0.7.0(Oct 21, 2022)

    Objectives:

    1. Memory-efficient loading of categorical data
    2. GPU-efficient orchestration of embedding layers
    3. Communication-efficient training and evaluation at scale
    4. Easy to use with existing AI workflows

    Features:

    1. Performance
    • Support of automatic embedding fusion on PAI DLC / PAI DSW
    • Support of data transfer prefetching
    2. Usability
    • Support of embedding_lookup_* API
    • Support of Keras Model API
    • Support of direct pip install via PyPI
  • v0.5.4(Jul 25, 2022)

    Objectives:

    • Easy to use with existing AI workflows

    Features:

    • Support fixed length list in ParquetDataset
    • Support schema parsing in ParquetDataset
    • Provide validation tools for parquet files

    Bug Fixes:

    • Fixes indices calculation in rebatching
  • v0.6.0(Apr 16, 2022)

    Objectives:

    1. Communication-efficient training and evaluation at scale
    2. Easy to use with existing AI workflows

    Features:

    1. Data-Parallel Training and Evaluation
    • Bucketized Gradients Aggregation using AllReduce
    • Global Metric Operations
    • Out-Of-Range Coordination
    2. Hybrid-Parallel Embedding Learning
    • Bucketized Embedding Exchanging using AllToAllv
    • Fusion and Quantization of AllToAllv
    • Fusion of Partitioning and Stitching
    3. Usability
    • Support of MonitoredSession and Estimator
    • Declarative API for Model Definition
    4. Compatibility
    • Support of NVIDIA TensorFlow and DeepRec
    5. Interoperability
    • Inference Pipeline Needs No Change
    • Support of SavedModel
    • Support of Variable, XDL HashTable and PAI Embedding Variable

    Bug Fixes:

    [#46] Fixes rebatching in ParquetDataset.

  • v0.5.3(Jul 25, 2022)

  • v0.5.2(Dec 2, 2021)

    Objectives:

    • Memory-efficient loading of categorical data
    • Easy to use with existing AI workflows

    Features:

    1. Parquet Dataset
    • Reading batch of tensors from numeric fields in zero-copy way
    • Reading batch of sparse tensors from numeric list fields in zero-copy way
    • Support of string fields
    • Support of local filesystem, HDFS, S3 and OSS
    2. Data Pipeline Functions
    • Resizing batch of tensors and ragged tensors
    • Converting ragged tensors to sparse tensors
    • Objective: "Easy to use with existing AI workflows"
    3. Compatibility
    • Support of TensorFlow 1.15 and TensorFlow 1.14
    • GitHub actions for uploading wheels to PyPI

    Bug Fixes:

    • [#11][#12][#13] Supports manylinux_2_24 platform.
Owner
Alibaba
Alibaba Open Source