طرح‌چه

ایندکس داده ها با استفاده از Debezium بر روی Typesense برای ایجاد سرویس جستجوی سریع | CDC pipeline

۲ شهریور ۱۴۰۴

ایندکس داده ها با استفاده از Debezium بر روی Typesense برای ایجاد سرویس جستجوی سریع | CDC pipeline
با استفاده از Debezium تغییرات اطلاعات رو از دیتابیس اصلی می خوانیم و بر روی RabbitMQ قرار می دهیم و در نهایت یک سرویس indexer ایجاد می کنیم تا مسیج هایی که روی RabbitMQ قرار گرفته اند را پردازش و اطلاعات را بر روی Typesense ایندکس کند.

در این قسمت از یک سرویس پیش ساخته کوچک برای مدیریت وبلاگ استفاده می کنیم و با استفاده از CDC یک سرچ سریع بر روی این پروژه پیاده سازی میکنیم.

معرفی اولیه

سرویس ما یک وبلاگ ساده است که قابلیت ایجاد, حذف و ویرایش محتوا را به ما میدهد. این پروژه از Golang به عنوان زبان بکند و از PostgreSQL به عنوان دیتابیس اصلی استفاده میکند. ما با استفاده از CDC میتوانیم تغییرات اطلاعات را از جدول blog بخوانیم و این تغییرات را بر روی دیتابیس typesense ذخیره کنیم.

سورس کد نهایی پروژه را میتوانید در گیت هاب ببینید و دانلود کنید. برای اجرای پروژه ابتدا با استفاده از ترمینال به پوشه پروژه بروید و سپس با استفاده از Docker با اجرای دستور زیر , پروژه را اجرا کنید:

docker compose up -d

و در نهایت میتوانید پروژه را با استفاده از دستور زیر خاموش کنید:

docker compose down --remove-orphans

اگر با داکر آشنا نیستید یا اون رو نصب نکردید بهتره که ابتدا داکر را نصب کنید و همچنین فرصت خوبی خواهد بود تا کمی با داکر آشنا شوید.

و اگر روی سیستم makefile دارید میتونید درون پوشه پروژه دستور های زیر را برای راه اندازی و خاموش کردن سرویس ها استفاده کنید:

// راه اندازی پروژه
make up

// خاموش کردن سرویس ها
make down

معماری پروژه

قبل از شروع به پیاده سازی  ساختار نهایی چیزی که میخواهیم داشته باشیم رو بررسی کنیم.

ما در مجموع از ابزاری زیر استفاده خواهیم کرد تا پروژه خودمون رو بسازیم:

  • از PostgreSQL به عنوان دیتابیس اصلی برای ذخیره سازی و نگهداری محتوا استفاده میکنیم.

  • از دیتابیس Typesense برای ایندکس کردن full text محتوا به منظور استفاده در سرچ استفاده می کنیم. بدین صورت اپلیکیشن ما میتونه خیلی سریع محتوای مورد نظر را جستجو کند و نتایج را برای کاربرهای اپلیکیشن ما فراهم کند.

  • از RabbitMQ به عنوان صف یا queue استفاده می کنیم. بدین صورت تغییراتی که روی دیتابیس اصلی انجام می شوند را درون این صف (queue) قرار می دهیم و سپس با پردازش مسیج های درون صف, اطلاعات را روی Typsense بروز رسانی می کنیم. 

    بدین صورت بدون اینکه فشاری به دیتابیس اصلی ما وارد شود تمامی اطلاعات را روی Typesense به صورت آنی ایندکس خواهیم کرد.

  • از Debezium برای خواندن لاگ های PostgreSQL و تبدیل لاگ ها به مسیج بر روی RabbitMQ استفاده میکنیم. بدین صورت تغییراتی که بر روی اطلاعات در PostgreSQL انجام می دهیم به صورت مسیج هایی درون RabbitMQ ذخیره میشوند.

  • یک اپلیکیشن indexer میسازیم که مسیج های نوشته شده بر روی RabbitMQ را بخواند و بر اساس این مسیج ها اطلاعات را بر روی Typsense ایندکس و آپدیت کند.

  • اپلیکیشن اصلی ما که وظایف آن به دو قسمت اصلی تقسیم میشوند:

    • عملکرد های مرتبط با دشبورد برای مدیریت پست های بلاگ که اطلاعات را بر روی دیتابیس اصلی (در اینجا PostgreSQL) ذخیره و بروز رسانی میکند.

    • عملکردهای عمومی از جمله صفحه اصلی, ادامه مطلب پست ها و جستجو درون مطالب که اطلاعات را از Typesense میخوانند و نمایش میدهند.

شکل کلی اپلیکیشن ما به صورت زیر خواهد بود:

Debezium CDC pipeline
معماری نهایی پروژه

راه اندازی Debezium بر روی PostgreSQL

برای راه اندازی Debezium بر روی Postgres طبق مستندات نیازمند انجام مراحل زیر هستیم: 

  • فعالسازی WAL (write ahead log) در PostgreSQL
  • اجرا کردن Debezium

فعالسازی WAL در PostgreSQL

برای اینکار میتوانیم یک فایل با پسوند conf در مسیر /etc/postgresql/ بسازیم. مثلا من این فایل با نام postgresql.conf به صورت /etc/postgresql/postgresql.conf میسازم.

سپس تنظیمات زیر را درون آن قرار میدهیم:

wal_level = logical
max_wal_senders = 4 # or more, depending on parallel replication needs
max_replication_slots = 4 # at least one per Debezium connector

# Logging
log_destination = 'stderr'
logging_collector = on
log_directory = 'log'
log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log'
log_rotation_age = 1d
log_rotation_size = 100MB

# Enable CDC logging
log_statement = 'all'
log_min_duration_statement = 1000

اجرا کردن Debezium

ما برای اجرا کردن debezium از debezium-server استفاده میکنیم و طبق مستندات تنظیمات را برای اتصال به RabbitMQ و خواندن لاگ های PostgreSQL انجام میدهیم. تمامی این تنظیمات را میتوان با استفاده از متغیرهای محیطی نیز انجام داد. ما متغیرهای محیطی زیر را به debezium-server پاس میدهیم:

# postgres related configurations
DEBEZIUM_SOURCE_CONNECTOR_CLASS: io.debezium.connector.postgresql.PostgresConnector
DEBEZIUM_SOURCE_DATABASE_HOSTNAME: postgres
DEBEZIUM_SOURCE_DATABASE_PORT: 5432
DEBEZIUM_SOURCE_DATABASE_USER: bloguser
DEBEZIUM_SOURCE_DATABASE_PASSWORD: blogpass
DEBEZIUM_SOURCE_DATABASE_DBNAME: blog
DEBEZIUM_SOURCE_TOPIC_PREFIX: blog
DEBEZIUM_SOURCE_SCHEMA_INCLUDE_LIST: public
DEBEZIUM_SOURCE_PLUGIN_NAME: pgoutput
DEBEZIUM_SOURCE_SLOT_NAME: dbz_slot
DEBEZIUM_SOURCE_PUBLICATION_AUTOCREATE_MODE: filtered
DEBEZIUM_SOURCE_OFFSET_STORAGE_FILE_FILENAME: /tmp/offsets.dat
DEBEZIUM_SOURCE_OFFSET_FLUSH_INTERVAL_MS: 1000

# rabbitmq related configurations
DEBEZIUM_SINK_TYPE: rabbitmq
DEBEZIUM_SINK_RABBITMQ_CONNECTION_HOST: rabbitmq
DEBEZIUM_SINK_RABBITMQ_CONNECTION_PORT: 5672
DEBEZIUM_SINK_RABBITMQ_CONNECTION_USERNAME: admin
DEBEZIUM_SINK_RABBITMQ_CONNECTION_PASSWORD: admin123
DEBEZIUM_SINK_RABBITMQ_CONNECTION_VIRTUAL_HOST: /
DEBEZIUM_SINK_RABBITMQ_ACK_TIMEOUT: 30000
DEBEZIUM_SINK_RABBITMQ_EXCHANGE: debezium
DEBEZIUM_SINK_RABBITMQ_ROUTING_KEY: blog.posts
DEBEZIUM_SINK_RABBITMQ_AUTO_CREATE_ROUTING_KEY: true
DEBEZIUM_SINK_RABBITMQ_ROUTING_KEY_DURABLE: true

بعد از اجرا شدن Debezium باید بتوانیم پیام هایی که به RabbitMQ اجرا می شوند را در دشبورد RabbitMQ ببینیم.

article figures

در نمودار بالا میتونیم ببینیم که در یک بازه زمانی کوتاه 60 مسیج در ثانیه به RabbitMQ ارسال شده اند. در ادامه یک سرویس ایجاد میکنیم که این مسیج ها پردازش کند و نتیجه را بر روی Typesense ایندکس کند.

نکته: در صورتی که از پروژه ی ایجاد شده در گیت هاب ما استفاده میکنید, بعد از اجرای پروژه میتونید به آدرس http://localhost:15672/ برید و با یوزرنیم admin و پسوورد admin123 وارد شوید.

ایجاد سرویس Indexer

این سرویس مسئول پردازش مسیج هایی است که Debezium بر روی RabbitMQ مینویسد. پردازش هر مسیج شامل ایجاد , ویرایش یا حذف ایندکس می شود.

  •  اگر کاربر محتوای جدیدی ایجاد کند آن محتوا به صورت مسیج بر روی RabbitMQ قرار میگیرد و سپس سرویس indexer با پردازش این مسیج یک ایندکس جدید بر روی Typesense ایجاد میکند.

  • اگر کاربر محتوای فعلی را ویرایش کند این ویرایش به صورت یک مسیج بر روی RabbitMQ قرار میگیرد و سپس سرویس ایندکسر با پردازش این مسیج , محتوای ایندکس شده ی فعلی را بر روی Typesense بروز رسانی میکند.

  • و در نهایت در صورت حذف محتوا , با پردازش مسیج قرار گرفته بر روی RabbitMQ ایندکس متناظر با آن مسیج از روی typesense حذف می شوند.

// StartCDC starts the CDC pipeline
func (s *CDCService) StartCDC(ctx context.Context, queueName string) error {
	log.Printf("Starting CDC service, listening to queue: %s", queueName)

	// Connect to RabbitMQ
	if err := s.messageQueue.Connect(); err != nil {
		return fmt.Errorf("failed to connect to RabbitMQ: %w", err)
	}
	defer s.messageQueue.Close()

	// Connect to Typesense√
	if err := s.searchIndex.Connect(); err != nil {
		return fmt.Errorf("failed to connect to Typesense: %w", err)
	}
	defer s.searchIndex.Close()

	// Ensure the posts collection exists
	if err := s.ensurePostsCollection(); err != nil {
		return fmt.Errorf("failed to ensure posts collection: %w", err)
	}

	// Start consuming messages
	return s.messageQueue.ConsumeMessages(queueName, s.handleMessage)
}

// handleMessage processes individual messages from RabbitMQ
func (s *CDCService) handleMessage(message []byte) error {
	// Parse the CDC event (try Debezium format first, fallback to original format)
	event, err := domain.FromDebeziumJSON(message)
	if err != nil {
		log.Printf("Failed to parse CDC event: %v", err)
		return err
	}

	// Validate the event
	if !event.IsValid() {
		log.Printf("Invalid CDC event: %+v", event)
		return fmt.Errorf("invalid CDC event")
	}

	// Only process posts table events
	if event.Table != "posts" {
		log.Printf("Skipping non-posts table event: %s.%s", event.Database, event.Table)
		return nil
	}

	// Process the event based on its type
	switch event.Type {
	case domain.EventTypeInsert, domain.EventTypeUpdate, domain.EventTypeBootstrapInsert:
		return s.handleUpsert(event)
	case domain.EventTypeDelete:
		return s.handleDelete(event)
	case domain.EventTypeBootstrapStart:
		log.Printf("Bootstrap insert event received")
		return nil
	case domain.EventTypeBootstrapComplete:
		log.Printf("Bootstrap complete event received")
		return nil
	default:
		log.Printf("Unknown event type: %s", event.Type)
		return fmt.Errorf("unknown event type: %s", event.Type)
	}
}

// convertTimestamps converts nanosecond timestamps to Unix timestamps in the event data
func (s *CDCService) convertTimestamps(data map[string]interface{}) {
	// Convert created_at if present
	if createdVal, ok := data["created_at"]; ok {
		switch v := createdVal.(type) {
		case float64:
			if v > 1e12 { // If greater than 1 trillion, likely nanoseconds
				// Use time package to convert nanoseconds to Unix timestamp
				t := time.Unix(0, int64(v))
				data["created_at"] = t.Unix() // Extract Unix timestamp in seconds
			}
		case int64:
			if v > 1e12 { // If greater than 1 trillion, likely nanoseconds
				// Use time package to convert nanoseconds to Unix timestamp
				t := time.Unix(0, v)
				data["created_at"] = t.Unix() // Extract Unix timestamp in seconds
			}
		case int:
			if v > 1e12 { // If greater than 1 trillion, likely nanoseconds
				// Use time package to convert nanoseconds to Unix timestamp
				t := time.Unix(0, int64(v))
				data["created_at"] = t.Unix() // Extract Unix timestamp in seconds
			}
		}
	}

	// Convert updated_at if present
	if updatedVal, ok := data["updated_at"]; ok {
		switch v := updatedVal.(type) {
		case float64:
			if v > 1e12 { // If greater than 1 trillion, likely nanoseconds
				// Use time package to convert nanoseconds to Unix timestamp
				t := time.Unix(0, int64(v))
				data["updated_at"] = t.Unix() // Extract Unix timestamp in seconds
			}
		case int64:
			if v > 1e12 { // If greater than 1 trillion, likely nanoseconds
				// Use time package to convert nanoseconds to Unix timestamp
				t := time.Unix(0, v)
				data["updated_at"] = t.Unix() // Extract Unix timestamp in seconds
			}
		case int:
			if v > 1e12 { // If greater than 1 trillion, likely nanoseconds
				// Use time package to convert nanoseconds to Unix timestamp
				t := time.Unix(0, int64(v))
				data["updated_at"] = t.Unix() // Extract Unix timestamp in seconds
			}
		}
	}
}

// handleUpsert processes insert, update, and bootstrap events
func (s *CDCService) handleUpsert(event *domain.CDCEvent) error {
	eventType := event.Type
	log.Printf("Processing %s event for post ID: %v", eventType, event.Data["id"])

	// Convert timestamps from nanoseconds to Unix timestamps before processing
	s.convertTimestamps(event.Data)

	// Create search document from the event data
	doc, err := domain.NewSearchDocumentFromMap(event.Data)
	if err != nil {
		log.Printf("Failed to create search document: %v", err)
		return err
	}

	// Upsert the document to Typesense
	collectionName := "posts"
	if err := s.searchIndex.UpsertDocument(collectionName, doc); err != nil {
		log.Printf("Failed to upsert document to Typesense: %v", err)
		return err
	}

	log.Printf("Successfully indexed post ID: %s", doc.ID)
	return nil
}

// handleDelete processes delete events
func (s *CDCService) handleDelete(event *domain.CDCEvent) error {
	log.Printf("Processing delete event for post ID: %v", event.Data["id"])

	// Get the ID from the event data
	id, exists := event.GetID()
	if !exists {
		log.Printf("Failed to extract ID from delete event")
		return fmt.Errorf("failed to extract ID from delete event")
	}

	// Delete the document from Typesense
	collectionName := "posts"
	documentID := fmt.Sprintf("%d", id)
	if err := s.searchIndex.DeleteDocument(collectionName, documentID); err != nil {
		log.Printf("Failed to delete document from Typesense: %v", err)
		return err
	}

	log.Printf("Successfully removed post ID: %d from index", id)
	return nil
}

// ensurePostsCollection ensures the posts collection exists in Typesense
func (s *CDCService) ensurePostsCollection() error {
	schema := map[string]interface{}{
		"name": "posts",
		"fields": []map[string]interface{}{
			{"name": "id", "type": "string"},
			{"name": "title", "type": "string", "facet": false, "index": true},
			{"name": "image", "type": "string", "optional": true, "facet": false, "index": false},
			{"name": "excerpt", "type": "string", "optional": true, "facet": false, "index": true},
			{"name": "body", "type": "string", "facet": false, "index": true},
			{"name": "created_at", "type": "int64", "facet": false, "index": true},
			{"name": "updated_at", "type": "int64", "facet": false, "index": false},
		},
		"default_sorting_field": "created_at",
	}

	// Try to create the collection first
	err := s.searchIndex.CreateCollection(schema)
	if err != nil {
		// If collection already exists, we need to ensure it has the right schema
		// For now, we'll just log this and continue
		// In a production environment, you might want to handle schema updates
		log.Printf("Collection creation failed (might already exist): %v", err)
	}

	// Note: In Typesense, when you upsert a document with the same ID field value,
	// it will replace the existing document. The id field is used as the document
	// identifier to ensure proper updates.

	return nil
}

با توجه به کدهای بالا ما بر حسب نوع مسیجی که در RabbitMQ قرار گرفته عملیات ایجاد , ویرایش یا حذف ایندکس را در Typesense انجام می دهیم. کدهای کامل را میتوانید درون گیتهاب ببینید.

با اجرا کردن سرویس indexer باید داده ها درون Typesense ایندکس شوند و نتیجه را میتوان درون دشبورد typesense دید.

typesense dashboard
دشبورد typesense برای دیدن اطلاعات ایندکس شده.

برای مثال در تصویر میبینیم که 290 آیتم در کالکشن posts ایندکس شده اند.

سپس اپلیکیشن اصلی ما میتواند در بین اطلاعات ایندکس شده جستجو انجام دهد و نتایج را به کاربران نمایش دهد:

article figures

نکته: در صورتی که از پروژه ی ایجاد شده در گیت هاب ما استفاده میکنید, بعد از اجرای پروژه میتونید به آدرس http://localhost:8085/ برید تا سرویس وبلاگ رو ببینید.

در تصویر بالا درون سرچ هرچیزی را که جستجو کنیم درصورت وجود داشتن, لیست مطالب مرتبط به ما نمایش داده خواهد شد.


قسمت قبل: ایندکس داده ها با استفاده از Maxwell بر روی Typesense برای ایجاد سرویس جستجوی سریع | CDC pipeline


دیدگاه ها