bs = 4
letters = list(string.ascii_lowercase)
DataLoader
DataLoader helpers
fastai includes a replacement for PyTorch's DataLoader which is largely API-compatible, and adds lots of useful functionality and flexibility. Before we look at the class itself, there are a couple of helpers we'll need to define.
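As a minimal sketch of that compatibility (the toy dataset and doubling callback below are arbitrary illustrations, not part of the library): a plain Python list can act as a map-style dataset, and fastai-specific hooks such as after_batch are passed directly to the constructor.

# Minimal sketch: a list as dataset, plus a fastai-only `after_batch` hook
dl = DataLoader(list(range(8)), bs=4, after_batch=lambda b: b*2)
for b in dl: print(b)
# expected output, something like:
# tensor([0, 2, 4, 6])
# tensor([ 8, 10, 12, 14])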
fa_collate
fa_collate (t)
A replacement for PyTorch default_collate which maintains types and handles Sequences
#e.g. x is int, y is tuple
t = [(1,(2,3)),(1,(2,3))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
t = [(1,(2,(3,4))),(1,(2,(3,4)))]
test_eq(fa_collate(t), default_collate(t))
test_eq(L(fa_collate(t)).map(type), [Tensor,tuple])
test_eq(L(fa_collate(t)[1]).map(type), [Tensor,tuple])
fa_convert
fa_convert (t)
A replacement for PyTorch default_convert which maintains types and handles Sequences
t0 = array([1,2])
t = [t0,(t0,t0)]
test_eq(fa_convert(t), default_convert(t))
test_eq(L(fa_convert(t)).map(type), [Tensor,tuple])
SkipItemException
Raised to notify DataLoader to skip an item
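For example, a minimal sketch (the SkipOddDL subclass is a hypothetical illustration): raising SkipItemException inside create_item causes that item to be dropped rather than batched.

class SkipOddDL(DataLoader):
    "Hypothetical subclass: skip odd numbers by raising SkipItemException"
    def create_item(self, s):
        if s % 2: raise SkipItemException()
        return s

test_eq(list(SkipOddDL(range(10))), [0,2,4,6,8])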
collate_error
collate_error (e:Exception, batch)
Raises an error when the batch could not collate, stating which items in the batch differ in size, along with their types
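A minimal sketch of when it fires (the ragged batch is an arbitrary example): fa_collate raises a RuntimeError when tensors in a batch have unequal sizes, and collate_error re-raises with details about the mismatched item.

def _collate_ragged():
    batch = [tensor([1,2]), tensor([1,2,3])]  # ragged: cannot be stacked
    try: fa_collate(batch)
    except RuntimeError as e: collate_error(e, batch)  # re-raises with item sizes and types

test_fail(_collate_ragged)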
DataLoader
DataLoader (dataset=None, bs=None, num_workers=0, pin_memory=False, timeout=0, batch_size=None, shuffle=False, drop_last=False, indexed=None, n=None, device=None, persistent_workers=False, pin_memory_device='', wif=None, before_iter=None, after_item=None, before_batch=None, after_batch=None, after_iter=None, create_batches=None, create_item=None, create_batch=None, retain=None, get_idxs=None, sample=None, shuffle_fn=None, do_batch=None)
Inherit from this to have all attr accesses in self._xtra passed down to self.default

Arguments to DataLoader:
- dataset: dataset from which to load the data. Can be either a map-style or an iterable-style dataset.
- bs (int): how many samples per batch to load (if batch_size is provided, batch_size overrides bs). If bs=None, it is assumed that dataset.__getitem__ returns a batch.
- num_workers (int): how many subprocesses to use for data loading. 0 means the data is loaded in the main process.
- pin_memory (bool): if True, the data loader copies Tensors into CUDA pinned memory before returning them.
- timeout (float>0): the timeout value in seconds for collecting a batch from workers.
- batch_size (int): only provided for PyTorch compatibility; use bs instead.
- shuffle (bool): if True, the data is shuffled every time the dataloader is fully read/iterated.
- drop_last (bool): if True, the last incomplete batch is dropped.
- indexed (bool): the DataLoader will make a guess as to whether the dataset can be indexed (or is iterable), but you can override that guess with this parameter. True by default.
- n (int): defaults to len(dataset). If you are using an iterable-style dataset, you can specify its size with n.
- device (torch.device): defaults to default_device(), which is CUDA by default. You can specify the device explicitly, e.g. torch.device('cpu').
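For instance, a minimal sketch combining several of these arguments (the values are arbitrary):

dl = DataLoader(list(range(10)), bs=3, shuffle=True, drop_last=True,
                num_workers=0, device=torch.device('cpu'))
test_eq(len(dl), 3)             # 10 items, bs=3, drop_last=True -> 3 full batches
test_eq(first(dl).shape, (3,))  # each batch collates to a tensor of 3 items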
Override create_item and use the default infinite sampler to get a stream of unknown length (stop() when you want to end the stream).
class RandDL(DataLoader):
    def create_item(self, s):
        r = random.random()
        return r if r<0.95 else stop()
L(RandDL())
(#9) [0.09071201211613367,0.03249811556595483,0.6517029228593939,0.8584412116263038,0.759838440232556,0.3725873327679504,0.1445316323722865,0.18876233969606782,0.25518635091544917]
L(RandDL(bs=4, drop_last=True)).map(len)
(#1) [4]
dl = RandDL(bs=4, num_workers=4, drop_last=True)
L(dl).map(len)
(#1) [4]
test_num_workers = 0 if sys.platform in ("win32","darwin") else 4
test_eq(dl.fake_l.num_workers, test_num_workers)
with dl.fake_l.no_multiproc():
    test_eq(dl.fake_l.num_workers, 0)
    L(dl).map(len)
test_eq(dl.fake_l.num_workers, test_num_workers)
def _rand_item(s):
    r = random.random()
    return r if r<0.95 else stop()

L(DataLoader(create_item=_rand_item))
(#2) [0.624781366539204,0.39823513973618685]
If you don't set bs, then dataset is assumed to provide an iterator or a __getitem__ that returns a batch.
ds1 = DataLoader(letters)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)

test_shuffled(L(DataLoader(letters, shuffle=True)), letters)

ds1 = DataLoader(letters, indexed=False)
test_eq(L(ds1), letters)
test_eq(len(ds1), 26)
t2 = L(tensor([0,1,2]),tensor([3,4,5]))
ds2 = DataLoader(t2)
test_eq_type(L(ds2), t2)

t3 = L(array([0,1,2], dtype=np.int64),array([3,4,5], dtype=np.int64))
ds3 = DataLoader(t3)
test_eq_type(L(ds3), t3.map(tensor))

ds4 = DataLoader(t3, create_batch=noop, after_iter=lambda: setattr(t3, 'f', 1))
test_eq_type(L(ds4), t3)
test_eq(t3.f, 1)
If you do set bs, then dataset is assumed to provide an iterator or a __getitem__ that returns a single item of a batch.
def twoepochs(d): return ' '.join(''.join(list(o)) for _ in range(2) for o in d)

ds1 = DataLoader(letters, bs=4, drop_last=True, num_workers=0)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx abcd efgh ijkl mnop qrst uvwx')

ds1 = DataLoader(letters,4,num_workers=2)
test_eq(twoepochs(ds1), 'abcd efgh ijkl mnop qrst uvwx yz abcd efgh ijkl mnop qrst uvwx yz')

ds1 = DataLoader(range(12), bs=4, num_workers=3)
test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])))

ds1 = DataLoader([str(i) for i in range(11)], bs=4, after_iter=lambda: setattr(t3, 'f', 2))
test_eq_type(L(ds1), L(['0','1','2','3'],['4','5','6','7'],['8','9','10']))
test_eq(t3.f, 2)

it = iter(DataLoader(map(noop,range(20)), bs=4, num_workers=1))
test_eq_type([next(it) for _ in range(3)], [tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10,11])])
Iterable dataloaders require specific tests.
class DummyIterableDataset(IterableDataset):
    def __iter__(self):
        yield from range(11)

ds1 = DataLoader(DummyIterableDataset(), bs=4)
# Check it yields fine, and check we can do multiple passes
for i in range(3):
    test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7]),tensor([8,9,10])))

# Check `drop_last` works fine (with multiple passes, since this will prematurely terminate the iterator)
ds1 = DataLoader(DummyIterableDataset(), bs=4, drop_last=True)
for i in range(3):
    test_eq_type(L(ds1), L(tensor([0,1,2,3]),tensor([4,5,6,7])))
class SleepyDL(list):
    def __getitem__(self,i):
        time.sleep(random.random()/50)
        return super().__getitem__(i)

t = SleepyDL(letters)

%time test_eq(DataLoader(t, num_workers=0), letters)
%time test_eq(DataLoader(t, num_workers=2), letters)
%time test_eq(DataLoader(t, num_workers=4), letters)

CPU times: user 3.35 ms, sys: 890 µs, total: 4.24 ms
Wall time: 307 ms
CPU times: user 6.93 ms, sys: 860 µs, total: 7.79 ms
Wall time: 333 ms
CPU times: user 7.78 ms, sys: 722 µs, total: 8.51 ms
Wall time: 331 ms

dl = DataLoader(t, shuffle=True, num_workers=1)
test_shuffled(L(dl), letters)
test_shuffled(L(dl), L(dl))
L(dl)

(#26) ['l','h','f','r','z','s','u','x','m','p'...]
class SleepyQueue():
    "Simulate a queue with varying latency"
    def __init__(self, q): self.q=q
    def __iter__(self):
        while True:
            time.sleep(random.random()/100)
            try: yield self.q.get_nowait()
            except queues.Empty: return

q = Queue()
for o in range(30): q.put(o)
it = SleepyQueue(q)
if not (sys.platform == "win32" and IN_NOTEBOOK):
    %time test_shuffled(L(DataLoader(it, num_workers=4)), L(range(30)))
class A(TensorBase): pass

for nw in (0,2):
    t = A(tensor([1,2]))
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b), A)

    t = (A(tensor([1,2])),)
    dl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=nw)
    b = first(dl)
    test_eq(type(b[0]), A)
list(DataLoader(list(range(50)),bs=32,shuffle=True,num_workers=3))
[tensor([42, 12, 44, 21, 8, 6, 3, 37, 33, 9, 27, 34, 18, 26, 1, 23, 11, 41,
15, 0, 49, 4, 38, 46, 48, 14, 40, 36, 17, 45, 30, 29]),
tensor([19, 10, 22, 13, 25, 32, 35, 5, 2, 20, 47, 39, 16, 28, 43, 7, 31, 24])]
class A(TensorBase): pass
t = A(tensor(1,2))

tdl = DataLoader([t,t,t,t,t,t,t,t], bs=4, num_workers=2, after_batch=to_device)
b = first(tdl)
test_eq(type(b), A)

# Unknown attributes are delegated to `dataset`
test_eq(tdl.pop(), tensor(1,2))
Override get_idxs to return the same index until consumption of the DL. This is intended to test consistent sampling behavior when num_workers > 1.
class AdamantDL(DataLoader):
    def get_idxs(self):
        r = random.randint(0,self.n-1)
        return [r] * self.n

test_eq(torch.cat(tuple(AdamantDL((list(range(50))),bs=16,num_workers=4))).unique().numel(),1)
# from subprocess import Popen, PIPE
# # test num_workers > 0 in scripts works when python process start method is spawn
# process = Popen(["python", "dltest.py"], stdout=PIPE)
# _, err = process.communicate(timeout=15)
# exit_code = process.wait()
# test_eq(exit_code, 0)