Conversation

Claude finished @julienrbrt's task in 2m 40s.

## Code Review: feat(pkg/da): support celestia-app
### Overview

This PR introduces a clean architectural refactoring by extracting a unified `BlobClient` interface with separate celestia-node and celestia-app implementations.

### Critical Issues

1. **Blob submission is not implemented in the celestia-app client.** `Submit` always returns:

```go
return datypes.ResultSubmit{
	BaseResult: datypes.BaseResult{
		Code:    datypes.StatusError,
		Message: "Submit not implemented: requires transaction signing...",
	},
}
```

This means celestia-app is a read-only DA client, so nodes configured as aggregators cannot post data. The PR title says "support celestia-app" but submission — the core operation of a DA layer for a sequencer — is missing. It should either be implemented (requires signer integration) or the factory should hard-fail when an aggregator is configured with the celestia-app client.

2. **The `options` parameter is accepted but never parsed.**

```go
var opts SubmitOptions
if len(options) > 0 {
	// Options would be parsed here if needed
}
```

The `opts` value stays at its zero value no matter what the caller passes, so caller-provided submit options are silently ignored.

### High Priority Issues

3. The previous node client mapped submission failures to specific statuses (`StatusTooBig`, `StatusAlreadyInMempool`, etc.); the refactored `Submit` collapses them all into a generic `StatusError`.

4. **Detection failures silently fall back to the node client.**

```go
// Default to node client if detection fails
return ClientTypeNode
```

If the address is a valid celestia-app endpoint that's temporarily unreachable during startup, the factory silently falls back to creating a node client, which will then fail differently. This can make misconfiguration very hard to debug. Consider logging a warning, or returning an error when detection is inconclusive.

5. **The configured timeout is dropped for the app client.**

```go
func createAppClient(cfg Config) datypes.BlobClient {
	return daapp.NewClient(daapp.Config{
		RPCAddress:     cfg.Address,
		Logger:         cfg.Logger,
		DefaultTimeout: 0, // Use default ← intentionally drops the user's setting
	})
}
```

The configured timeout in `cfg` never reaches the app client, so request deadlines cannot be tuned.

### Medium Issues

6. **Error classification relies on matching CometBFT error strings.**

```go
if strings.Contains(err.Error(), "height") && strings.Contains(err.Error(), "future") { ... }
if strings.Contains(err.Error(), "is not available, lowest height is") { ... }
```

CometBFT error messages are not part of its stable API. These patterns could break silently on CometBFT version upgrades, causing these conditions to be misclassified as generic errors.

7. **Malformed base64 input is silently treated as raw bytes.**

```go
txBytes, err := base64.StdEncoding.DecodeString(tx)
if err != nil {
	// Try raw bytes if not base64
	txBytes = []byte(tx)
}
```

When a caller passes corrupt base64, the decode error is swallowed and the corrupt string is forwarded as-is, masking encoding bugs upstream.

8. **Malformed IDs are skipped without any signal to the caller.**

```go
height, commitment := SplitID(id)
if commitment == nil {
	continue // silently skips
}
```

The local `SplitID` returns a nil commitment for malformed IDs and the loop just moves on, so the caller gets no indication that some inputs were invalid.

### Low Priority / Nitpicks

9.

```go
if cfg.RPCAddress == "" {
	return nil
}
```

Returning `nil` forces every caller to nil-check the constructor's result; returning an explicit error would surface the misconfiguration immediately.

10.

```go
blockTime := time.Now()
if blockResult.Block.Header.Time != "" {
	if t, err := time.Parse(time.RFC3339Nano, blockResult.Block.Header.Time); err == nil {
```

The inner parse error is discarded, so an unparseable block time silently falls back to the local wall clock.

11. Both …

12. The test in …

### What's Well Done
The latest Buf updates on your PR. Results from workflow CI / buf-check (pull_request).
## Summary of Changes

Hello @julienrbrt, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly enhances the Data Availability (DA) layer by introducing a new client for direct communication with celestia-app.
## Code Review
This pull request introduces support for celestia-app as a Data Availability (DA) layer alongside the existing celestia-node support. It refactors the DA client logic to use a unified BlobClient interface, allowing for different underlying implementations. While the architectural direction is sound, the current implementation contains several critical regressions in the celestia-node (formerly jsonrpc) client, specifically regarding error mapping, result metadata, and placeholder implementations for proof verification. Additionally, the celestia-app client is missing blob submission functionality, which is a significant feature gap given the PR's objective.
```go
proofs := make([]datypes.Proof, len(ids))
for i, id := range ids {
	height, commitment := SplitID(id)
	if commitment == nil {
		return nil, nil
	}

	proof, err := c.Blob.GetProof(ctx, height, ns, commitment)
	if err != nil {
		return nil, err
	}

	// Serialize proof - for now just use the proof as-is
	// In a real implementation, you'd marshal the proof properly
	proofs[i] = []byte{} // Placeholder
	_ = proof
}
```
```go
func (c *Client) Validate(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error) {
	if len(ids) != len(proofs) {
		return nil, nil
	}

	if len(ids) == 0 {
		return []bool{}, nil
	}

	ns, err := libshare.NewNamespaceFromBytes(namespace)
	if err != nil {
		return nil, err
	}

	results := make([]bool, len(ids))
	for i, id := range ids {
		height, commitment := SplitID(id)
		if commitment == nil {
			continue
		}

		// Deserialize proof - placeholder
		var proof *Proof
		_ = proofs[i] // Would unmarshal here

		included, err := c.Blob.Included(ctx, height, ns, proof, commitment)
		if err != nil {
			results[i] = false
		} else {
			results[i] = included
		}
	}

	return results, nil
}
```
```go
height, err := c.Blob.Submit(ctx, blobs, &opts)
if err != nil {
	return datypes.ResultSubmit{
		BaseResult: datypes.BaseResult{
			Code:    datypes.StatusError,
			Message: "failed to submit blobs: " + err.Error(),
		},
	}
}
```
The new implementation of `Submit` in the node client has lost the detailed error mapping that existed in the previous version (e.g., mapping to `StatusTooBig`, `StatusAlreadyInMempool`, etc.). It now returns a generic `StatusError` for all failures, which reduces the ability of the node to react appropriately to specific DA layer conditions.
```go
if len(ids) != len(proofs) {
	return nil, nil
}
```

If the number of IDs and proofs do not match, the function returns `nil, nil`. This violates the interface expectation and may lead to nil pointer dereferences in callers. It should return an explicit error:

```go
if len(ids) != len(proofs) {
	return nil, fmt.Errorf("mismatched IDs and proofs length: %d != %d", len(ids), len(proofs))
}
```
```go
var _ datypes.BlobClient = (*Client)(nil)

// Submit submits blobs to the DA layer via celestia-node.
func (c *Client) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {
```
```go
func (c *Client) Submit(ctx context.Context, data [][]byte, _ float64, namespace []byte, options []byte) datypes.ResultSubmit {
	// Calculate blob size
	var blobSize uint64
	for _, b := range data {
		blobSize += uint64(len(b))
	}

	// Validate namespace
	ns, err := share.NewNamespaceFromBytes(namespace)
	if err != nil {
		return datypes.ResultSubmit{
			BaseResult: datypes.BaseResult{
				Code:    datypes.StatusError,
				Message: fmt.Sprintf("invalid namespace: %v", err),
			},
		}
	}

	// Check blob sizes
	for i, raw := range data {
		if uint64(len(raw)) > defaultMaxBlobSize {
			return datypes.ResultSubmit{
				BaseResult: datypes.BaseResult{
					Code:    datypes.StatusTooBig,
					Message: datypes.ErrBlobSizeOverLimit.Error(),
				},
			}
		}
		// Validate blob data
		if len(raw) == 0 {
			return datypes.ResultSubmit{
				BaseResult: datypes.BaseResult{
					Code:    datypes.StatusError,
					Message: fmt.Sprintf("blob %d is empty", i),
				},
			}
		}
	}

	// TODO: Implement actual blob submission
	// This requires:
	// 1. Creating a MsgPayForBlobs transaction
	// 2. Signing the transaction
	// 3. Broadcasting via /broadcast_tx_commit or /broadcast_tx_sync
	//
	// For now, return an error indicating this needs to be implemented
	// with proper transaction signing infrastructure.
	c.logger.Error().
		Int("blob_count", len(data)).
		Str("namespace", ns.String()).
		Msg("Submit not implemented - requires transaction signing infrastructure")

	return datypes.ResultSubmit{
		BaseResult: datypes.BaseResult{
			Code:           datypes.StatusError,
			Message:        "Submit not implemented: requires transaction signing. Use celestia-node client for submission or implement signer integration",
			SubmittedCount: 0,
			Height:         0,
			Timestamp:      time.Now(),
			BlobSize:       blobSize,
		},
	}
}
```
```go
if strings.Contains(err.Error(), "height") && strings.Contains(err.Error(), "future") {
	return datypes.ResultRetrieve{
		BaseResult: datypes.BaseResult{
			Code:      datypes.StatusHeightFromFuture,
			Message:   datypes.ErrHeightFromFuture.Error(),
			Height:    height,
			Timestamp: time.Now(),
		},
	}
}
```
```go
func (c *Client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
	if len(ids) == 0 {
		return nil, nil
	}

	ns, err := share.NewNamespaceFromBytes(namespace)
	if err != nil {
		return nil, fmt.Errorf("invalid namespace: %w", err)
	}

	// Group IDs by height for efficient fetching
	blobsByHeight := make(map[uint64][]datypes.ID)
	for _, id := range ids {
		height, _, err := datypes.SplitID(id)
		if err != nil {
			return nil, fmt.Errorf("invalid blob id: %w", err)
		}
		blobsByHeight[height] = append(blobsByHeight[height], id)
	}

	var result []datypes.Blob
	for height, heightIDs := range blobsByHeight {
		// Fetch block at height
		retrieveResult := c.Retrieve(ctx, height, ns.Bytes())
		if retrieveResult.Code != datypes.StatusSuccess {
			continue
		}

		// Match retrieved blobs with requested IDs
		for i, blobID := range retrieveResult.IDs {
			for _, requestedID := range heightIDs {
				if bytes.Equal(blobID, requestedID) && i < len(retrieveResult.Data) {
					result = append(result, retrieveResult.Data[i])
					break
				}
			}
		}
	}

	return result, nil
}
```
## Codecov Report

❌ Patch coverage is …. Additional details and impacted files:

```
@@            Coverage Diff             @@
##             main    #3095      +/-   ##
==========================================
- Coverage   60.92%   60.45%    -0.48%
==========================================
  Files         113      115        +2
  Lines       11617    11901      +284
==========================================
+ Hits         7078     7195      +117
- Misses       3741     3913      +172
+ Partials      798      793        -5
```
Add direct support for celestia-app alongside celestia-node.