[DISCUSSION] Add hint/option on PCollection


[DISCUSSION] Add hint/option on PCollection

Jean-Baptiste Onofré
Hi,

As part of the discussion about schemas, Romain mentioned hints. I think it's
worth explaining the idea, especially since it could apply more widely than
schemas.

Today, to give information to the runner, we use PipelineOptions. The runner can
read these options and apply them to all of its internal representations of
PCollections.

For instance, for the Spark runner, the persistence storage level (memory, disk,
...) can be defined via pipeline options.

Then, the Spark runner automatically decides whether RDDs have to be persisted
(using the storage level defined in the pipeline options), for instance when the
same POutput/PCollection is read several times.

However, the user has no way to give the runner an indication about how to
handle a specific PCollection.

Imagine the user has a pipeline like this: pipeline.apply().apply().apply().
Three PCollections are involved in this pipeline. It's currently not possible
to tell the runner how it should "optimize" and handle the second PCollection
only.

The idea is to add a method on the PCollection:

PCollection.addHint(String key, Object value);

For instance:

collection.addHint("spark.persist", StorageLevel.MEMORY_ONLY);
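Since addHint doesn't exist in Beam today, here is a minimal self-contained sketch (plain Java, no Beam or Spark dependency) of how per-collection hints could be stored and later read back by a runner. The HintedCollection class, the "spark.persist" key, and the plain-string storage level are all illustrative stand-ins, not actual Beam API:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Illustrative stand-in for a PCollection that carries hints; not actual Beam API.
class HintedCollection<T> {
    private final Map<String, Object> hints = new HashMap<>();

    // The proposed method: attach a free-form hint for the runner.
    public HintedCollection<T> addHint(String key, Object value) {
        hints.put(key, value);
        return this; // fluent style, so hints can be set between applies
    }

    // What a runner translation step might call when materializing this collection.
    public Optional<Object> getHint(String key) {
        return Optional.ofNullable(hints.get(key));
    }
}

public class HintSketch {
    public static void main(String[] args) {
        HintedCollection<String> collection = new HintedCollection<>();
        // Hypothetical hint key; a Spark runner could map it to rdd.persist(...).
        collection.addHint("spark.persist", "MEMORY_ONLY");

        // Runner side: fall back to the pipeline-wide option when no hint is set.
        String level = (String) collection.getHint("spark.persist").orElse("DISK_ONLY");
        System.out.println("storage level: " + level);
    }
}
```

Free-form string keys keep the mechanism open-ended, like Camel headers: a runner simply ignores keys it doesn't understand.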

I see three direct uses for this:

1. Related to schema: the schema definition could be a hint.
2. Related to IO: add headers telling the IO and the runner how to process a
specific collection. In Apache Camel, we have headers on the message and
properties on the exchange that are similar to this. They let you indicate how
a Camel component should process certain messages. We can imagine the same for
IOs (using the PCollection hints to react accordingly).
3. Related to runner optimization: I see, for instance, a way to choose between
RDDs and dataframes in the Spark runner, or even specific optimizations like
persist. I've had a lot of questions from Spark users saying: "in my Spark job,
I know where and how I should use persist (rdd.persist()), but I can't do such
an optimization using Beam". So it could be a good improvement.

Thoughts ?

Regards
JB
--
Jean-Baptiste Onofré
[hidden email]
http://blog.nanthrax.net
Talend - http://www.talend.com

Re: [DISCUSSION] Add hint/option on PCollection

Jean-Baptiste Onofré
Oops, sorry, wrong mailing list, never mind.

On 01/30/2018 11:12 AM, Jean-Baptiste Onofré wrote:
