Skip to content

Commit ca3c040

Browse files
feat(bigquery): extract preview (#190)
* feat(bigquery): extract preview * fix(bigquery): fix preview fields not being populated
1 parent 33b589d commit ca3c040

File tree

1 file changed

+54
-1
lines changed

1 file changed

+54
-1
lines changed

plugins/extractors/bigquery/bigquery.go

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package bigquery
22

33
import (
44
"context"
5+
"encoding/json"
56
"fmt"
67
"html/template"
78
"strings"
@@ -21,6 +22,10 @@ import (
2122
"google.golang.org/protobuf/types/known/timestamppb"
2223
)
2324

25+
const (
26+
previewTotalRows = 30
27+
)
28+
2429
type Config struct {
2530
ProjectID string `mapstructure:"project_id" validate:"required"`
2631
ServiceAccountJSON string `mapstructure:"service_account_json"`
@@ -59,7 +64,6 @@ func (e *Extractor) Extract(ctx context.Context, config map[string]interface{},
5964
}
6065

6166
return
62-
6367
}
6468

6569
// Create big query client
@@ -101,6 +105,11 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu
101105
partitionField = md.TimePartitioning.Field
102106
}
103107

108+
previewFields, previewRows, err := e.buildPreview(ctx, t)
109+
if err != nil {
110+
e.logger.Warn("error building preview", "err", err, "table", t.FullyQualifiedName())
111+
}
112+
104113
return assets.Table{
105114
Resource: &common.Resource{
106115
Urn: fmt.Sprintf("%s:%s.%s", t.ProjectID, t.DatasetID, t.TableID),
@@ -116,6 +125,8 @@ func (e *Extractor) buildTable(ctx context.Context, t *bigquery.Table, md *bigqu
116125
"project": t.ProjectID,
117126
"type": string(md.Type),
118127
"partition_field": partitionField,
128+
"preview": previewRows,
129+
"preview_fields": previewFields,
119130
}),
120131
Labels: md.Labels,
121132
},
@@ -170,6 +181,48 @@ func (e *Extractor) buildColumn(ctx context.Context, field *bigquery.FieldSchema
170181
return
171182
}
172183

184+
func (e *Extractor) buildPreview(ctx context.Context, t *bigquery.Table) (fields []interface{}, rows []interface{}, err error) {
185+
rows = []interface{}{}
186+
fields = []interface{}{}
187+
188+
ri := t.Read(ctx)
189+
totalRows := 0
190+
for totalRows < previewTotalRows {
191+
var row []bigquery.Value
192+
err = ri.Next(&row)
193+
if err == iterator.Done {
194+
break
195+
}
196+
if err != nil {
197+
return
198+
}
199+
200+
// populate row fields once
201+
if len(fields) < 1 {
202+
for _, schema := range ri.Schema {
203+
fields = append(fields, schema.Name)
204+
}
205+
}
206+
207+
rows = append(rows, row)
208+
totalRows++
209+
}
210+
211+
// this preview will be stored on Properties.Attributes which is a proto struct
212+
// we need to totally change []bigquery.Value to []interface{}
213+
// to prevent error when mapping this preview to Properties facets
214+
jsonBytes, err := json.Marshal(rows)
215+
if err != nil {
216+
return
217+
}
218+
err = json.Unmarshal(jsonBytes, &rows)
219+
if err != nil {
220+
return
221+
}
222+
223+
return
224+
}
225+
173226
func (e *Extractor) getColumnProfile(ctx context.Context, col *bigquery.FieldSchema, tm *bigquery.TableMetadata) (cp *facets.ColumnProfile, err error) {
174227
if col.Type == bigquery.BytesFieldType || col.Repeated || col.Type == bigquery.RecordFieldType {
175228
e.logger.Info("Skip profiling " + col.Name + " column")

0 commit comments

Comments
 (0)