@@ -36,20 +36,39 @@ object Selector {
36
36
// ----------------------------------------------------------------------------
37
37
38
38
trait Dataset [T ] {
39
- def select [A ](c : Column [T , A ]): Dataset [A ] =
40
- this .asInstanceOf [Dataset [A ]] // Use c.label to do an untyped select on actual Spark Dataset, and
39
+
40
+ def select [A ](c : Column [T , A ]): Dataset [A ] = new SelectedDataset [T , A ](this , c)
41
+ // Use c.label to do an untyped select on actual Spark Dataset, and
41
42
// cast the result to TypedDataset[A]
42
43
43
44
def col [S <: String , A ](s : S )(implicit unused ev : Exists [T , s.type , A ]) =
44
- new Column [T , A ](s) // ev is only here to check than this is safe, it's
45
- // never used at runtime!
45
+ new Column [T , A ](s) // ev is only here to check than this is safe, it's never used at runtime!
46
+
47
+ def collect (): Vector [T ]
48
+ }
49
+
50
+ class SelectedDataset [T , A ](ds : Dataset [T ], val col : Column [T , A ]) extends Dataset [A ] {
51
+ def collect (): Vector [A ] = {
52
+ // This would use collect of the underlying Spark structure plus a cast
53
+ ds match { // Dummy implementation
54
+ case SeqDataset (data) =>
55
+ println(s " selecting ` ${col.label}` from $data" )
56
+ col.label match {
57
+ case " a" => data.map(_.asInstanceOf [X4 [A ,_,_,_]].a).toVector
58
+ case " b" => data.map(_.asInstanceOf [X4 [_,A ,_,_]].b).toVector
59
+ case " c" => data.map(_.asInstanceOf [X4 [_,_,A ,_]].c).toVector
60
+ case " d" => data.map(_.asInstanceOf [X4 [_,_,_,A ]].d).toVector
61
+ }
62
+ }
63
+ }
64
+ }
46
65
47
- def collect () : Vector [T ] =
48
- Vector .empty [T ] // Uses collect of the underlying Spark structure plus a cast
66
+ case class SeqDataset [ T ]( data : Seq [T ]) extends Dataset [ T ] {
67
+ override def collect () : Vector [T ] = data.toVector
49
68
}
50
69
51
70
object Dataset {
52
- def create [T ](values : Seq [T ]): Dataset [T ] = new Dataset [T ] { }
71
+ def create [T ](values : Seq [T ]): Dataset [T ] = new SeqDataset [T ](values)
53
72
}
54
73
55
74
/** Expression used in `select`-like constructions.
@@ -115,15 +134,13 @@ object Test {
115
134
println(" unused" )
116
135
val unusedD = ds.col(" d" )
117
136
val outSpark1 : Vector [Boolean ] = ds.select(unusedD).collect()
118
- // FIXME implement Dataset opertations
119
- // assert(outSpark1 == outColl)
137
+ assert(outSpark1 == outColl)
120
138
}
121
139
122
140
println(" used" )
123
141
val usedD = ds.col(" d" )
124
142
val outSpark2 : Vector [Boolean ] = ds.select(usedD).collect()
125
- // FIXME implement Dataset opertations
126
- // assert(outSpark2 == outColl)
143
+ assert(outSpark2 == outColl)
127
144
128
145
println(" end" )
129
146
}
0 commit comments