12 changes: 6 additions & 6 deletions dali/pipeline/data/copy_to_external.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -48,7 +48,7 @@ inline void CopyToExternalImpl(void* dst,
                                AccessOrder order, bool use_copy_kernel) {
   DeviceGuard d(src.device_id());
   const auto &type_info = src.type_info();
-  type_info.template Copy<DstBackend, SrcBackend>(dst, src.raw_data(), src.size(), order.stream(),
+  type_info.template Copy<DstBackend, SrcBackend>(dst, src.raw_data(), src.size(), order,
                                                   use_copy_kernel);
 }
 
@@ -70,7 +70,7 @@ inline void CopyToExternalImpl(void* dst,
 
   if (src.IsContiguous()) {
     type_info.template Copy<DstBackend, SrcBackend>(dst, unsafe_raw_data(src), src._num_elements(),
-                                                    order.stream(), use_copy_kernel);
+                                                    order, use_copy_kernel);
   } else {
     const auto &src_shape = src.shape();
     BatchVector<const void *> from;
@@ -84,7 +84,7 @@ inline void CopyToExternalImpl(void* dst,
     }
 
     type_info.template Copy<DstBackend, SrcBackend>(dst, from.data(), sizes.data(),
-                                                    num_samples, order.stream(), use_copy_kernel);
+                                                    num_samples, order, use_copy_kernel);
   }
 }
 
@@ -115,7 +115,7 @@ inline void CopyToExternalImpl(void** dsts,
 
   if (src.IsContiguous() && samples_to_copy == num_samples) {
     type_info.template Copy<DstBackend, SrcBackend>(dsts, unsafe_raw_data(src), sizes.data(),
-                                                    num_samples, order.stream(), use_copy_kernel);
+                                                    num_samples, order, use_copy_kernel);
 
   } else {
     BatchVector<const void *> from;
@@ -130,7 +130,7 @@ inline void CopyToExternalImpl(void** dsts,
     }
 
     type_info.template Copy<DstBackend, SrcBackend>(
-        to.data(), from.data(), sizes.data(), samples_to_copy, order.stream(), use_copy_kernel);
+        to.data(), from.data(), sizes.data(), samples_to_copy, order, use_copy_kernel);
   }
 }
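Note: with this change, call sites forward the whole `AccessOrder` instead of unwrapping it with `order.stream()`, so host-ordered copies are handled in one place, `TypeInfo::Copy`, rather than at every caller. A minimal sketch of the resulting calling convention, assuming the `AccessOrder(cudaStream_t)` converting constructor and `AccessOrder::host()` used elsewhere in this diff; the helper names and backend choices are illustrative only:

```cpp
#include <cuda_runtime.h>
#include "dali/core/access_order.h"
#include "dali/pipeline/data/types.h"

// Device-ordered copy: work is enqueued on the wrapped stream and the call
// returns without waiting for the copy to finish.
void CopyInStreamOrder(const dali::TypeInfo &ti, void *dst, const void *src,
                       dali::Index n, cudaStream_t stream) {
  ti.Copy<dali::GPUBackend, dali::GPUBackend>(dst, src, n,
                                              dali::AccessOrder(stream),
                                              /*use_copy_kernel=*/false);
}

// Host-ordered copy: after this PR, the copy is complete when the call returns.
void CopyInHostOrder(const dali::TypeInfo &ti, void *dst, const void *src,
                     dali::Index n) {
  ti.Copy<dali::GPUBackend, dali::CPUBackend>(dst, src, n,
                                              dali::AccessOrder::host(),
                                              /*use_copy_kernel=*/false);
}
```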
8 changes: 4 additions & 4 deletions dali/pipeline/data/tensor.h
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -86,7 +86,7 @@ class Tensor : public Buffer<Backend> {
     order.wait(order_);
 
     type_.template Copy<Backend, CPUBackend>(this->raw_mutable_data(),
-                                             data.data(), this->size(), order.stream());
+                                             data.data(), this->size(), order);
     order_.wait(order);
   }
 
@@ -102,7 +102,7 @@ class Tensor : public Buffer<Backend> {
 
     order.wait(order_);
     type_.template Copy<Backend, CPUBackend>(this->raw_mutable_data(),
-                                             data.data(), this->size(), order.stream());
+                                             data.data(), this->size(), order);
     order_.wait(order);
   }
 
@@ -127,7 +127,7 @@ class Tensor : public Buffer<Backend> {
     this->SetSourceInfo(other.GetSourceInfo());
     this->SetSkipSample(other.ShouldSkipSample());
     type_.template Copy<Backend, InBackend>(this->raw_mutable_data(),
-                                            other.raw_data(), this->size(), order.stream());
+                                            other.raw_data(), this->size(), order);
     order_.wait(order);
   }
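The ordering bracket around each copy is unchanged: the copy order first waits for the tensor's own order, then the tensor's order waits for the copy. Only the innermost call now receives `order` instead of `order.stream()`. A sketch of that bracket, with `buffer_order` standing in for the member `order_` (names are illustrative, not from this PR):

```cpp
#include "dali/core/access_order.h"

// Sketch of the synchronization bracket in Tensor<Backend>::Copy:
// a.wait(b) makes work ordered by `a` wait for work already submitted in `b`.
void CopyBracket(dali::AccessOrder &buffer_order, dali::AccessOrder copy_order) {
  copy_order.wait(buffer_order);  // don't touch the buffer before prior work is done
  // ... issue the copy in copy_order, e.g. via TypeInfo::Copy(..., copy_order) ...
  buffer_order.wait(copy_order);  // later work on the buffer sees the copied data
}
```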
10 changes: 5 additions & 5 deletions dali/pipeline/data/tensor_list.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2020-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -92,7 +92,7 @@ void CopySamplewiseImpl(DstBatch<DstBackend> &dst, const SrcBatch<SrcBackend> &s
   }
 
   type_info.Copy<SrcBackend, DstBackend>(dsts.data(), srcs.data(), sizes.data(), num_samples,
-                                         order.stream(), use_copy_kernel);
+                                         order, use_copy_kernel);
 }
 
 
@@ -114,7 +114,7 @@ void CopySamplewiseImpl(DstBatch<DstBackend> &dst, const void *src, const TypeIn
   }
 
   type_info.Copy<DstBackend, SrcBackend>(dsts.data(), src, sizes.data(), num_samples,
-                                         order.stream(), use_copy_kernel);
+                                         order, use_copy_kernel);
 }
 
 
@@ -136,7 +136,7 @@ void CopySamplewiseImpl(void *dst, const SrcBatch<SrcBackend> &src, const TypeIn
   }
 
   type_info.Copy<DstBackend, SrcBackend>(dst, srcs.data(), sizes.data(), num_samples,
-                                         order.stream(), use_copy_kernel);
+                                         order, use_copy_kernel);
 }
 
 /**
@@ -149,7 +149,7 @@ void CopyImpl(DstBatch<DstBackend> &dst, const SrcBatch<SrcBackend> &src, const
               AccessOrder copy_order, bool use_copy_kernel = false) {
   if (dst.IsContiguous() && src.IsContiguous()) {
     type_info.Copy<DstBackend, SrcBackend>(unsafe_raw_mutable_data(dst), unsafe_raw_data(src),
-                                           dst.shape().num_elements(), copy_order.stream(),
+                                           dst.shape().num_elements(), copy_order,
                                            use_copy_kernel);
   } else if (dst.IsContiguous() && !src.IsContiguous()) {
     copy_impl::CopySamplewiseImpl<DstBackend, SrcBackend>(unsafe_raw_mutable_data(dst), src,
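`CopyImpl` still dispatches on contiguity: contiguous-to-contiguous is a single flat copy, every other combination goes through a per-sample `CopySamplewiseImpl`. When no copy kernel is used, the batched overloads degenerate to one flat copy per sample. A self-contained sketch of that fallback in plain CUDA (not DALI's implementation, which can also batch the samples into one scatter-gather kernel):

```cpp
#include <cuda_runtime.h>
#include <vector>

// One async memcpy per sample, all ordered by the same stream; mirrors the
// per-sample loop that TypeInfo::Copy falls back to without a copy kernel.
void SamplewiseCopy(const std::vector<void *> &dsts,
                    const std::vector<const void *> &srcs,
                    const std::vector<size_t> &nbytes, cudaStream_t stream) {
  for (size_t i = 0; i < dsts.size(); i++)
    cudaMemcpyAsync(dsts[i], srcs[i], nbytes[i], cudaMemcpyDefault, stream);
}
```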
109 changes: 70 additions & 39 deletions dali/pipeline/data/types.cc
@@ -1,4 +1,4 @@
-// Copyright (c) 2017-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright (c) 2017-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Licensed under the Apache License, Version 2.0 (the "License");
 // you may not use this file except in compliance with the License.
@@ -31,6 +31,7 @@ const auto &_type_info_##Id = TypeTable::GetTypeId<Type>()
 
 #include "dali/pipeline/data/backend.h"
 #include "dali/core/per_stream_pool.h"
+#include "dali/core/cuda_stream_pool.h"
 #include "dali/kernels/common/scatter_gather.h"
 
 namespace dali {
@@ -45,36 +46,54 @@ ScatterGatherPool& ScatterGatherPoolInstance() {
 }
 
 void ScatterGatherCopy(void **dsts, const void **srcs, const Index *sizes, int n, int element_size,
-                       cudaStream_t stream) {
-  auto sc = ScatterGatherPoolInstance().Get(stream, kMaxSizePerBlock);
+                       AccessOrder order) {
+  if (!order.is_device()) {
+    CUDAStreamLease stream = CUDAStreamPool::instance().Get();
+    ScatterGatherCopy(dsts, srcs, sizes, n, element_size, stream);
+    AccessOrder::host().wait(stream);
+    return;
+  }
+  auto sc = ScatterGatherPoolInstance().Get(order.stream(), kMaxSizePerBlock);
   for (int i = 0; i < n; i++) {
     sc->AddCopy(dsts[i], srcs[i], sizes[i] * element_size);
   }
-  sc->Run(stream, true, kernels::ScatterGatherGPU::Method::Kernel);
+  sc->Run(order.stream(), true, kernels::ScatterGatherGPU::Method::Kernel);
 }
 
 void ScatterGatherCopy(void *dst, const void **srcs, const Index *sizes, int n, int element_size,
-                       cudaStream_t stream) {
-  auto sc = ScatterGatherPoolInstance().Get(stream, kMaxSizePerBlock);
+                       AccessOrder order) {
+  if (!order.is_device()) {
+    CUDAStreamLease stream = CUDAStreamPool::instance().Get();
+    ScatterGatherCopy(dst, srcs, sizes, n, element_size, stream);
+    AccessOrder::host().wait(stream);
+    return;
+  }
+  auto sc = ScatterGatherPoolInstance().Get(order.stream(), kMaxSizePerBlock);
   auto *sample_dst = reinterpret_cast<uint8_t*>(dst);
   for (int i = 0; i < n; i++) {
     auto nbytes = sizes[i] * element_size;
     sc->AddCopy(sample_dst, srcs[i], nbytes);
     sample_dst += nbytes;
   }
-  sc->Run(stream, true, kernels::ScatterGatherGPU::Method::Kernel);
+  sc->Run(order.stream(), true, kernels::ScatterGatherGPU::Method::Kernel);
 }
 
 void ScatterGatherCopy(void **dsts, const void *src, const Index *sizes, int n, int element_size,
-                       cudaStream_t stream) {
-  auto sc = ScatterGatherPoolInstance().Get(stream, kMaxSizePerBlock);
+                       AccessOrder order) {
+  if (!order.is_device()) {
+    CUDAStreamLease stream = CUDAStreamPool::instance().Get();
+    ScatterGatherCopy(dsts, src, sizes, n, element_size, stream);
+    AccessOrder::host().wait(stream);
+    return;
+  }
+  auto sc = ScatterGatherPoolInstance().Get(order.stream(), kMaxSizePerBlock);
   auto *sample_src = reinterpret_cast<const uint8_t*>(src);
   for (int i = 0; i < n; i++) {
     auto nbytes = sizes[i] * element_size;
     sc->AddCopy(dsts[i], sample_src, nbytes);
     sample_src += nbytes;
   }
-  sc->Run(stream, true, kernels::ScatterGatherGPU::Method::Kernel);
+  sc->Run(order.stream(), true, kernels::ScatterGatherGPU::Method::Kernel);
 }
 
 }  // namespace detail
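All three overloads gain the same fallback: a scatter-gather kernel cannot run in host order, so the copy is reissued in device order on a stream leased from `CUDAStreamPool`, and the host waits for it before returning. The same pattern reduced to raw CUDA runtime calls, as a sketch (not the DALI code, which reuses pooled streams instead of creating one per call):

```cpp
#include <cuda_runtime.h>

// Run a device-ordered operation on a temporary stream, then block the host.
inline cudaError_t CopyOnTemporaryStream(void *dst, const void *src,
                                         size_t nbytes) {
  cudaStream_t stream;
  cudaError_t err = cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking);
  if (err != cudaSuccess) return err;
  err = cudaMemcpyAsync(dst, src, nbytes, cudaMemcpyDefault, stream);
  if (err == cudaSuccess)
    err = cudaStreamSynchronize(stream);  // host order: complete before returning
  cudaStreamDestroy(stream);
  return err;
}
```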
@@ -86,120 +105,132 @@ TypeTable &TypeTable::instance() {
 
 template <typename DstBackend, typename SrcBackend>
 void TypeInfo::Copy(void *dst,
-                    const void *src, Index n, cudaStream_t stream, bool use_copy_kernel) const {
+                    const void *src, Index n, AccessOrder order, bool use_copy_kernel) const {
   constexpr bool is_host_to_host = std::is_same<DstBackend, CPUBackend>::value &&
                                    std::is_same<SrcBackend, CPUBackend>::value;
   if (n == 0)
     return;
 
   if (is_host_to_host) {
-    if (stream)
+    if (order.is_device())
       throw std::logic_error("Cannot issue a H2H copy on a stream");
 
     // Call our copy function
     copier_(dst, src, n);
   } else if (use_copy_kernel) {
-    detail::LaunchCopyKernel(dst, src, n * size(), stream);
+    if (order.is_device()) {
+      detail::LaunchCopyKernel(dst, src, n * size(), order.stream());
+    } else {
+      CUDAStreamLease stream = CUDAStreamPool::instance().Get();
+      detail::LaunchCopyKernel(dst, src, n * size(), stream);
+      CUDA_CALL(cudaStreamSynchronize(stream));
+    }
   } else {
-    MemCopy(dst, src, n*size(), stream);
+    if (order.is_device()) {
+      MemCopy(dst, src, n*size(), order.stream());
+    } else {
+      CUDAStreamLease stream = CUDAStreamPool::instance().Get();
+      MemCopy(dst, src, n*size(), stream);
+      CUDA_CALL(cudaStreamSynchronize(stream));
+    }
   }
 }
 
 template void TypeInfo::Copy<CPUBackend, CPUBackend>(void *dst,
-    const void *src, Index n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, Index n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<CPUBackend, GPUBackend>(void *dst,
-    const void *src, Index n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, Index n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, CPUBackend>(void *dst,
-    const void *src, Index n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, Index n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, GPUBackend>(void *dst,
-    const void *src, Index n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, Index n, AccessOrder order, bool use_copy_kernel) const;
 
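Note the tightened H2H guard above: previously `if (stream)` let a host-to-host copy issued on the legacy default stream (`0`) pass silently, while any device order now throws. Host-to-host copies must be host-ordered; the other branches pick between the caller's stream and a leased, synchronized one. A small usage sketch, assuming `TypeTable::GetTypeInfo` for the `TypeInfo` lookup (illustrative, not from this PR):

```cpp
#include <vector>
#include "dali/core/access_order.h"
#include "dali/pipeline/data/types.h"

void HostToHostCopy() {
  const auto &ti = dali::TypeTable::GetTypeInfo(dali::DALI_FLOAT);
  std::vector<float> src(16, 1.0f), dst(16);
  // OK: an H2H copy in host order runs synchronously through the type's copier.
  ti.Copy<dali::CPUBackend, dali::CPUBackend>(dst.data(), src.data(), src.size(),
                                              dali::AccessOrder::host(),
                                              /*use_copy_kernel=*/false);
  // Passing a device order here would throw std::logic_error.
}
```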
 template <typename DstBackend, typename SrcBackend>
 void TypeInfo::Copy(void **dsts, const void** srcs, const Index* sizes, int n,
-                    cudaStream_t stream, bool use_copy_kernel) const {
+                    AccessOrder order, bool use_copy_kernel) const {
   constexpr bool is_host_to_host = std::is_same<DstBackend, CPUBackend>::value &&
                                    std::is_same<SrcBackend, CPUBackend>::value;
   if (!is_host_to_host && use_copy_kernel) {
-    detail::ScatterGatherCopy(dsts, srcs, sizes, n, size(), stream);
+    detail::ScatterGatherCopy(dsts, srcs, sizes, n, size(), order);
   } else {
     for (int i = 0; i < n; i++) {
-      Copy<DstBackend, SrcBackend>(dsts[i], srcs[i], sizes[i], stream);
+      Copy<DstBackend, SrcBackend>(dsts[i], srcs[i], sizes[i], order);
     }
   }
 }
 
 template void TypeInfo::Copy<CPUBackend, CPUBackend>(void **dsts,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<CPUBackend, GPUBackend>(void **dsts,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, CPUBackend>(void **dsts,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, GPUBackend>(void **dsts,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 
 template <typename DstBackend, typename SrcBackend>
 void TypeInfo::Copy(void *dst, const void** srcs, const Index* sizes, int n,
-                    cudaStream_t stream, bool use_copy_kernel) const {
+                    AccessOrder order, bool use_copy_kernel) const {
   constexpr bool is_host_to_host = std::is_same<DstBackend, CPUBackend>::value &&
                                    std::is_same<SrcBackend, CPUBackend>::value;
   if (!is_host_to_host && use_copy_kernel) {
-    detail::ScatterGatherCopy(dst, srcs, sizes, n, size(), stream);
+    detail::ScatterGatherCopy(dst, srcs, sizes, n, size(), order);
   } else {
     auto sample_dst = static_cast<uint8_t*>(dst);
     for (int i = 0; i < n; i++) {
-      Copy<DstBackend, SrcBackend>(sample_dst, srcs[i], sizes[i], stream);
+      Copy<DstBackend, SrcBackend>(sample_dst, srcs[i], sizes[i], order);
       sample_dst += sizes[i] * size();
     }
   }
 }
 
 
 template void TypeInfo::Copy<CPUBackend, CPUBackend>(void *dst,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<CPUBackend, GPUBackend>(void *dst,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, CPUBackend>(void *dst,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, GPUBackend>(void *dst,
-    const void **src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void **src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 
 template <typename DstBackend, typename SrcBackend>
 void TypeInfo::Copy(void **dsts, const void* src, const Index* sizes, int n,
-                    cudaStream_t stream, bool use_copy_kernel) const {
+                    AccessOrder order, bool use_copy_kernel) const {
   constexpr bool is_host_to_host = std::is_same<DstBackend, CPUBackend>::value &&
                                    std::is_same<SrcBackend, CPUBackend>::value;
   if (!is_host_to_host && use_copy_kernel) {
-    detail::ScatterGatherCopy(dsts, src, sizes, n, size(), stream);
+    detail::ScatterGatherCopy(dsts, src, sizes, n, size(), order);
   } else {
     auto sample_src = reinterpret_cast<const uint8_t*>(src);
     for (int i = 0; i < n; i++) {
-      Copy<DstBackend, SrcBackend>(dsts[i], sample_src, sizes[i], stream);
+      Copy<DstBackend, SrcBackend>(dsts[i], sample_src, sizes[i], order);
       sample_src += sizes[i] * size();
     }
   }
 }
 
 template void TypeInfo::Copy<CPUBackend, CPUBackend>(void **dsts,
-    const void *src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<CPUBackend, GPUBackend>(void **dsts,
-    const void *src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, CPUBackend>(void **dsts,
-    const void *src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 template void TypeInfo::Copy<GPUBackend, GPUBackend>(void **dsts,
-    const void *src, const Index *sizes, int n, cudaStream_t stream, bool use_copy_kernel) const;
+    const void *src, const Index *sizes, int n, AccessOrder order, bool use_copy_kernel) const;
 
 }  // namespace dali