Skip to content

Commit cf8cd92

Browse files
craig[bot]Yevgeniy Miretskiynvanbenschotenandreimatei
committed
45269: importccl: Parallelize avro import r=miretskiy a=miretskiy Parallelize avro importer to improve its throughput (2.8x improvement). Touches #40374. Fixes #45097. Release notes (performance): Faster avro import 45482: storage: integrate Concurrency Manager into Replica request path r=nvanbenschoten a=nvanbenschoten Related to #41720. Related to #44976. This commit integrates the new concurrency package into the storage package. Each Replica is given a concurrency manager, which replaces its existing latch manager and txn wait queue. The change also uses the concurrency manager to simplify the role of the intent resolver. The intent resolver no longer directly handles WriteIntentErrors. As a result, we are able to delete the contention queue entirely. With this change, all requests are now sequenced through the concurrency manager. When sequencing, latches are acquired and conflicting locks are detected. If any locks are found, the requests wait in lock wait-queues for the locks to be resolved. This is a major deviation from how things currently work because today, even with the contention queue, requests end up waiting for conflicting transactions to commit/abort in the txnWaitQueue after at least one RPC. Now, requests wait directly next to the intents/locks that they are waiting on and react directly to the resolution of these intents/locks. Once requests are sequenced by the concurrency manager, they are theoretically fully isolated from all conflicting requests. However, this is not strictly true today because we have not yet pulled all replicated locks into the concurrency manager's lock table. We will do so in a future change. Until then, the concurrency manager maintains a notion of "intent discovery", which is integrated into the Replica-level concurrency retry loop. Performance numbers will be published shortly. This will be followed by performance numbers using the SELECT FOR UPDATE locking (#40205) improvements that this change enables. 45484: sql: simplify connection state machine - stop tracking retry intent r=andreimatei a=andreimatei Before this patch, the SQL connection state machine had an optimization: if a transaction that hadn't used "SAVEPOINT cockroach_restart" encountered a retriable error that we can't auto-retry, then we'd release the txn's locks eagerly and enter the Aborted state. As opposed to transactions that had used the "SAVEPOINT cockroach_restart", which go to RestartWait. This optimization is a significant complication for the state machine, so this patch is removing it. All transactions now go to RestartWait, and wait for a ROLLBACK to release the locks. On the flip side, doing "RELEASE SAVEPOINT cockroach_restart" and "ROLLBACK SAVEPOINT cockroach_restart" now works even for transactions that haven't explicitly declared that savepoint, which is nice. Although I don't promise I'll keep it working. Release note: None Co-authored-by: Yevgeniy Miretskiy <[email protected]> Co-authored-by: Nathan VanBenschoten <[email protected]> Co-authored-by: Andrei Matei <[email protected]>
4 parents ec05f0a + 7bd8e7d + 1899ad4 + 198e4e7 commit cf8cd92

File tree

90 files changed

+2145
-2700
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

90 files changed

+2145
-2700
lines changed

pkg/ccl/importccl/import_processor.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -150,9 +150,12 @@ func makeInputConverter(
150150
case roachpb.IOFileFormat_PgDump:
151151
return newPgDumpReader(ctx, kvCh, spec.Format.PgDump, spec.Tables, evalCtx)
152152
case roachpb.IOFileFormat_Avro:
153-
return newAvroInputReader(ctx, kvCh, singleTable, spec.Format.Avro, evalCtx)
153+
return newAvroInputReader(
154+
kvCh, singleTable, spec.Format.Avro, spec.WalltimeNanos,
155+
int(spec.ReaderParallelism), evalCtx)
154156
default:
155-
return nil, errors.Errorf("Requested IMPORT format (%d) not supported by this node", spec.Format.Format)
157+
return nil, errors.Errorf(
158+
"Requested IMPORT format (%d) not supported by this node", spec.Format.Format)
156159
}
157160
}
158161

pkg/ccl/importccl/import_processor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -577,7 +577,7 @@ func TestCSVImportCanBeResumed(t *testing.T) {
577577
defer leaktest.AfterTest(t)()
578578
defer setImportReaderParallelism(1)()
579579
const batchSize = 5
580-
defer TestingSetCsvInputReaderBatchSize(batchSize)()
580+
defer TestingSetParallelImporterReaderBatchSize(batchSize)()
581581
defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
582582
jobs.DefaultAdoptInterval = 100 * time.Millisecond
583583

@@ -683,7 +683,7 @@ func TestCSVImportCanBeResumed(t *testing.T) {
683683
func TestCSVImportMarksFilesFullyProcessed(t *testing.T) {
684684
defer leaktest.AfterTest(t)()
685685
const batchSize = 5
686-
defer TestingSetCsvInputReaderBatchSize(batchSize)()
686+
defer TestingSetParallelImporterReaderBatchSize(batchSize)()
687687
defer row.TestingSetDatumRowConverterBatchSize(2 * batchSize)()
688688
jobs.DefaultAdoptInterval = 100 * time.Millisecond
689689

pkg/ccl/importccl/import_stmt_test.go

Lines changed: 58 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import (
1818
"net/http/httptest"
1919
"net/url"
2020
"path/filepath"
21-
"runtime"
2221
"strings"
2322
"sync"
2423
"testing"
@@ -53,7 +52,6 @@ import (
5352
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
5453
"github.com/pkg/errors"
5554
"github.com/stretchr/testify/require"
56-
"golang.org/x/sync/errgroup"
5755
)
5856

5957
func TestImportData(t *testing.T) {
@@ -2123,9 +2121,8 @@ func TestImportIntoCSV(t *testing.T) {
21232121
sqlDB.Exec(t, "INSERT INTO t (a, b) VALUES ($1, $2)", i, v)
21242122
}
21252123

2126-
stripFilenameQuotes := strings.ReplaceAll(testFiles.files[0][1:len(testFiles.files[0])-1], "?", "\\?")
21272124
sqlDB.ExpectErr(
2128-
t, fmt.Sprintf("%s: row 1: expected 3 fields, got 2", stripFilenameQuotes),
2125+
t, "row 1: expected 3 fields, got 2",
21292126
fmt.Sprintf(`IMPORT INTO t (a, b, c) CSV DATA (%s)`, testFiles.files[0]),
21302127
)
21312128
})
@@ -2136,9 +2133,8 @@ func TestImportIntoCSV(t *testing.T) {
21362133
sqlDB.Exec(t, `CREATE TABLE t (a INT)`)
21372134
defer sqlDB.Exec(t, `DROP TABLE t`)
21382135

2139-
stripFilenameQuotes := strings.ReplaceAll(testFiles.files[0][1:len(testFiles.files[0])-1], "?", "\\?")
21402136
sqlDB.ExpectErr(
2141-
t, fmt.Sprintf("%s: row 1: expected 1 fields, got 2", stripFilenameQuotes),
2137+
t, "row 1: expected 1 fields, got 2",
21422138
fmt.Sprintf(`IMPORT INTO t (a) CSV DATA (%s)`, testFiles.files[0]),
21432139
)
21442140
})
@@ -2374,6 +2370,46 @@ func BenchmarkImport(b *testing.B) {
23742370
))
23752371
}
23762372

2373+
// a importRowProducer implementation that returns 'n' rows.
2374+
type csvBenchmarkStream struct {
2375+
n int
2376+
pos int
2377+
data [][]string
2378+
}
2379+
2380+
func (s *csvBenchmarkStream) Progress() float32 {
2381+
return float32(s.pos) / float32(s.n)
2382+
}
2383+
2384+
func (s *csvBenchmarkStream) Scan() bool {
2385+
s.pos++
2386+
return s.pos <= s.n
2387+
}
2388+
2389+
func (s *csvBenchmarkStream) Err() error {
2390+
return nil
2391+
}
2392+
2393+
func (s *csvBenchmarkStream) Skip() error {
2394+
return nil
2395+
}
2396+
2397+
func (s *csvBenchmarkStream) Row() (interface{}, error) {
2398+
return s.data[s.pos%len(s.data)], nil
2399+
}
2400+
2401+
var _ importRowProducer = &csvBenchmarkStream{}
2402+
2403+
// BenchmarkConvertRecord-16 1000000 2107 ns/op 56.94 MB/s 3600 B/op 101 allocs/op
2404+
// BenchmarkConvertRecord-16 500000 2106 ns/op 56.97 MB/s 3606 B/op 101 allocs/op
2405+
// BenchmarkConvertRecord-16 500000 2100 ns/op 57.14 MB/s 3606 B/op 101 allocs/op
2406+
// BenchmarkConvertRecord-16 500000 2286 ns/op 52.49 MB/s 3606 B/op 101 allocs/op
2407+
// BenchmarkConvertRecord-16 500000 2378 ns/op 50.46 MB/s 3606 B/op 101 allocs/op
2408+
// BenchmarkConvertRecord-16 500000 2427 ns/op 49.43 MB/s 3606 B/op 101 allocs/op
2409+
// BenchmarkConvertRecord-16 500000 2399 ns/op 50.02 MB/s 3606 B/op 101 allocs/op
2410+
// BenchmarkConvertRecord-16 500000 2365 ns/op 50.73 MB/s 3606 B/op 101 allocs/op
2411+
// BenchmarkConvertRecord-16 500000 2376 ns/op 50.49 MB/s 3606 B/op 101 allocs/op
2412+
// BenchmarkConvertRecord-16 500000 2390 ns/op 50.20 MB/s 3606 B/op 101 allocs/op
23772413
func BenchmarkConvertRecord(b *testing.B) {
23782414
ctx := context.TODO()
23792415

@@ -2429,58 +2465,31 @@ func BenchmarkConvertRecord(b *testing.B) {
24292465
if err != nil {
24302466
b.Fatal(err)
24312467
}
2432-
recordCh := make(chan csvRecord)
2433-
kvCh := make(chan row.KVBatch)
2434-
group := errgroup.Group{}
24352468

2469+
kvCh := make(chan row.KVBatch)
24362470
// no-op drain kvs channel.
24372471
go func() {
24382472
for range kvCh {
24392473
}
24402474
}()
24412475

2442-
c := &csvInputReader{
2476+
descr := tableDesc.TableDesc()
2477+
importCtx := &parallelImportContext{
24432478
evalCtx: &evalCtx,
2479+
tableDesc: descr,
24442480
kvCh: kvCh,
2445-
recordCh: recordCh,
2446-
tableDesc: tableDesc.TableDesc(),
2447-
}
2448-
// start up workers.
2449-
numWorkers := runtime.NumCPU()
2450-
for i := 0; i < numWorkers; i++ {
2451-
workerID := i
2452-
group.Go(func() error {
2453-
return c.convertRecordWorker(ctx, workerID)
2454-
})
2455-
}
2456-
const batchSize = 500
2457-
2458-
minEmitted := make([]int64, numWorkers)
2459-
batch := csvRecord{
2460-
file: "some/path/to/some/file/of/csv/data.tbl",
2461-
rowOffset: 1,
2462-
r: make([][]string, 0, batchSize),
2463-
minEmitted: &minEmitted,
24642481
}
24652482

2466-
b.ResetTimer()
2467-
for i := 0; i < b.N; i++ {
2468-
if len(batch.r) > batchSize {
2469-
recordCh <- batch
2470-
batch.r = make([][]string, 0, batchSize)
2471-
batch.rowOffset = int64(i)
2472-
minEmitted = make([]int64, numWorkers)
2473-
}
2474-
2475-
batch.r = append(batch.r, tpchLineItemDataRows[i%len(tpchLineItemDataRows)])
2476-
}
2477-
recordCh <- batch
2478-
close(recordCh)
2479-
2480-
if err := group.Wait(); err != nil {
2481-
b.Fatal(err)
2483+
producer := &csvBenchmarkStream{
2484+
n: b.N,
2485+
pos: 0,
2486+
data: tpchLineItemDataRows,
24822487
}
2488+
consumer := &csvRowConsumer{importCtx: importCtx, opts: &roachpb.CSVOptions{}}
2489+
b.ResetTimer()
2490+
require.NoError(b, runParallelImport(ctx, importCtx, &importFileContext{}, producer, consumer))
24832491
close(kvCh)
2492+
b.ReportAllocs()
24842493
}
24852494

24862495
// TestImportControlJob tests that PAUSE JOB, RESUME JOB, and CANCEL JOB
@@ -3556,8 +3565,11 @@ func TestImportAvro(t *testing.T) {
35563565
},
35573566
}
35583567

3559-
for _, test := range tests {
3568+
for i, test := range tests {
35603569
t.Run(test.name, func(t *testing.T) {
3570+
// Play a bit with producer/consumer batch sizes.
3571+
defer TestingSetParallelImporterReaderBatchSize(13 * i)()
3572+
35613573
_, err := sqlDB.DB.ExecContext(context.Background(), `DROP TABLE IF EXISTS simple CASCADE`)
35623574
require.NoError(t, err)
35633575

0 commit comments

Comments
 (0)