Jednym z alternatywnych rozwiązań byłoby użycie dask narzędzia przepływu pracy. Chociaż nie jest to tak zabawne składniowo, jak ...
var
| do this
| then do that
... nadal umożliwia przepływ zmiennej w dół łańcucha, a użycie dask zapewnia dodatkową korzyść w postaci równoległości, jeśli to możliwe.
Oto jak używam dask, aby uzyskać wzór łańcucha rurowego:
import dask
def a(foo):
return foo + 1
def b(foo):
return foo / 2
def c(foo,bar):
return foo + bar
workflow = {'a_task':(a,1),
'b_task':(b,'a_task',),
'c_task':(c,99,'b_task'),}
dask.get(workflow,'c_task')
Po pracy z eliksirem chciałem użyć wzorca rurociągów w Pythonie. To nie jest dokładnie ten sam wzorzec, ale jest podobny i, jak powiedziałem, ma dodatkowe zalety równoległości; jeśli powiesz dask, aby otrzymał zadanie w swoim przepływie pracy, które nie jest zależne od innych, aby uruchomiły się jako pierwsze, będą one działać równolegle.
Jeśli chcesz mieć prostszą składnię, możesz ją opakować w coś, co zajmie się nazewnictwem zadań za Ciebie. Oczywiście w takiej sytuacji wszystkie funkcje powinny przyjmować potok jako pierwszy argument i stracisz korzyści z paralizacji. Ale jeśli nie masz nic przeciwko, możesz zrobić coś takiego:
def dask_pipe(initial_var, functions_args):
'''
call the dask_pipe with an init_var, and a list of functions
workflow, last_task = dask_pipe(initial_var, {function_1:[], function_2:[arg1, arg2]})
workflow, last_task = dask_pipe(initial_var, [function_1, function_2])
dask.get(workflow, last_task)
'''
workflow = {}
if isinstance(functions_args, list):
for ix, function in enumerate(functions_args):
if ix == 0:
workflow['task_' + str(ix)] = (function, initial_var)
else:
workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1))
return workflow, 'task_' + str(ix)
elif isinstance(functions_args, dict):
for ix, (function, args) in enumerate(functions_args.items()):
if ix == 0:
workflow['task_' + str(ix)] = (function, initial_var)
else:
workflow['task_' + str(ix)] = (function, 'task_' + str(ix - 1), *args )
return workflow, 'task_' + str(ix)
def foo(df):
return df[['a','b']]
def bar(df, s1, s2):
return df.columns.tolist() + [s1, s2]
def baz(df):
return df.columns.tolist()
import dask
import pandas as pd
df = pd.DataFrame({'a':[1,2,3],'b':[1,2,3],'c':[1,2,3]})
Teraz, dzięki temu opakowaniu, możesz utworzyć potok według jednego z tych wzorców składniowych:
lubię to:
workflow, last_task = dask_pipe(df, [foo, baz])
print(dask.get(workflow, last_task))
workflow, last_task = dask_pipe(df, {foo:[], bar:['string1', 'string2']})
print(dask.get(workflow, last_task))
crime_by_state %>% filter(State=="New York", Year==2005) ...
od końca Jak dplyr zastąpić moje najczęstszych idiomów R .